简介 HashMap 和 LinkedHashMap 都是线程不安全的,要实现使用线程安全的Map,可以有以下几个选项:
Hashtable :Hashtable是遗留类,很多映射的常用功能与HashMap类似,不同的是它承自Dictionary类,并且是线程安全的,任一时间只有一个线程能写Hashtable(synchrionized
),并发性不如ConcurrentHashMap,因为ConcurrentHashMap引入了分段锁。Hashtable不建议在新代码中使用,不需要线程安全的场合可以用HashMap替换,需要线程安全的场合可以用ConcurrentHashMap替换。
Collections.synchronizedMap(Map<K,V> m):通过集合工具类可以将一个Map转换成线程安全的实现,它本质上是通过一个包装类,然后把所有的功能都委托给 Map 实现,而且包装类是基于 synchrionized
关键字来保证线程安全的,底层使用互斥锁,同一时间只能由持有锁的线程访问,其他竞争线程进入睡眠状态,性能和吞吐量都比较差。
ConcurrentHashMap:支持并发操作,性能是这几个里面最好的,JDK 1.7 和 JDK 1.8 的实现不同,1.7基于 Segment 实现,1.8抛弃了 Segment,使用 Synchronized 和 CAS 进行并发操作,同时引入了红黑树。
JDK 1.7 版的 ConcurrentHashMap 整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁 。下文中用“槽 ”来代表一个 segment。
简单理解就是,ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。
concurrencyLevel :并行级别、并发数、Segment 数,怎么翻译不重要,理解它。默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,这个时候,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的 。
再具体到每个 Segment 内部,其实每个 Segment 很像之前介绍的 HashMap,不过它要保证线程安全,所以处理起来要麻烦些。
JDK 1.7 的结构图
如图所示,是由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。
它的核心成员变量:
1 2 3 4 5 6 7 final Segment<K,V>[] segments; transient Set<K> keySet; transient Set<Map.Entry<K,V>> entrySet;
Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static final class Segment <K ,V > extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L ; transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; }
看看其中 HashEntry 的组成:
1 2 3 4 5 6 7 8 9 static final class HashEntry <K ,V > { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; }
和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。
原理上来说:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。
Put() 方法 1 2 3 4 5 6 7 8 9 10 11 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
首先是通过 key 定位到 Segment,之后在对应的 Segment 中进行具体的 put。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
虽然 HashEntry 中的 value 是用 volatile 关键词修饰的,但是并不能保证并发的原子性,所以 put 操作时仍然需要加锁处理。
首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut()
自旋获取锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private HashEntry<K,V> scanAndLockForPut (K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this , hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null ; int retries = -1 ; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0 ) { if (e == null ) { if (node == null ) node = new HashEntry<K,V>(hash, key, value, null ); retries = 0 ; } else if (key.equals(e.key)) retries = 0 ; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break ; } else if ((retries & 1 ) == 0 && (f = entryForHash(this , hash)) != first) { e = first = f; retries = -1 ; } } return node; }
尝试自旋获取锁。
如果重试的次数达到了 MAX_SCAN_RETRIES
则改为阻塞锁获取,保证能获取成功。
整个put 的流程。
将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
获取锁,遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
最后会解除在 1 中所获取当前 Segment 的锁。
Get() 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public V get (Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null ) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e != null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null ; }
get()流程:
计算 hash 值,找到 segment 数组中的具体位置,或我们前面用的“槽”
槽中也是一个数组,根据 hash 找到数组中具体的位置
到这里是链表了,顺着链表进行查找即可
由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。
ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁 。
并发问题 现在我们已经说完了 put 过程和 get 过程,我们可以看到 get 过程中是没有加锁的,那自然我们就需要去考虑并发问题。
添加节点的操作 put 和删除节点的操作 remove 都是要加 segment 上的独占锁的,所以它们之间自然不会有问题,我们需要考虑的问题就是 get 的时候在同一个 segment 中发生了 put 或 remove 操作。
put 操作的线程安全性。
初始化槽,这个我们之前就说过了,使用了 CAS 来初始化 Segment 中的数组。
添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
remove 操作的线程安全性。
remove 操作我们没有分析源码,所以这里说的读者感兴趣的话还是需要到源码中去求实一下的。
get 操作需要遍历链表,但是 remove 操作会”破坏”链表。
如果 remove 破坏的节点 get 操作已经过去了,那么这里不存在任何问题。
如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。
JDK 1.8 的 ConcurrentHashMap Java8 放弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。
根据 Java7 HashMap 的介绍,我们知道,查找的时候,根据 hash 值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n) 。
为了降低这部分的开销,在 Java8 中,当链表中的元素达到了 8 个时,会将链表转换为红黑树,在这些位置进行查找的时候可以降低时间复杂度为 O(logn) 。
示意图:
Java7 中使用 Entry 来代表每个 HashMap 中的数据节点,Java8 中使用 Node ,基本没有区别,都是 key,value,hash 和 next 这四个属性,不过,Node 只能用于链表的情况,红黑树的情况需要使用 TreeNode 。
我们根据数组元素中,第一个节点数据类型是 Node 还是 TreeNode 来判断该位置下是链表还是红黑树的。
继承关系:
通过继承关系,可以发现,ConcurrentHashMap并没有继承HashMap,而是单独实现了ConcurrentMap这个接口,其余的跟HashMap是一样的
成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private static final int MAXIMUM_CAPACITY = 1 << 30 ; private static final int DEFAULT_CAPACITY = 16 ; private static final int DEFAULT_CONCURRENCY_LEVEL = 16 ; private static final float LOAD_FACTOR = 0.75f ; static final int TREEIFY_THRESHOLD = 8 ; static final int UNTREEIFY_THRESHOLD = 6 ; static final int MIN_TREEIFY_CAPACITY = 64 ; transient volatile Node<K,V>[] table; private transient volatile Node<K,V>[] nextTable; private transient volatile long baseCount; private transient volatile int sizeCtl; static final int MOVED = -1 ; static final int TREEBIN = -2 ;
TreeNode结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static final class TreeNode <K ,V > extends Node <K ,V > { TreeNode<K,V> parent; TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super (hash, key, val, next); this .parent = parent; } Node<K,V> find (int h, Object k) { return findTreeNode(h, k, null ); } final TreeNode<K,V> findTreeNode (int h, Object k, Class<?> kc) { }
注释说Nodes for use in TreeBins,也就是说这个TreeNode是用来服务TreeBin的,那么继续看Treebins
TreeBins 这个通过注释可以知道,是用来包装TreeNode的,然后作为红黑树的树节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 static final class TreeBin <K ,V > extends Node <K ,V > { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; static final int WRITER = 1 ; static final int WAITER = 2 ; static final int READER = 4 ; TreeBin(TreeNode<K,V> b) { super (TREEBIN, null , null , null ); this .first = b; TreeNode<K,V> r = null ; for (TreeNode<K,V> x = b, next; x != null ; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null ; if (r == null ) { x.parent = null ; x.red = false ; r = x; } else { K k = x.key; int h = x.hash; Class<?> kc = null ; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1 ; else if (ph < h) dir = 1 ; else if ((kc == null && (kc = comparableClassFor(k)) == null ) || (dir = compareComparables(kc, k, pk)) == 0 ) dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; if ((p = (dir <= 0 ) ? p.left : p.right) == null ) { x.parent = xp; if (dir <= 0 ) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); break ; } } } } this .root = r; assert checkInvariants (root) ; } }
ForwardingNode ForwardingNode有两个功能,一个是用来指向nextTable节点,便于将原有的table数组复制到新的table中,另外一个是用来占位空节点,告诉其他节点这个节点已经被处理过了,自动遍历后面的节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static final class ForwardingNode <K ,V > extends Node <K ,V > { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super (MOVED, null , null , null ); this .nextTable = tab; } Node<K,V> find (int h, Object k) { outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1 ) & h)) == null ) return null ; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0 ) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null ) return null ; } } } }
CAS 操作 tabAt 1 2 3 static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); }
获得在i索引位置上的Node节点
casTabAt 1 2 3 static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long )i << ASHIFT) + ABASE, c, v); }
1.利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
2.在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
setTabAt 1 2 3 static final <K,V> void setTabAt (Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long )i << ASHIFT) + ABASE, v); }
利用volatile方法设置节点位置的值
transfer 方法 transfer实际上执行的就是table数组的扩容操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings ("unchecked" ) Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
1.根据当前数组长度n,新建一个两倍长度的数组nextTable;
2.初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了
3.通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值
4.如果槽位中没有节点,则通过CAS插入在第二步中初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了
5.如果当前Node已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个Node
6.处理不为null的节点,如果是链表结构,先定义两个变量节点ln和hn,通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置
7.如果该槽位是红黑树结构,同样将节点分成两类,通过CAS插入到nextTable中
helpTransfer 如果一个线程在进行扩容的时候中断了,但是此时nextTable已经创建了,那么他就会调用helpTransfer方法,帮助扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用上面讲到的transfer方法来进行扩容。
Put() 方法 1 2 3 public V put (K key, V value) { return putVal(key, value, false ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null ,new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
下面分解一下步骤:
1.计算记录的key的hashCode,然后通过hashcode计算table的index位置
2.获取table[index],为null直接采用 casTabAt() 插入一条新的记录
3.不为null,则锁住根节点,然后通过节点的hashcode值来区分是链表还是红黑树
4.遍历链表或者红黑树,找到相应的节点,进行更新操作
5.如果是更新操作,则不需要进行 transfer() 操作,否则判断一下更新之后的size有没有超过阈值,主要是判断链表的节点数跟table的数组长度
如何保证线程安全
当对table中的某个节点进行写操作的时候,如果为null,直接进行插入,注意如果不为null,则用syncronized关键字锁住当前节点,然后进行写操作。为什么为空的时候插入不需要上锁,因为ConcurrentHashMap采用的是CAS的乐观锁机制,如果有线程在进行修改,就会在外面等待,然后过一会儿再过来查询一下,这样可以保证高效并发。
Get() 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
主要流程:
1.通过计算key的hash值进而计算key在table中的index
2.拿到table[index],为null,直接返回null
3.如果table[index]不为null,通过Node的hash值来判断是链表还是红黑树
4.遍历到相应的value之后直接返回,否则返回null
如何保证线程安全
首先当多个线程同时读取的时候,ConcurrentHashMap给每个Node节点加了volatile关键字修饰来保证可见性。