JUC
JUC 原子操作类

阿里巴巴手册

【参考】volatile 解决多线程内存不可见问题对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
说明:如果是 count++ 操作,使用如下类实现:
AtomicInteger count = new AtomicInteger();
count.addAndGet(1);
如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数)。

基本类型原子类

  • AtomicInteger(整型原子类)
  • AtomicBoolean(布尔型原子类)
  • AtomicLong(长整型原子类)

常用 API

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)

代码示例

AtomicIntegerDemo.java
class MyNumber {
    AtomicInteger atomicInteger = new AtomicInteger();
 
    public void addPlusPlus() {
        atomicInteger.getAndIncrement();
    }
 
}
 
public class AtomicIntegerDemo {
 
    public static final int SIZE = 50;
 
    public static void main(String[] args) throws InterruptedException {
        MyNumber myNumber = new MyNumber();
        CountDownLatch countDownLatch = new CountDownLatch(SIZE);
        for (int i = 1; i <= SIZE; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 10; j++) {
                        myNumber.addPlusPlus();
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
 
        }
        countDownLatch.await();
 
        System.out.println(Thread.currentThread().getName() + "\t" + "result: " + myNumber.atomicInteger.get());//main	result: 500
    }
}

数字类型原子类

  • AtomicIntegerArray(整型数组原子类)
  • AtomicLongrArray(长整型数组原子类)
  • AtomicLongrArray(用类型数组原子类)

常用 API

public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

代码示例

AtomicIntegerArrayDemo.java
public class AtomicIntegerArrayDemo {
    public static void main(String[] args) {
//        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5});
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            System.out.println(atomicIntegerArray.get(i));
        }
        System.out.println();
        int tempInt = 0;
        tempInt = atomicIntegerArray.getAndSet(0, 1122);
        System.out.println(tempInt + "\t" + atomicIntegerArray.get(0));
        tempInt = atomicIntegerArray.getAndIncrement(0);
        System.out.println(tempInt + "\t" + atomicIntegerArray.get(0));
    }
}

引用类型原子类

AtomicReference

自旋锁,具体代码示例查看上一章节内容

AtomicStampedReference

携带版本号的引用类型原子类,可以解决 ABA 问题;主要解决修改过几次问题;属于状态戳原子引用。代码示例查看上一章节 ABA 问题

AtomicMarkableReference

原子更新带有标记位的引用类型对象;主要解决是否修改过问题,定义是将状态戳简化为 true 或者 false;属于状态戳(true|false)原子引用。

代码示例

AtomicMarkableReferenceDemo.java
public class AtomicMarkableReferenceDemo {
    static AtomicMarkableReference markableReference = new AtomicMarkableReference(100, false);
 
    public static void main(String[] args) {
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "默认标识: " + marked);//t1	默认标识: false
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            markableReference.compareAndSet(100, 1000, marked, !marked);//t2	默认标识: false
 
        }, "t1").start();
 
        new Thread(() -> {
            boolean marked = markableReference.isMarked();
            System.out.println(Thread.currentThread().getName() + "\t" + "默认标识: " + marked);//t2	t2线程CASResult:false
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean b = markableReference.compareAndSet(100, 2000, marked, !marked);
            System.out.println(Thread.currentThread().getName() + "\t" + "t2线程CASResult:" + b);
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked());//t2	true
            System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference());//t2	1000
 
        }, "t2").start();
    }
}

对象的属性修改原子类

  • AtomicIntegerFieldUpdater(原子更新对象中 volatile int 类型字段的值)
  • AtomicLongFieldUpdater(原子更新对象中 volatile long 类型字段的值)
  • AtomicReferenceFieldUpdater(原子更新引用类型字段的值)

使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段。(减少使用 synchronized,但又能保证线程安全)

使用要求

  • 更新的对象属性必须使用 public volatile 修饰符。
  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须 使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性。

代码示例

AtomicIntegerFieldUpdaterDemo.java
class BankAccount {
    public volatile int money = 0;
 
    AtomicIntegerFieldUpdater<BankAccount> atomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");
 
    public void transferMoney(BankAccount bankAccount) {
        atomicIntegerFieldUpdater.getAndIncrement(bankAccount);
    }
}
 
public class AtomicIntegerFieldUpdaterDemo {
    public static void main(String[] args) throws InterruptedException {
        BankAccount bankAccount = new BankAccount();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 1000; j++) {
                        bankAccount.transferMoney(bankAccount);
                    }
                } finally {
                    countDownLatch.countDown();
                }
            }, String.valueOf(i)).start();
 
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + '\t' + "result: " + bankAccount.money); //main	result: 10000
    }
}
AtomicReferenceFieldUpdater.java
/**
 * 需求:多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作
 * 要求只能被初始化一次,只有一个线程操作成功
 */
class MyVar {
    public volatile Boolean isInit = Boolean.FALSE;
    AtomicReferenceFieldUpdater<MyVar, Boolean> referenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");
 
