final V putVal(K key, V value, boolean onlyIfAbsent) {// key和value不能为NULLif (key == null || value == null) throw new NullPointerException();// key所对应的hashcodeint hash = spread(key.hashCode());int binCount = 0;// 通过自旋的方式来插入数据for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 如果数组为空,则初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 算出数组下标,然后获取数组上对应下标的元素,如果为null,则通过cas来赋值// 如果赋值成功,则退出自旋,否则是因为数组上当前位置已经被其他线程赋值了,// 所以失败,所以进入下一次循环后就不会再符合这个判断了else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break; // no lock when adding to empty bin}// 如果数组当前位置的元素的hash值等于MOVED,表示正在进行扩容,当前线程也进行扩容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;// 对数组当前位置的元素进行加锁synchronized (f) {// 加锁后检查一下tab[i]上的元素是否发生了变化,如果发生了变化则直接进入下一次循环// 如果没有发生变化,则开始插入新key,valueif (tabAt(tab, i) == f) {// 如果tab[i]的hashcode是大于等于0的,那么就将元素插入到链表尾部if (fh >= 0) {binCount = 1; // binCount表示当前链表上节点的个数,不包括新节点for (Node<K,V> e = f;; ++binCount) {K ek;// 遍历链表的过程中比较key是否存在一样的if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 插入到尾节点if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 如果tab[i]是TreeBin类型,表示tab[i]位置是一颗红黑树else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {// 在新插入元素的时候,如果不算这个新元素链表上的个数大于等于8了,那么就要进行树化// 比如binCount为8,那么此时tab[i]上的链表长度为9,因为包括了新元素if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);// 存在key相同的元素if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;}
// 初始化数组// 一个线程在put时如果发现tab是空的,则需要进行初始化private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// sizeCtl默认等于0,如果为-1表示有其他线程正在进行初始化,本线程不竞争CPU// yield表示放弃CPU,线程重新进入就绪状态,重新竞争CPU,如果竞争不到就等,如果竞争到了又继续循环if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spin// 通过cas将sizeCtl改为-1,如果改成功了则进行后续操作// 如果没有成功,则表示有其他线程在进行初始化或已经把数组初始化好了else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {// 当前线程将sizeCtl改为-1后,再一次判断数组是否为空// 会不会存在一个线程进入到此处之后,数组不为空了?if ((tab = table) == null || tab.length == 0) {// 如果在构造ConcurrentHashMap时指定了数组初始容量,那么sizeCtl就为初始化容量int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 如果n为16,那么就是16-4=12// sc = 3*n/4 = 0.75n, 初始化完成后sizeCtl的数字表示扩容的阈值sc = n - (n >>> 2);}} finally {// 此时sc为阈值sizeCtl = sc;}break;}}return tab;}
private final void addCount(long x, int check) {// 先通过CAS更新baseCount(+1)// 如果更新失败则通过CAS更新CELLVALUE// 如果仍然失败则调用fullAddCount// as是一个CounterCell数组,一个CounterCell对象表示一个计数器,// 多个线程在添加元素时,手写都会尝试去更新baseCount,那么只有一个线程能更新成功,另外的线程将更新失败// 那么其他的线程就利用一个CounterCell对象来记一下数CounterCell[] as; long b, s;//if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {// 某个线程更新baseCount失败了CounterCell a; long v; int m;boolean uncontended = true;// 如果CounterCell[]是null// 或者CounterCell[]不为null的情况下CounterCell[]的长度小于1也就是等于0,// 或者CounterCell[]长度不为0的情况下随机计算一个CounterCell[]的下标,并判断此下标位置是否为空// 或者CounterCell[]中的某下标位置不为null的情况下通过cas修改CounterCell中的值失败了// 才调用fullAddCount方法,然后返回if (as == null || (m = as.length - 1) < 0 ||(a = as[ThreadLocalRandom.getProbe() & m]) == null ||!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}// 如果修改CELLVALUE成功了,这里的check就是binCount,这里为什么要判断小于等于1if (check <= 1)return;// 如果修改CELLVALUE成功了,则统计ConcurrentHashMap的元素个数s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// 如果元素个数大于等于了阈值或-1就自旋扩容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// resizeStamp这个方法太难理解,反正就是返回一个数字,比如n=16,rs则=32795int rs = resizeStamp(n);// 如果sc小于0,表示已经有其他线程在进行扩容了,sc+1if (sc < 0) {// 如果全部元素已经转移完了,或者已经达到了最大并发扩容数限制则breackif ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;// 如果没有,则sizeCtl加1,然后进行转移元素if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 如果sc是大于0的并且如果修改sizeCtl为一个特定的值,比如n=16, rs << RESIZE_STAMP_SHIFT) + 2= -2145714174else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))// 转移元素,转移完了之后继续进入循环中transfer(tab, null);s = sumCount();}}}
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// stride表示步长,步长最小为16,如果CPU只有一核,那么步长为n// 既如果只有一个cpu,那么只有一个线程来进行扩容// 步长代表一个线程负责转移的桶的个数if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range// 新数组初始化,长度为两倍if (nextTab == null) { // initiatingtry {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) { // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;// 因为是两倍扩容,相当于两个老数组结合成了一个新数组,transferIndex表示第二个小数组的第一个元素的下标transferIndex = n;}// 新数组的长度int nextn = nextTab.length;ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// advance为true时,当前桶是否已经迁移完成,如果迁移完成则开始处理下一个桶boolean advance = true;// 是否完成boolean finishing = false; // to ensure sweep before committing nextTab// 开始转移一个步长内的元素,i表示for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {int nextIndex, nextBound;// i先减1,如果减完之后小于bound,那么继续转移if (--i >= bound || finishing)advance = false;// transferIndexelse if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 通过cas来修改TRANSFERINDEX,如果修改成功则对bound和i进行赋值// 第一循环将进入到这里,来赋值bound和i// nextIndex就是transferIndex,假设为16,假如步长为4,那么就分为4个组,每组4个桶// 0-3,4-7,8-11,12-15// nextBound = 16-4=12// i=16-1=15// 所以bound表示一个步长里的最小的下标,i表示一个步长里的最大下标// TRANSFERINDEX是比较重要的,每个线程在进行元素的转移之前需要确定当前线程从哪个位置开始(从后往前)// TRANSFERINDEX每次减掉一个步长,所以当下一个线程准备转移元素时就可以从最新的TRANSFERINDEX开始了// 如果没有修改成功则继续循环else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}// i表示一个步长里的最大下标, 如果i小于或者大于等于老数组长度,或者下标+老数组长度大于等等新数组长度if (i < 0 || i >= n || i + n >= nextn) {int sc;// 转移完成if (finishing) {nextTable = null;table = nextTab;// sizeCtl = 1.5n = 2n*0.75sizeCtl = (n << 1) - (n >>> 1);return;}// 每个线程负责的转移任务结束后利用cas来对sizeCtl减1if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 当前线程负责的任务做完了,同时还有其他线程还在做任务,则回到上层重新申请任务来做if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;// 当前线程负责的任务做完了,也没有其他线程在做任务了,那么则表示扩容结束了finishing = advance = true;i = n; // recheck before commit}}// 从i位置开始转移元素// 如果老数组的i位置元素为null,则表示该位置上的元素已经被转移完成了,// 则通过cas设置为ForwardingNode,表示无需转移else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 如果i位置已经是ForwardingNode,则跳过该位置(就是桶)else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 加锁,开始转移synchronized (f) {// 加锁完了之后再次检查一遍tab[i]是否发生了变化if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// fh大于等于0表示是链表if (fh >= 0) {// n是老数组的长度// 因为n是2的幂次方数,所以runbit只有两种结果:0和nint runBit = fh & n;// 遍历链表,lastRun为当前链表上runbit连续相同的一小段的最后一段Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 如果最后一段的runBit为0,则则该段应该保持在当前位置// 否则应该设置到i+n位置if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}//从头节点开始,遍历链表到lastRun结束for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 如果ph & n,则将遍历到的节点插入到ln的前面// 否则将遍历到的节点插入到hn的前面if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 将ln链表赋值在新tab的i位置setTabAt(nextTab, i, ln);// 将hn链表赋值在新tab的i+n位置setTabAt(nextTab, i + n, hn);// 这是老tab的i位置ForwardingNode节点,表示转移完成setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;}else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;setTabAt(nextTab, i, ln);setTabAt(nextTab, i + n, hn);setTabAt(tab, i, fwd);advance = true;}}}}}}
private final void fullAddCount(long x, boolean wasUncontended) {int h;if ((h = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit(); // force initializationh = ThreadLocalRandom.getProbe();wasUncontended = true;}boolean collide = false; // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;// 如果counterCells不等于空if ((as = counterCells) != null && (n = as.length) > 0) {// h可以理解为当前线程的hashcode,如果对应的counterCells数组下标位置元素当前是空的// 那么则应该在该位置去生成一个CounterCell对象if ((a = as[(n - 1) & h]) == null) {// counterCells如果空闲if (cellsBusy == 0) { // Try to attach new Cell// 生成CounterCell对象CounterCell r = new CounterCell(x); // Optimistic create// 再次判断counterCells如果空闲,并且cas成功修改cellsBusy为1if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { // Recheck under lockCounterCell[] rs; int m, j;// 如果counterCells对象没有发生变化,那么就将刚刚创建的CounterCell赋值到数组中if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;// 便是CounterCell创建成功created = true;}} finally {cellsBusy = 0;}// 如果CounterCell创建成功,则退出循环,方法执行结束if (created)break;// 如果没有创建成功,则继续循环continue; // Slot is now non-empty}}// 应该当前位置为空,所以肯定没有发生碰撞collide = false;}// 如果当前位置不为空,则进入以下分支判断// 如果调用当前方法之前cas失败了,那么先将wasUncontended设置为true,else if (!wasUncontended) // CAS already known to failwasUncontended = true; // Continue after rehash// 通过cas修改CELLVALUE的值,修改成功则退出循环,修改失败则继续进行分支判断else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;// counterCells发生了改变,或者当前counterCells数组的大小大于等于CPU核心数,设置collide为false,// 如果到了这个极限,counterCells不会再进行扩容了else if (counterCells != as || n >= NCPU)collide = false; // At max size or stale// 一旦走到这个分支了,那么就是发生了碰撞了,一个当前这个位置不为空else if (!collide)collide = true;// 当collide为true进入这个分支,表示发生了碰撞会进行扩容else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {// 对counterCells进行扩容if (counterCells == as) {// Expand table unless staleCounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];counterCells = rs;}} finally {cellsBusy = 0;}collide = false;continue; // Retry with expanded table}// 重新进行hashh = ThreadLocalRandom.advanceProbe(h);}// 如果counterCells等于空的情况下会走下面两个分支// cellsBusy == 0表示counterCells没有线程在用// 如果counterCells空闲,并且当前线程所获得counterCells对象没有发生变化// 先通过CAS将cellsBusy标记改为1,如果修改成功则证明可以操作counterCells了,// 其他线程暂时不能使用counterCellselse if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { // Initialize table// cellsBusy标记改成后就初始化CounterCell[]if (counterCells == as) {CounterCell[] rs = new CounterCell[2];// 并且把x赋值到CounterCell中完成计数rs[h & 1] = new CounterCell(x);counterCells = rs;init = true;}} finally {cellsBusy = 0;}// 如果没有初始化成功,则证明counterCells发生了变化,当前线程修改cellsBusy的过程中,// 可能其他线程已经把counterCells对象替换掉了// 如果初始化成功,则退出循环,方法执行结束if (init)break;}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break; // Fall back on using base}}
sizeCtl默认等于0或者用户设置的数组初始容量
在初始化map的时候会先减1,初始化完成之后就会被设置为扩容的阈值
当map的元素数量大于等于扩容的阈值之后就会进行循环扩容:
第一个线程扩容时会把sizeCtl修改为一个很大的负数,然后开始转移元素,如果在这个线程扩容的过程中有其他线程也来帮助扩容了,那么sizeCtl就会加1,如果某个线程扩容结束后就会减1,每个线程减完1之后都判断一下sizeCtl是否不等于之前很大的负数,如果等于则表示当前线程时扩容的最后一个线程了,那么完成map属性的赋值工作,如果不等于并且又没有其他转移任务要做了,那么则退出转移方法,退出之后
