longAccumulate 入参说明
- long x:需求增加的值,一般默认都是 1
- LongBinaryOperator fn:默认传递的都是 null
- wasUncontended :竞争标识,如果 false 增代表有竞争。只有 cells 初始化之后,并且当前线程 CAS 竞争修改失败,才会是 false
Striped64 中一些变量或者方法的定义
- base:类似于 AtomicLong 中全局的 value 值。在没有竞争情况下数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
- collide:表示扩容意向,false 一定不会扩容,true 可能会扩容。
- cellsBusy:初始化 cells 或者扩容 cells 需要获取锁,0:表示无锁状态;1:表示其他线程已经持有了锁
- casCellsBusy():通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
- NCPU:前计算机 CPU 数量,Cell 数组扩容时会使用到
- getProbe():获取当前线程的 hash 值
- advanceProbe():重置当前线程的 hash 值
步骤
线程 hash 值:probe
Striped64.java
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
// 存储线程的 probe 值
int h;
// 如果 getProbe() 方法返回 0,说明随机数未初始化
if ((h = getProbe()) == 0) {
// 使用 ThreadLocalRandom 为当前线程重新计算一个 hash 值,强制初始化
ThreadLocalRandom.current(); // force initialization
// 重新获取 probe 值,hash 值被重置就好比一个全新的线程一样,所以设置了 wasUncontended 竞争状态为 true
h = getProbe();
// 重新计算了当前线程的 hash 后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈 wasUncontended 竞争状态为 true
wasUncontended = true;
}
}
......
static final int getProbe() {
return UNSAFE.getInt(Thread.currentThread(), PROBE);
}
......
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> sk = Striped64.class;
BASE = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
CELLSBUSY = UNSAFE.objectFieldOffset
(sk.getDeclaredField("cellsBusy"));
Class<?> tk = Thread.class;
PROBE = UNSAFE.objectFieldOffset
(tk.getDeclaredField("threadLocalRandomProbe"));
} catch (Exception e) {
throw new Error(e);
}
}
代码结构总纲
Striped64.java
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) { // 这个 if 相当于给当前线程生成一个非 0 的 hash 值
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
// 如果 hash 取模映射得到的 Cell 单元不是 null,则为 true,此值也可以看作是扩容意向
boolean collide = false; // True if last slot nonempty
// 自旋
for (;;) {
Cell[] as; Cell a; int n; long v;
// CASE1:cells 已经被初始化了
if ((as = cells) != null && (n = as.length) > 0) {
// ...
}
// CASE2:cells 没有加锁且没有初始化,则尝试对它进行加锁,并初始化 cells 数组(首次新建)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// ...
}
// CASE3:cells 正在进行初始化,并尝试直接在基数 base 上进行累加操作
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
// ...
}
}
}
计算
刚刚要初始化 Cell[] 数组(首次新建)
未初始化过 Cell[] 数组,尝试占有锁并首次初始化cells数组
Striped64.java
// cells == as 如果不 double check,就会再次 new 一个 cell 数组,上一个线程对应数组中的值将会被篡改
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2]; // 创建一个大小为 2 的 Cell 数组
rs[h & 1] = new Cell(x); // 找到当前线程 hash 到数组中的位置并创建其对应的 Cell
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
如果上面条件都执行成功就会执行数组的初始化及赋值操作,
-
Cell[] rs = new Cell[2]
表示数组的长度为2; -
rs[h & 1] = new Cell(x)
表示创建一个新的Cell元素,value是x值,默认为1; -
h & 1
类似于我们之前 HashMap 常用到的计算散列桶 index 的算法,通常都是hash & (table.len - 1)
。同hashmap一个意思。
兜底
多个线程尝试 CAS 修改失败的线程会走到这个分支
Striped64.java
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) {
break; // Fall back on using base
}
该分支实现直接操作base基数,将值累加到base上,也即其它线程正在初始化,多个线程正在更新base的值。
Cell[] 数组不在为空且可能存在 Cell[] 数组扩容
多个线程同时命中一个 cell 的竞争
总体代码
Striped64.java
// CASE1:cells 已经被初始化了
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) { // 当前线程的 hash 值运算后映射得到的 Cell 单元为 null,说明该 Cell 没有被使用
if (cellsBusy == 0) { // Try to attach new Cell(Cell[] 数组没有正在扩容)
Cell r = new Cell(x); // Optimistically create(创建一个 Cell 单元)
if (cellsBusy == 0 && casCellsBusy()) { // 双端检锁,尝试加锁,成功后 cellsBusy == 1
boolean created = false;
try { // Recheck under lock(在有锁的情况下再检测一边之前的判断)
Cell[] rs; int m, j; // 将 Cell 单元赋到 Cell[] 数组上
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail(wasUncontended 标识前一次 CAS 更新 Cell 单元是否成功)
wasUncontended = true; // Continue after rehash(重新置为 true,后面会重新计算线程的 hash 值)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 尝试 CAS 更新 Cell 单元值
break;
else if (n >= NCPU || cells != as) // 尝试加锁进行扩容
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 扩容后的大小等于当前容量的 2 倍
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 重新计算当前线程的 hash 值
}
CASE1
Striped64.java
if ((a = as[(n - 1) & h]) == null) { // 当前线程的 hash 值运算后映射得到的 Cell 单元为 null,说明该 Cell 没有被使用
if (cellsBusy == 0) { // Try to attach new Cell(Cell[] 数组没有正在扩容)
Cell r = new Cell(x); // Optimistically create(创建一个 Cell 单元)
if (cellsBusy == 0 && casCellsBusy()) { // 双端检锁,尝试加锁,成功后 cellsBusy == 1
boolean created = false;
try { // Recheck under lock(在有锁的情况下再检测一边之前的判断)
Cell[] rs; int m, j; // 将 Cell 单元赋到 Cell[] 数组上
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
上面代码判断当前线程hash后指向的数据位置元素是否为空;如果为空则将Cell数据放入数组中,跳出循环;如果不空则继续循环。
CASE2
Striped64.java
else if (!wasUncontended) { // CAS already known to fail(wasUncontended 标识前一次 CAS 更新 Cell 单元是否成功)
wasUncontended = true; // Continue after rehash(重新置为 true,后面会重新计算线程的 hash 值)
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值
wasUncontended
表示 cells 初始化后,当前线程竞争修改失败 wasUncontended = false
,这里只是重新设置了这个值为 true,紧接着执行 advanceProbe(h)
重新计算当前线程的 hash,重新循环。
CASE3
Striped64.java
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x)))) // 尝试 CAS 更新 Cell 单元值
break;
说明当前线程对应的数组中有了数据,也重置过hash值,这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环。
CASE4
Striped64.java
else if (n >= NCPU || cells != as) { // 尝试加锁进行扩容
collide = false; // At max size or stale
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值
如果 n 大于 CPU 数量,不可扩容,并通过下面的 h = advanceProbe(h)
方法修改线程的 probe
再重新尝试
CASE5
Striped64.java
else if (!collide) {
collide = true;
}
...
h = advanceProbe(h); // 重新计算当前线程的 hash 值
- 如果扩容意向
collide = false
则修改它为 true,然后重新计算当前线程的 hash 值继续循环; - 如果当前数组的长度已经大于 CPU 核数,就会再次设置扩容意向
collide = false
(见上一步)
CASE6
Striped64.java
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 当前的 cells 数组和最先赋值的 as 是同一个,代表没有被其他线程扩容过
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 扩容后的大小等于当前容量的 2 倍
for (int i = 0; i < n; ++i) // 扩容后再将之前的数组的元素拷贝到新数组中
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; // 释放锁设置 cellsBusy = 0,设置扩容状态,然后继续循环执行。
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 重新计算当前线程的 hash 值