JUC
JUC CAS

原子类

java.util.concurrent.atomic

没有 CAS 之前

多线程环境中不使用原子类保证线程安全i++(基本数据类型)

Test.java
class Test {
    private volatile int count = 0;
    //若要线程安全执行执行count++,需要加锁
    public synchronized void increment() {
        count++;
    }
 
    public int getCount() {
        return count;
    }
}

使用 CAS 之后

多线程环境中使用原子类保证线程安全i++(基本数据类型)类似于乐观锁

Test.java
class Test {
    private AtomicInteger count = new AtomicInteger();
 
    public void increment() {
        count.incrementAndGet();
    }
  
     //使用AtomicInteger之后,不需要加锁,也可以实现线程安全。
    public int getCount() {
        return count.get();
    }
}

CAS 概述

CAS(compare and swap),中文翻译为比较并交换,实现并发算法时常用到的一种技术,用于保证共享变量的原子性更新,它包含三个操作数内存位置、预期原值与更新值

执行CAS操作的时候,将内存位置的值与预期原值进行比较:

  • 如果相匹配,那么处理器会自动将该位置更新为新值
  • 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功。

原理

CAS (CompareAndSwap) CAS 有 3 个操作数,位置内存值 V,旧的预期值 A,要修改的更新值 B。当且仅当旧的预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做或重来。

CAS

CASDemo

CASDemo.java
public class CASDemo {
 
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(5);
        System.out.println(atomicInteger.compareAndSet(5, 2024) + "\t" + atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(5, 2024) + "\t" + atomicInteger.get());
    }
}

硬件级别保证

CAS 是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。

CAS 是一条 CPU 的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg。执行 cmpxchg 指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行 cas 操作,也就是说 CAS 的原子性实际上是 CPU 实现的, 其实在这一点上还是有排他锁的,只是比起用 synchronized, 这里的排他时间要短的多, 所以在多线程情况下性能会比较好。

源码分析

compareAndSet()方法的源代码:

Unsafe.java
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
 
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
 
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

上面三个方法都是类似的,主要对4个参数做一下说明。

  • o:表示要操作的对象
  • offset:表示要操作对象中属性地址的偏移量
  • expected:表示需要修改数据的期望的值
  • x:表示需要修改为的新值

CAS 底层原理?对 UnSafe 的理解

Unsafe

AtomicInteger.java
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;
 
    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
 
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
 
    private volatile int value;
    ......
}

Unsafe 是 CAS 的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。

注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务

  • 变量valueOffset,表示该变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的。

    AtomicInteger.java
    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
  • 变量value用volatile修饰,保证了多线程之间的内存可见性。

源码分析

CAS的全称为Compare-And-Swap,它是一条CPU并发原语

它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。

CASDemo.java
atomicInteger.getAndIncrement();
AtomicInteger.java
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
Unsafe.java
/**
 * Atomically adds the given value to the current value of a field
 * or array element within the given object <code>o</code>
 * at the given <code>offset</code>.
 *
 * @param o object/array to update the field/element in
 * @param offset field/element offset
 * @param delta the value to add
 * @return the previous value
 * @since 1.8
 */
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。

示例

根据上一小节的代码分析,我们继续分析 new AtomicInteger.getAndIncrement()。假设线程A和线程B两个线程同时执行getAndAddInt操作(分别跑在不同CPU上):

  • AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。
  • 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。
  • 线程B也通过getIntVolatile(var1, var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B打完收工,一切OK。
  • 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值已经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读取重新来一遍了。
  • 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功。

底层

unsafe.cpp

Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中

unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
// 先想办法拿到变量value在内存中的地址,根据偏移量valueOffset,计算 value 的地址
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 调用 Atomic 中的函数 cmpxchg来进行比较交换,其中参数x是即将更新的值,参数e是原内存的值
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

cmpxchg

unsigned Atomic::cmpxchg(unsigned int exchange_value,volatile unsigned int* dest, unsigned int compare_value) {
    assert(sizeof(unsigned int) == sizeof(jint), "more work to do");
  /*
   * 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载函数*/
    return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);
}
WIN10
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  //判断是否是多核CPU
  int mp = os::is_MP();
  __asm {
    //三个move指令表示的是将后面的值移动到前面的寄存器上
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    //CPU原语级别,CPU触发
    LOCK_IF_MP(mp)
    //比较并交换指令
    //cmpxchg: 即“比较并交换”指令
    //dword: 全称是 double word 表示两个字,一共四个字节
    //ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元 
    //将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值进行对比,
    //如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中
    cmpxchg dword ptr [edx], ecx
  }
}

总结

  • CAS是靠硬件实现的从而在硬件层面提升效率,最底层还是交给硬件来保证原子性和可见性

  • 实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上),使用的是汇编指令cmpxchg指令。

  • 核心思想就是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap)如果不相等自旋再来。

原子引用

我们可以借助于 AtomicReference<V> 自己定义属于自己的原子对象

AtomicReferenceDemo.java
class User {
    String username;
    int age;
 
