JUC
JUC AQS

面试题

  • reentrantlock 实现原理,简单说下 aqs

前置知识

  • 公平锁和非公平锁

    • 公平锁:锁被释放以后,先申请的线程先得到锁。性能较差一些,因为公平锁为了保证时间上的绝对顺序,上下文切换更频繁

    • 非公平锁:锁被释放以后,后申请的线程可能会先获取到锁,是随机或者按照其他优先级排序的。性能更好,但可能会导致某些线程永远无法获取到锁

  • 可重入锁

    也叫做递归锁,指的是线程可以再次获取自己的内部锁,比如一个线程获取到了对象锁,此时这个对象锁还没有释放,当其想再次获取这个对象锁的时候还是可以获取的,如果不可重入的话,会导致死锁。

  • 自旋思想

    当线程请求锁时,如果锁已经被其他线程持有,那么该线程会不断地重试获取锁,而不是被挂起等待,这种不断尝试获取锁的行为称为自旋

  • LockSupport

    • 一个工具类,用于线程的阻塞和唤醒操作,类似于 wait() 和 notify() 方法,但是更加灵活和可控

    • 提供了 park() 和 unpark() 两个静态方法用于线程阻塞和唤醒操作。

    • 优点在于可以在任意时刻阻塞和唤醒线程而不需要事先获取锁或监视器对象。

  • 数据结构之双向链表

    双向链表(Doubly Linked List)是一种常见的数据结构,它是由一系列结点(Node)组成的,每个结点包含三个部分:数据域、前驱指针和后继指针。其中,数据域存储结点的数据,前驱指针指向前一个结点,后继指针指向后一个结点。通过这种方式,双向链表可以实现双向遍历和插入、删除操作。

  • 设计模式之模板设计模式

    • 模板设计模式是一种行为型设计模式,定义了一种算法的框架,并将某些步骤延迟到子类中事先,这种设计模式的主要目的是允许子类在不改变算法结构的情况下重新定义算法中的某些步骤。

    • 优点是能够提高代码复用性和可维护性。

AQS 理论知识

概述

字面意思

抽象的队列同步器,源码地址 java.util.concurrent.locks.AbstractQueuedSynchronizer,简称为 AQS。

技术解释

是用来构建锁或者其他同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个 JUC 体系的基石,主要用于解决锁分配给“谁”的问题;整体就是一个抽象的 FIFO 队列来完成资源获取线程的排队工作,并通过一个 int 类变量表示持有锁的状态

CLH-队列

CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列 FIFO

AQS 为什么是 JUC 内容中最重要的基石

AQS 相关类

AQS-框架

