JUC
longAccumulate

longAccumulate 入参说明

  • long x:需求增加的值,一般默认都是 1
  • LongBinaryOperator fn:默认传递的都是 null
  • wasUncontended :竞争标识,如果 false 增代表有竞争。只有 cells 初始化之后,并且当前线程 CAS 竞争修改失败,才会是 false

Striped64 中一些变量或者方法的定义

  • base:类似于 AtomicLong 中全局的 value 值。在没有竞争情况下数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
  • collide:表示扩容意向,false 一定不会扩容,true 可能会扩容。
  • cellsBusy:初始化 cells 或者扩容 cells 需要获取锁,0:表示无锁状态;1:表示其他线程已经持有了锁
  • casCellsBusy():通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
  • NCPU:前计算机 CPU 数量,Cell 数组扩容时会使用到
  • getProbe():获取当前线程的 hash 值
  • advanceProbe():重置当前线程的 hash 值

步骤

线程 hash 值:probe

Striped64.java
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
  	// 存储线程的 probe 值
    int h;
  	// 如果 getProbe() 方法返回 0,说明随机数未初始化
    if ((h = getProbe()) == 0) {
      	// 使用 ThreadLocalRandom 为当前线程重新计算一个 hash 值,强制初始化
        ThreadLocalRandom.current(); // force initialization
      	// 重新获取 probe 值,hash 值被重置就好比一个全新的线程一样,所以设置了 wasUncontended 竞争状态为 true
        h = getProbe();
      	// 重新计算了当前线程的 hash 后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈 wasUncontended 竞争状态为 true
        wasUncontended = true;
    }
}
 
......
 
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
 
......
 
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> sk = Striped64.class;
        BASE = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("base"));
        CELLSBUSY = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("cellsBusy"));
        Class<?> tk = Thread.class;
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

代码结构总纲

Striped64.java
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) { // 这个 if 相当于给当前线程生成一个非 0 的 hash 值
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
  	// 如果 hash 取模映射得到的 Cell 单元不是 null,则为 true,此值也可以看作是扩容意向
    boolean collide = false;                // True if last slot nonempty
    // 自旋
    for (;;) {
        Cell[] as; Cell a; int n; long v;
      	// CASE1:cells 已经被初始化了
        if ((as = cells) != null && (n = as.length) > 0) {
            // ...
        }
      	// CASE2:cells 没有加锁且没有初始化,则尝试对它进行加锁,并初始化 cells 数组(首次新建)
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // ...
        }
        // CASE3:cells 正在进行初始化,并尝试直接在基数 base 上进行累加操作
        else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
            // ...
        }
    }
}

计算

刚刚要初始化 Cell[] 数组(首次新建)

未初始化过 Cell[] 数组,尝试占有锁并首次初始化cells数组

Striped64.java
// cells == as 如果不 double check,就会再次 new 一个 cell 数组,上一个线程对应数组中的值将会被篡改
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
    boolean init = false;
    try {                           // Initialize table
        if (cells == as) {
            Cell[] rs = new Cell[2]; // 创建一个大小为 2 的 Cell 数组
            rs[h & 1] = new Cell(x); // 找到当前线程 hash 到数组中的位置并创建其对应的 Cell
            cells = rs;
            init = true;
        }
    } finally {
        cellsBusy = 0;
    }
    if (init)
        break;
}

如果上面条件都执行成功就会执行数组的初始化及赋值操作,

  • Cell[] rs = new Cell[2] 表示数组的长度为2;

  • rs[h & 1] = new Cell(x) 表示创建一个新的Cell元素,value是x值,默认为1;

  • h & 1 类似于我们之前 HashMap 常用到的计算散列桶 index 的算法,通常都是 hash & (table.len - 1)。同hashmap一个意思。

兜底

多个线程尝试 CAS 修改失败的线程会走到这个分支

Striped64.java
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
    break;                          // Fall back on using base
}

该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。

Cell[] 数组不在为空且可能存在 Cell[] 数组扩容

多个线程同时命中一个 cell 的竞争