    public User(String username, int age) {
        this.username = username;
        this.age = age;
    }
 
    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", age=" + age +
                '}';
    }
}
 
public class AtomicReferenceDemo {
    public static void main(String[] args) {
        AtomicReference<User> atomicReference = new AtomicReference<>();
        User user1 = new User("user1", 22);
        User user2 = new User("user2", 25);
 
        atomicReference.set(user1);
        System.out.println(atomicReference.compareAndSet(user1, user2) + "\t" + atomicReference.get().toString());
        System.out.println(atomicReference.compareAndSet(user1, user2) + "\t" + atomicReference.get().toString());
    }
}
// true	User{username='user2', age=25}
// false	User{username='user2', age=25}

⭐️CAS 与自旋锁

概述

自旋锁(spinlock)是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU

SpinLockDemo.java
public class SpinLockDemo {
 
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
 
    public void lock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + "\t invoked lock()");
        while (!atomicReference.compareAndSet(null, thread)) {
 
        }
 
    }
 
    public void unlock() {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread, null);
        System.out.println(Thread.currentThread().getName() + "\t invoked unlock()");
    }
 
    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();
        new Thread(() -> {
            spinLockDemo.lock();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            spinLockDemo.unlock();
        }, "A").start();
 
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        new Thread(() -> {
            spinLockDemo.lock();
            spinLockDemo.unlock();
        }, "B").start();
    }
}

CAS 缺点

循环时间长

我们可以看到 getAndAddInt() 方法执行时,有个 do while,如果 CAS 失败,会一直进行尝试。如果 CAS 长时间一直不成功,可能会给 CPU 带来很大的开销。

Unsafe.java
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

ABA 问题

概述

CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化。

比如说一个线程1从内存位置V中取出A,这时候另一个线程2也从内存中取出A,并且线程2进行了一些操作将值变成了B,然后线程2又将V位置的数据变成A,这时候线程1进行CAS操作发现内存中仍然是A,然后线程1操作成功。尽管线程1的CAS操作成功,但是不代表这个过程就是没有问题的。

版本号时间戳原子引用

  • AtomicStampedReference

  • AtomicStampedReference带戳记流水的简单演示(单线程)

    AtomicStampedReferenceDemo.java
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    class Book {
        private int id;
        private String bookName;
    }
     
    public class AtomicStampedReferenceDemo {
        public static void main(String[] args) {
            Book javaBook = new Book(1, "javaBook");
            AtomicStampedReference<Book> atomicStampedReference = new AtomicStampedReference<>(javaBook, 1);
            System.out.println(atomicStampedReference.getReference() + "\t" + atomicStampedReference.getStamp());
     
            Book mysqlBook = new Book(2, "mysqlBook");
            boolean b;
            b = atomicStampedReference.compareAndSet(javaBook, mysqlBook, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(b + "\t" + atomicStampedReference.getReference() + "\t" + atomicStampedReference.getStamp());
     
            b = atomicStampedReference.compareAndSet(mysqlBook, javaBook, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
            System.out.println(b + "\t" + atomicStampedReference.getReference() + "\t" + atomicStampedReference.getStamp());
        }
    }
    /**
     * Book(id=1, bookName=javaBook)	1
     * true	Book(id=2, bookName=mysqlBook)	2
     * true	Book(id=1, bookName=javaBook)	3
     */
  • 多线程情况下演示AtomicStampedReference解决ABA问题

    ABADemo.java
    public class ABADemo {
        static AtomicInteger atomicInteger = new AtomicInteger(100);
        static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);
     
        public static void main(String[] args) {
    //        abaHappen();//true	2023
            /**
             * t3	首次版本号: 1
             * t4	首次版本号: 1
             * t3	2次版本号: 2
             * t3	3次版本号: 3
             * false	100	3
             */
            abaNoHappen();
     
        }
     
        private static void abaNoHappen() {
            new Thread(() -> {
                int stamp = atomicStampedReference.getStamp();
                System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号: " + stamp);
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + "\t" + "2次版本号: " + atomicStampedReference.getStamp());
                atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
                System.out.println(Thread.currentThread().getName() + "\t" + "3次版本号: " + atomicStampedReference.getStamp());
            }, "t3").start();
     
     
            new Thread(() -> {
                int stamp = atomicStampedReference.getStamp();
                System.out.println(Thread.currentThread().getName() + "\t" + "首次版本号: " + stamp);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                boolean b = atomicStampedReference.compareAndSet(100, 200, stamp, stamp + 1);
                System.out.println(b + "\t" + atomicStampedReference.getReference() + "\t" + atomicStampedReference.getStamp());
            }, "t4").start();
        }
     
        private static void abaHappen() {
            new Thread(() -> {
                atomicInteger.compareAndSet(100, 101);
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                atomicInteger.compareAndSet(101, 100);
            }, "t1").start();
     
     
            new Thread(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(atomicInteger.compareAndSet(100, 2023) + "\t" + atomicInteger.get());//true	2023
            }, "t2").start();
        }
    }