    public void init(MyVar myVar) {
        if (referenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
            System.out.println(Thread.currentThread().getName() + "\t" + "--------------start init ,need 2 secondes");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "\t" + "--------------over init");
        } else {
            System.out.println(Thread.currentThread().getName() + "\t" + "--------------已经有线程进行初始化工作了。。。。。");
        }
    }
}
 
public class AtomicReferenceFieldUpdaterDemo {
 
    public static void main(String[] args) {
        MyVar myVar = new MyVar();
        for (int i = 1; i <= 5; i++) {
            new Thread(() -> {
                myVar.init(myVar);
            }, String.valueOf(i)).start();
        }
    }
}
/**
 * 1	--------------start init ,need 2 secondes
 * 5	--------------已经有线程进行初始化工作了。。。。。
 * 2	--------------已经有线程进行初始化工作了。。。。。
 * 4	--------------已经有线程进行初始化工作了。。。。。
 * 3	--------------已经有线程进行初始化工作了。。。。。
 * 1	--------------over init
 */

原子操作增强类原理深度解析

  • DoubleAccumulator(一个或多个变量,它们一起保持运行double使用所提供的功能更新值)
  • DoubleAdder(一个或多个变量一起保持初始为零double总和)
  • LongAccumulator(一个或多个变量,一起保持使用提供的功能更新运行的值long ,提供了自定义的函数操作)
  • LongAdder(一个或多个变量一起维持初始为零long总和(重点),只能用来计算加法,且从0开始计算)

面试题

  1. 热点商品点赞计算器,点赞数加加统计,不要求实时精确
  2. 一个很大的list,里面都是int类型,如何实现加加,思路?

案例:点赞计数器

LongAdder 常用 API

void add(long x); // 将当前的 value 加 x
void increment(); // 将当前的 value 加 1
void decrement(); // 将当前的 value 减 1
long sum(); // ⭐️返回当前值。特别注意,在没有并发更新 value 的情况下,sum 会返回一个精确值,在存在并发的情况下,sum 不保证返回精确值。
void reset(); // 将 value 重置为 0,可用于替代重新 new 一个 LongAdder,但此方法只可以在没有并发更新的情况下使用
long sumThenReset(); // 获取当前 value,并将 value 重置为 0

入门案例

LongAdder 只能用来计算加法,且从零开始计算

LongAdderAPIDemo.java
public class LongAdderAPIDemo {
    public static void main(String[] args) {
        LongAdder longAdder = new LongAdder();
 
        longAdder.increment();
        longAdder.increment();
        longAdder.increment();
 
        System.out.println(longAdder.longValue());
 
        LongAccumulator longAccumulator = new LongAccumulator((x,y) -> x * y,2);
 
        longAccumulator.accumulate(1);
        longAccumulator.accumulate(2);
        longAccumulator.accumulate(3);
 
        System.out.println(longAccumulator.longValue());
    }
}

LongAccumulator 需要提供一个 long 类型的聚合器,需要传入一个 long 类型的二元操作,可以用来计算各种聚合操作,包括加乘等

LongAccumulatorDemo.java
public class LongAccumulatorDemo {
 
    LongAdder longAdder = new LongAdder();
    public void add_LongAdder() {
        longAdder.increment();
    }
 
    //LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y,0);
    LongAccumulator longAccumulator = new LongAccumulator(new LongBinaryOperator() {
        @Override
        public long applyAsLong(long left, long right) {
            return left - right;
        }
    },777);
 
    public void add_LongAccumulator() {
        longAccumulator.accumulate(1);
    }
 
    public static void main(String[] args) {
        LongAccumulatorDemo demo = new LongAccumulatorDemo();
 
        demo.add_LongAccumulator();
        demo.add_LongAccumulator();
        System.out.println(demo.longAccumulator.longValue());
    }
}

LongAdder 高性能对比案例

AccumulatorCompareDemo.java
class ClickNumber {
    int number = 0;
 
    public synchronized void clickBySynchronized() {
        number++;
    }
 
    AtomicLong atomicLong = new AtomicLong(0);
    public void clickByAtomicLong() {
        atomicLong.getAndIncrement();
    }
 
    LongAdder longAdder = new LongAdder();
    public void clickByLongAdder() {
        longAdder.increment();
    }
 
    LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);
    public void clickByLongAccumulator() {
        longAccumulator.accumulate(1);
    }
}
 
public class AccumulatorCompareDemo {
    public static final int _1W = 10000;
    public static final int THREAD_NUMBER = 50;
 