总体代码
Striped64.java
// CASE1:cells 已经被初始化了
if ((as = cells) != null && (n = as.length) > 0) {
    if ((a = as[(n - 1) & h]) == null) {  // 当前线程的 hash 值运算后映射得到的 Cell 单元为 null,说明该 Cell 没有被使用
        if (cellsBusy == 0) {       // Try to attach new Cell(Cell[] 数组没有正在扩容)
            Cell r = new Cell(x);   // Optimistically create(创建一个 Cell 单元)
            if (cellsBusy == 0 && casCellsBusy()) {  // 双端检锁,尝试加锁,成功后 cellsBusy == 1
                boolean created = false;
                try {               // Recheck under lock(在有锁的情况下再检测一边之前的判断)
                    Cell[] rs; int m, j; // 将 Cell 单元赋到 Cell[] 数组上
                    if ((rs = cells) != null &&
                        (m = rs.length) > 0 &&
                        rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    cellsBusy = 0; // 释放锁
                }
                if (created)
                    break;
                continue;           // Slot is now non-empty
            }
        }
        collide = false;
    }
    else if (!wasUncontended) // CAS already known to fail(wasUncontended 标识前一次 CAS 更新 Cell 单元是否成功)
        wasUncontended = true; // Continue after rehash(重新置为 true,后面会重新计算线程的 hash 值)
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x)))) // 尝试 CAS 更新 Cell 单元值
        break;
    else if (n >= NCPU || cells != as)  // 尝试加锁进行扩容
        collide = false;            // At max size or stale
    else if (!collide)
        collide = true;
    else if (cellsBusy == 0 && casCellsBusy()) {
        try {
            if (cells == as) {      // Expand table unless stale
                Cell[] rs = new Cell[n << 1]; // 扩容后的大小等于当前容量的 2 倍
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        continue;                   // Retry with expanded table
    }
    h = advanceProbe(h); // 重新计算当前线程的 hash 值
}
CASE1
Striped64.java
if ((a = as[(n - 1) & h]) == null) {  // 当前线程的 hash 值运算后映射得到的 Cell 单元为 null,说明该 Cell 没有被使用
    if (cellsBusy == 0) {       // Try to attach new Cell(Cell[] 数组没有正在扩容)
        Cell r = new Cell(x);   // Optimistically create(创建一个 Cell 单元)
        if (cellsBusy == 0 && casCellsBusy()) {  // 双端检锁,尝试加锁,成功后 cellsBusy == 1
            boolean created = false;
            try {               // Recheck under lock(在有锁的情况下再检测一边之前的判断)
                Cell[] rs; int m, j; // 将 Cell 单元赋到 Cell[] 数组上
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r;
                    created = true;
                }
            } finally {
                cellsBusy = 0; // 释放锁
            }
            if (created)
                break;
            continue;           // Slot is now non-empty
        }
    }
    collide = false;
}

上面代码判断当前线程hash后指向的数据位置元素是否为空;如果为空则将Cell数据放入数组中,跳出循环;如果不空则继续循环。

CASE2
Striped64.java
else if (!wasUncontended) { // CAS already known to fail(wasUncontended 标识前一次 CAS 更新 Cell 单元是否成功)
    wasUncontended = true; // Continue after rehash(重新置为 true,后面会重新计算线程的 hash 值)
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值

wasUncontended 表示 cells 初始化后,当前线程竞争修改失败 wasUncontended = false,这里只是重新设置了这个值为 true,紧接着执行 advanceProbe(h) 重新计算当前线程的 hash,重新循环。

CASE3
Striped64.java
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                 fn.applyAsLong(v, x)))) // 尝试 CAS 更新 Cell 单元值
        break;

说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。

CASE4
Striped64.java
else if (n >= NCPU || cells != as) {  // 尝试加锁进行扩容
        collide = false;            // At max size or stale
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值

如果 n 大于 CPU 数量,不可扩容,并通过下面的 h = advanceProbe(h) 方法修改线程的 probe 再重新尝试

CASE5
Striped64.java
else if (!collide) {
    collide = true;
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值
  • 如果扩容意向 collide = false 则修改它为 true,然后重新计算当前线程的 hash 值继续循环;
  • 如果当前数组的长度已经大于 CPU 核数,就会再次设置扩容意向  collide = false (见上一步)
CASE6
Striped64.java
else if (cellsBusy == 0 && casCellsBusy()) {
    try {
        // 当前的 cells 数组和最先赋值的 as 是同一个,代表没有被其他线程扩容过
        if (cells == as) {      // Expand table unless stale
            Cell[] rs = new Cell[n << 1]; // 扩容后的大小等于当前容量的 2 倍
            for (int i = 0; i < n; ++i) // 扩容后再将之前的数组的元素拷贝到新数组中
                rs[i] = as[i];
            cells = rs;
        }
    } finally {
        cellsBusy = 0; // 释放锁设置 cellsBusy = 0,设置扩容状态,然后继续循环执行。
    }
    collide = false;
    continue;                   // Retry with expanded table
}
h = advanceProbe(h); // 重新计算当前线程的 hash 值
总结

longAccumulate-方法流程图