ReentrantLock.java
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
 
    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
 
        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
        // ...
    }
    // ...
}
CountDownLatch.java
public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
 
        Sync(int count) {
            setState(count);
        }
 
        int getCount() {
            return getState();
        }
 
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        // ...
    }
    // ...
}
ReentrantReadWriteLock.java
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;
 
    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * default (nonfair) ordering properties.
     */
    public ReentrantReadWriteLock() {
        this(false);
    }
 
    /**
     * Creates a new {@code ReentrantReadWriteLock} with
     * the given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
 
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
 
    /**
     * Synchronization implementation for ReentrantReadWriteLock.
     * Subclassed into fair and nonfair versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
        // ...
    }
    // ...
}
Semaphore.java
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;
 
    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        // ...
    }
    // ...
}

锁和同步器的关系

  • 锁,面向锁的使用者:定义了程序员和锁交互的使用层 API,隐藏了实现细节,你调用即可。
  • 同步器,面向锁的实现者:比如 Java 并发大神 DougLee 提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。

作用

加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要队列。

抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 同步队列的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过 CAS、自旋以及 LockSupport.park() 的方式,维护 state 变量的状态,使并发达到同步的效果。

CLH-队列

AQS 初识

AQS 概述

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.

—— 摘自 java.util.concurrent.locks.AbstractQueuedSynchronizer.java

提供了一个框架,用于实现基于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量、事件等)。这个类旨在作为大多数依赖单个原子整数值来表示状态的同步器的有用基础。子类必须定义改变这种状态的受保护方法,以及定义这个状态在对象被获取或释放方面的含义。鉴于这些,本类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但只有使用getState、setState和compareAndSetState方法操作的原子更新的int值与同步有关

有阻塞就需要排队,实现排队必然需要队列。

AQS 使用一个 volatile 的 int 类型的成员变量来表示同步状态,通过内置的 FIFO 队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,通过 CAS 完成对 state 值的修改。

AbstractQueuedSynchronizer.java
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
 
    private static final long serialVersionUID = 7373984972572414691L;
    protected AbstractQueuedSynchronizer() { }
    
    static final class Node {
        // ...
    }
 
    private transient volatile Node head;
 
    private transient volatile Node tail;
 
    // 同步状态的标识
    private volatile int state;
 
    protected final int getState() {
        return state;
    }
    // ...
}

AQS 内部体系架构

AQS-内部体系架构图

AQS 自身

AQS 的 int 变量

AQS 的同步状态 state 成员变量 private volatile int state

  • state = 0,无占用;
  • state >= 1,占用,需要等待。
AQS 的 CLH 队列

CLH-队列

CLH 队列(三个大牛的名字组成),为一个双向队列

总结
  • 有阻塞就需要排队,实现排队必然需要队列
  • state 变量 + CLH 双端队列

AQS 内部类 Node

Node 的 int 变量

Node 的等待状态 waitState 成员变量 volatile int waitStatus

等待队列中其他线程的等待状态,队列中每个排队的个体就是一个 Node。

Node 类讲解
  • 内部结构

    AbstractQueuedSynchronizer
    static final class Node {
     
        // 共享
        static final Node SHARED = new Node();
     
        // 独占
        static final Node EXCLUSIVE = null;
     
        // 线程被取消了
        static final int CANCELLED =  1;
     
        // 后继线程需要唤醒
        static final int SIGNAL    = -1;
     
        // 等待 condition 唤醒
        static final int CONDITION = -2;
     
        // 共享式同步状态获取将会无条件地传播下去
        static final int PROPAGATE = -3;
     
        // 初始为 0,状态是上面的几种
        volatile int waitStatus;
     
        // 前置节点
        volatile Node prev;
     
        // 后继节点
        volatile Node next;
     
        volatile Thread thread;
     
     
        Node nextWaiter;
     
     
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
     
     
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
     
        Node() {    // Used to establish initial head or SHARED marker
        }
     
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
     
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
  • 属性说明

AQS 同步队列的基本结构

CLH-队列2

CLH:Craig、Landin and Hagersten 队列,是个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)

深入剖析 AQS

Lock 接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的

从最简单的 lock 方法看公平锁和非公平锁

ReentrantLock.java
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {...}
    
    static final class NonfairSync extends Sync {...}
    
    static final class FairSync extends Sync {...}
    
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
}
public ReentrantLock() {sync = new NonfairSync();}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
// 创建的是公平锁
private ReentrantLock lock = new ReentrantLock(true);
 
// 创建的是非公平锁
private ReentrantLock lock = new ReentrantLock(false);
 
// 默认创建非公平锁
private ReentrantLock lock = new ReentrantLock();

继续看公平锁和非公平锁的 lock() 代码

公平锁和非公平锁 lock() 的 acquire() 比较

可以明显看出公平锁与非公平锁的 lock() 方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()hasQueuedPredecessors() 是公平锁加锁时判断等待队列中是否存在有效节点的方法。
hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

从非公平锁的 lock() 方法入手

对比公平锁和非公平锁的 tryAcquire() 方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 !hasQueuedPredecessors()hasQueuedPredecessors() 中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

  • 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中
  • 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在 unpark(),之后还是需要竞争锁(存在线程竞争的情况下)

lock()

源码解读

lock()
lock()
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
 
    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    @ReservedStackAccess
    final void lock() {
        // 第一个线程抢占
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 第二个线程以及后续线程抢占
            acquire(1);
    }
 
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
acquire()
acquire()
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

AQS acquire 方法的三条主要流程

tryAcquire(arg)

以非公平锁为例,tryAcquire(arg) 会调用 nonfairTryAcquire(acquires) 方法

tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
nonfairTryAcquire(arg)
final boolean nonfairTryAcquire(int acquires) {
    // 获取当前尝试获取锁的线程实例
    final Thread current = Thread.currentThread();
    // 获取当前锁的状态,通常状态0表示锁是空闲的,任何正数值表示锁被持有(在可重入锁的情况下,这个数值表示锁的持有次数)
    int c = getState();
    // 如果当前锁是空闲的(即状态为0)
    if (c == 0) {
        // 尝试通过CAS来设置锁的状态,如果设置成功,意味着当前线程成功获取了锁
        if (compareAndSetState(0, acquires)) {
            // 将当前线程设置为该锁的独占所有者
            setExclusiveOwnerThread(current);
            // 返回true表示获取锁成功。
            return true;
        }
    }
    // 如果当前锁不是空闲的,但当前线程是锁的独占所有者(即可重入的情况),则尝试增加锁的重入次数。
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 设置新的锁状态
        setState(nextc);
        return true;
    }
    // 如果上述条件都不满足,即当前线程既不是尝试获取一个空闲的锁,也不是尝试增加已经由自己持有的锁的重入次数,那么方法返回false,表示获取锁失败。
    return false;
}
addWaiter(Node.EXCLUSIVE)
addWaiter(Node mode)
private Node addWaiter(Node mode) {
    // 创建一个新的节点node,这个节点包含了当前线程和传入的模式(mode),模式可能指示了节点是独占模式还是共享模式。
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 如果当前的尾节点(pred,即前驱节点)不为空
    if (pred != null) {
        // 新节点的前驱就设置为这个尾节点,并尝试使用compareAndSetTail原子操作更新尾节点为新节点。如果这个原子操作成功,意味着新节点已经被成功地添加到队列的末尾,此时将前驱节点的下一个节点指向新节点,然后返回新节点。
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 调用入队方法
    enq(node);
    return node;
}
enq(final Node node)
private Node enq(final Node node) {
    for (;;) {
        // 获取当前尾节点
        Node t = tail;
        // 判断队列是否为空
        if (t == null) { // Must initialize
            // 尝试通过compareAndSetHead原子操作创建一个新的空节点作为头节点(head)。如果成功,同时将tail也指向这个新的头节点。这个步骤是初始化同步队列的过程。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 设置新节点的prev字段为当前的尾节点t,这样就建立了新节点向前的链接。
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                // 将原尾节点t的next字段指向新节点node,这样就完成了新节点的插入,并建立了从旧尾节点到新尾节点的链接。
                t.next = node;
                return t;
            }
        }
    }
}

在双向链表种,第一个节点为头节点,其实不存储任何信息,知识占位。真正的第一个由数据的节点,是从第二个节点开始的。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 拿到node的前驱节点
            final Node p = node.predecessor();
            // node的前驱节点是头节点并且tryAcquire尝试抢锁成功
            if (p == head && tryAcquire(arg)) {
                // 将当前节点设置为头节点,这标志着当前线程已成功获取资源。
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 检查是否应该挂起
            // 如果当前线程的资源获取尝试失败,并且根据shouldParkAfterFailedAcquire(p, node)的结果决定当前线程应该被挂起等待,则调用parkAndCheckInterrupt()挂起当前线程。如果在挂起过程中线程被中断,则interrupted被设置为true。
            // 如果前驱节点的 waitStatis 是SIGNAL状态,即shouldParkAfterFailedAcquire方法会返回 true,程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 取消这次资源获取尝试
            cancelAcquire(node);
    }
}
shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取当前节点的前驱节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 这表示前驱节点已经处于等待唤醒(通知)后继节点的状态。在这种情况下,当前节点可以安全地挂起,因为它知道当前驱节点释放资源或被唤醒时,它将收到通知。
        return true;
    if (ws > 0) {
        // 这表示前驱节点被取消。在AQS框架中,取消的节点是不会参与到资源获取的竞争中,也不会转发唤醒信号。因此,如果前驱节点被取消,当前节点需要跳过被取消的节点,向前回溯找到一个未被取消的节点作为新的前驱节点。这个过程一直持续,直到找到一个等待状态不大于0的节点。然后,更新当前节点的前驱节点,以及新前驱节点的后继节点。这种情况下,方法返回false,因为需要重新尝试获取资源而不是直接挂起。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前驱节点的等待状态既不是SIGNAL也不是被取消(即等待状态为0或PROPAGATE),这表示需要设置前驱节点的等待状态为SIGNAL,让它知道在释放资源或者被唤醒时需要通知后继节点。这是通过compareAndSetWaitStatus原子操作完成的。在这种情况下,方法同样返回false,表示当前节点不应该立即挂起,而是应该再次尝试获取资源,如果失败,那么在下一轮迭代中可能会决定挂起。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
parkAndCheckInterrupt()
// parkAndCheckInterrupt方法实现了在等待某个条件满足时挂起线程的功能,并在线程被唤醒时检查是否是因为中断而唤醒。如果线程确实因为中断被唤醒,方法返回true,允许调用者处理中断情况。
private final boolean parkAndCheckInterrupt() {
    // 这一行调用LockSupport类的park方法来挂起当前线程。LockSupport是Java并发包中的一个工具类,提供了基本的线程同步机制。park方法可以阻塞当前线程直到它被unpark调用唤醒,或者被中断。
    LockSupport.park(this);
    // 根据park方法API描述,程序在下述三种情况下会继续向下执行
    // 1.被unpark
    // 2.被中断(interrupt)
    // 3.其他不合逻辑的返回才会继续向下执行
    
    // 检查当前线程是否被中断,并且清除当前线程的中断状态。
    // 如果被中断,返回true
    return Thread.interrupted();
}
unlock
unlock()
public void unlock() {
    sync.release(1);
}
release(int arg)
public final boolean release(int arg) {
    // 调用抽象方法tryRelease尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        // 如果头节点存在且头节点的等待状态(waitStatus)不为0(即,不是初始状态),这表示队列中可能有因为等待资源而被挂起的线程。
        if (h != null && h.waitStatus != 0)
            // 调用unparkSuccessor(h)方法来唤醒头节点的后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
unparkSuccessor(Node node)
// 该方法的目的是唤醒在同步队列中等待的后继线程。
private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    // 检查传入的节点(通常是同步队列的头节点)的等待状态(waitStatus)。如果状态是负数,这表示节点可能在等待一个信号(唤醒)。方法尝试将节点的等待状态更新为0,以预期发送信号。
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
 
    // 找到当前线程的后继节点
    Node s = node.next;
    // 判断后继节点不存在(null)或者已被取消(waitStatus大于0)
    if (s == null || s.waitStatus > 0) {
        // 从同步队列的尾部开始向前遍历,寻找第一个未被取消的后继节点(即,waitStatus小于或等于0的节点)。
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒该节点代表的线程
        LockSupport.unpark(s.thread);
}

总结

https://www.processon.com/view/5e29b0e8e4b04579e40c15a7 (opens in a new tab)