    public static void main(String[] args) throws InterruptedException {
        ClickNumber clickNumber = new ClickNumber();
        long StartTime;
        long endTime;
        CountDownLatch countDownLatch1 = new CountDownLatch(THREAD_NUMBER);
        CountDownLatch countDownLatch2 = new CountDownLatch(THREAD_NUMBER);
        CountDownLatch countDownLatch3 = new CountDownLatch(THREAD_NUMBER);
        CountDownLatch countDownLatch4 = new CountDownLatch(THREAD_NUMBER);
 
        StartTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.clickBySynchronized();
                    }
                } finally {
                    countDownLatch1.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch1.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime: " + (endTime - StartTime) + " 毫秒" + "\t clickBySynchronized: " + clickNumber.number);
 
        StartTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.clickByAtomicLong();
                    }
                } finally {
                    countDownLatch2.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch2.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime: " + (endTime - StartTime) + " 毫秒" + "\t clickByAtomicLong: " + clickNumber.atomicLong.get());
 
        StartTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.clickByLongAdder();
                    }
                } finally {
                    countDownLatch3.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch3.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime: " + (endTime - StartTime) + " 毫秒" + "\t clickByLongAdder: " + clickNumber.longAdder.sum());
 
        StartTime = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            new Thread(() -> {
                try {
                    for (int j = 1; j <= 100 * _1W; j++) {
                        clickNumber.clickByLongAccumulator();
                    }
                } finally {
                    countDownLatch4.countDown();
                }
            }, String.valueOf(i)).start();
        }
        countDownLatch4.await();
        endTime = System.currentTimeMillis();
        System.out.println("costTime: " + (endTime - StartTime) + " 毫秒" + "\t clickByLongAccumulator: " + clickNumber.longAccumulator.get());
 
    }
}
/**
 * costTime: 1313 毫秒	 clickBySynchronized: 50000000
 * costTime: 825 毫秒	 clickByAtomicLong: 50000000
 * costTime: 92 毫秒	 clickByLongAdder: 50000000
 * costTime: 61 毫秒	 clickByLongAccumulator: 50000000
 */

源码分析

类结构图

原理(LongAdder 为什么这么快)

LongAdder 是 Striped64 的子类

Striped64
Striped64 重要的成员函数(高亮表示更重要)
/** Number of CPUS, to place bound on table size        CPU数量,即cells数组的最大长度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
 
/**
 * Table of cells. When non-null, size is a power of 2.
 
cells数组,为2的幂,2,4,8,16.....,方便以后位运算
 */
transient volatile Cell[] cells;
 
/**基础value值,当并发较低时,只累加该值主要用于没有竞争的情况,通过CAS更新。
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;
 
/**创建或者扩容Cells数组时使用的自旋锁变量调整单元格大小(扩容),创建单元格时使用的锁。
 * Spinlock (locked via CAS) used when resizing and/or creating Cells. 
 */
transient volatile int cellsBusy;
Striped64 中变量或方法定义
  • base:类似于 AtomicLong 中全局的 value 值。在没有竞争情况下数据直接累加到 base 上,或者 cells 扩容时,也需要将数据写入到 base 上
  • collide:表示扩容意向,false 一定不会扩容,true 可能会扩容。
  • cellsBusy:初始化 cells 或者扩容 cells 需要获取锁,0:表示无锁状态;1:表示其他线程已经持有了锁
  • casCellsBusy():通过 CAS 操作修改 cellsBusy 的值,CAS 成功代表获取锁,返回 true
  • NCPU:前计算机 CPU 数量,Cell 数组扩容时会使用到
  • getProbe():获取当前线程的 hash 值
  • advanceProbe():重置当前线程的 hash 值
Cell

java.util.concurrent.atomicStriped64 的一个内部类

Striped64.java
/**
 * Padded variant of AtomicLong supporting only raw accesses plus CAS.
 *
 * JVM intrinsics note: It would be possible to use a release-only
 * form of CAS here, if it were provided.
 */
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
 
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
LongAdder 为什么这么快

LongAdder 的基本思路就是分散热点,将 value 值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。sum() 会将所有 Cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点

数学表达

LongAdder 内部有一个 base 变量,一个 Cell[] 数组。

  • base 变量:非竞态条件下(低并发),直接累加到该变量上
  • Cell[] 数组:竞态条件下(高并发),累加个各个线程自己的槽Cell[i]中

Value=Base+i=0nCell[i]Value = Base + \sum_{i=0}^{n} Cell[i]

源码解读深度分析

总结

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则是采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果。

LongAdder-单/多线程竞争

数学表达式:Value=Base+i=0nCell[i]Value = Base + \sum_{i=0}^{n} Cell[i]

longAdder.increment()
add(1L)
longAccumulate
sum()

使用总结

  • AtomicLong
    • 线程安全,可允许一些性能损耗,要求高精度时可使用
    • 保证精度,性能代价
    • AtomicLong 是多个线程针对单个热点值value进行原子操作
  • LongAdder
    • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
    • 保证性能,精度代价
    • LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作

总结

  • AtomicLong

    • 原理
      • CAS+自旋
      • incrementAndGet
    • 场景
      • 低并发下的全局计算
      • AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全性的问题。
    • 缺陷
      • 高并发后性能急剧下降
      • AtomicLong的自旋会成为瓶颈:N个线程CAS操作修改线程的值,每次只有一个成功过,其它N - 1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。
  • LongAdder vs AtomicLong Performance

    http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/ (opens in a new tab)

  • LongAdder

    • 原理
      • CAS+Base+Cell数组分散
      • 空间换时间并分散了热点数据
    • 场景:高并发下的全局计算
    • 缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确