多线程-AQS源码分析

前言

之前我们已经对JUC中的各种同步器有所了解,并且看到了ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore这些同步器内部都是通过AQS(Abstract Queued Synchronizer)来实现的。

我们就来看看AQS的源码。

AQS概览

在分析AQS的源码之前,我们先从整体脉络上来理解AQS的设计思路。

AQS功能

ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore这些同步器都是基于AQS实现的,它们的功能都是实现多线程间的协调同步,因此,AQS便是将多线程同步中的共性逻辑抽象了出来。

首先,由JMM我们知道,多线程之间同步肯定少不了通过共享变量来实现信息在多个线程之间的传递;

其次,多线程编程中往往涉及到等待,比如等待其他线程的结果、等待锁等等(最终归根结底是等待其他线程对共享变量的修改),而等待就涉及到线程的挂起和唤醒。

基于此,AQS中有一个共享变量用于多线程间传递信息,并将各个线程对共享变量的修改抽象为了两个行为:

  • 获取资源(acquire):获取资源的结果直接关系到本线程的执行,如果成功,线程便可继续向下执行,否则,线程被挂起。
  • 释放资源(release):释放资源的行为不影响本线程的执行,但是如果归还成功,会尝试唤醒之前因为获取资源失败而挂起的其他线程,让其继续尝试获取资源。

至于 acquire 和 release 到底是对共享变量的什么操作(加或减,加1或加2)、acquire 和 release 行为结果的判定,就由继承了AQS的子类自己决定了。

此外,acquire 和 release 操作又分为了几种类型:

  • 根据是否同时只能有一个线程acquire成功:独占的 或 共享的
  • 根据挂起后是否可被interrupt唤醒:可被打断的 和 不可被打断的
  • 根据挂起是否有超时时间:限时等待的 和 无限等待的

ReentrantLock采用的是独占模式,CountDownLatch和Semaphore采用的是共享模式,ReentrantReadWriteLock中,读锁采用共享模式,写锁采用独占模式。

AQS数据结构

AQS的主要成员变量就三个:

1
2
3
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

其中 state 被称为同步状态共享资源,而 headtail 分别是同步队列的头、尾节点。

而通过Node内部类的数据结构:

1
2
3
4
5
6
7
8
9
static final class Node {
...
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
}

我们可以看到,这个同步队列是个双向链表,且节点中还存储了线程实例。

也就是说,AQS的核心数据结构就是一个volatile共享变量和一个同步队列。

  • 共享变量如之前所述,用于线程间传递信息。
  • 同步队列则用于存储所有需要同步的线程,通过队列来管理线程的挂起和唤醒。

以ReentrantLock为引分析AQS的独占同步

acqurie 操作

首先我们以ReentrantLock的非公平加锁lock()为源头,来追踪 acquire 操作的调用链。

ReentrantLock类的lock():

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ReentrantLock implements Lock, java.io.Serializable {
...
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
...
public void lock() {
sync.lock();
}
...
}

lock() 中调用了 synclock() 方法,其中 syncReentrantLock 的成员变量,类型是 Sync,而 SyncReentrantLock 的内部类,继承自 AQS

1
2
3
4
5
6
7
/* Performs lock.  Try immediate barge, backing up to normal acquire on failure. */
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

synclock() 中,首先立刻尝试通过CAS操作将AQS中的state变量设为1,如果成功了,就将锁的持有者设置为本线程,也就是拿到锁了。

如果CAS失败了,表示state中的当前值不为0,那么有两种可能,一种是锁被其他线程持有,另一种是锁被本线程持有,因此,还需要继续往下执行 acquire() 方法。

acquire() 方法在 AQS 类中:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

if 判断中,首先执行 tryAcquire() 方法, 在AQS中,tryAcquire() 是个抽象方法,需要其子类自己实现,这就又回到了 ReentrantLock 的内部类 Sync 中:

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

其中逻辑也很简单,如果 state 为0,表示锁是空闲的,尝试通过 CAS 操作将 state 设为1,若CAS操作成功,则成功获取锁,返回 true,否则返回 false

如果 state 不为0,再判断锁的持有者是否是当前线程,如果是,那么可重入,成功获取锁,返回 true,否则的话,获取锁失败,返回 false

我们再回过头来看acquire():

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

如果成功获取锁,返回的是ture,那么就直接跳出 if 判断,从acquire()方法中返回了,接着从 sync.lock 中返回,ReentrantLocklock 方法就执行完了,成功获取到了锁,线程可以继续往下运行。

我们重点分析下 tryAcquire() 返回 false 的情况。

如果返回 false, 会继续执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

其中 addWaiter() 是向队列中添加节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 首先直接尝试通过CAS操作将新节点插入到队尾,失败了的话,再调用enq()方法入队
// 插入失败有两种可能:1. 队列尚未初始化 2. CAS失败,那就意味着有其他线程竞争
Node pred = tail;
if (pred != null) { // 判断队列头尾节点是否已经初始化完成
node.prev = pred; // ①
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
// 死循环,直到新节点成功入队
for (;;) {
Node t = tail;
if (t == null) { // 如果没有初始化,首先初始化头尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else { // 尝试将新节点入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

addWaiter() 方法中,首先尝试直接用CAS操作将新节点快速插入队尾,如果插入失败,就调用 enq() 方法,enq() 方法中通过循环CAS的方式保证插入成功才返回。

有两种可能会导入快速插入失败:

  1. 队列未初始化。队列的headtail节点都是懒初始化的,也就是需要的时候才初始化。if (pred != null) 实际上是在判断队列是否已经初始化完成。初始化的过程在 enq() 方法中。
  2. 有线程竞争。

通过enq()中对队列的初始化过程,不难发现,队列的头结点实际上是个空节点,真正的有意义的节点从第二个开始。

还有一个细节①:将新节点插入队尾的时候,先将新节点的前序节点指向了tail,之后再通过CAS操作设置tail节点的值。细节我们最后再讨论,先梳理整体脉络。

入队后,就进入 acquireQueued 方法了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 如果排在第一个节点,尝试acquire
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先看最外层的try{...}finally{...},如果发生了异常,就取消acquire动作,实际上就是把该线程的节点从队列中删除,然后返回。

方法的主要逻辑是个 for (;;) 死循环,循环中的步骤:

  1. if (p == head && tryAcquire(arg)) p是当前节点的前序节点,我们之前分析过,head是一个空节点,那么如果p是head,就表示当前节点是排在队列最前面的节点,根据 && 的熔断原理,只有p == head条件满足,才会执行 tryAcquire 方法,尝试 acquire 操作,如果成功,那么本节点出队,从该方法返回,接着依次从 acquire() 方法、sync.lock() 方法、lock()方法返回,也就是获取锁成功,线程继续往下运行了。

  2. 如果不是排在第一个的节点,或者尝试 acquire 失败了,就需要执行 shouldParkAfterFailedAcquire 方法判断是否需要将线程挂起。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 前序节点中的ws存储了本线程是否需要唤醒
    if (ws == Node.SIGNAL) // 如果需要唤醒,直接返回ture,接下来就会挂起本线程
    return true;
    if (ws > 0) { // 如果上一个节点已经无效了,删除上一个节点
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else { // 如果前序节点中的ws是0,说明本节点是新插入的,将前序节点的ws设为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

    要理解这个方法,就要弄明白节点数据结构 NodewaitStatus 的含义,Node类中的注释解释的很详细:

    1
    2
    3
    4
    5
    static final int CANCELLED =  1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

    waitStatus 是个int类型的变量,可能的取值有:1(CANCELLED),0(初始值),-1(SIGNAL),-2(CONDITION),-3(PROPAGATE)。

    其中 CONDITION 只会在条件通知 Condition 中用到, 而 PROPAGATE 只会在共享同步中用到,对于我们现在分析的独占同步来说,waitStatus只有可能是:

    • 0:初始值,acquire()中调用了addWaiter(),而addWaiter()new Node(Thread.currentThread(), mode);,此时,节点的waitStatus就是0。
    • -1:SIGNAL,表示后序节点需要唤醒,注意是后续节点!这也是为什么需要一个空的头结点的原因,头节点中的waitStatus实际上指示了第一个有意义的节点中的线程是否需要唤醒。
    • 1:CANCELLED,即本节点已经被取消了,无效了,需要删除。

    了解了waitStatus的含义,我们再回头来看shouldParkAfterFailedAcquire方法,就很清晰明了了,首先查看前序节点中的waitState:

    • 如果是SIGNAL,就直接返回ture,表示本线程需要挂起;
    • 如果是CANCELLED,表示前序节点已经无效了,这里正式删除出队列;
    • 如果是0,表示本节点是新插入,那么将前序节点的ws设为SIGNAL。

    对于后两种情况,实际上是返回false的,但是别忘了这个函数外面还是个死循环呢,所以还会回来,终究会返回true的。

    于是返回true。同样根据 && 的熔断原理,parkAndCheckInterrupt() 方法就会执行。

    1
    2
    3
    4
    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // ②
    }

    parkAndCheckInterrupt方法非常简单,就是通过 LockSupport.park(this) 将本线程挂起。

    也就是说,程序执行到这里,本线程就因为争抢锁失败而挂起了。

  3. 程序被唤醒,通常是由于其他线程释放了锁,执行了 release 操作,于是,程序继续从 return Thread.interrupted(); 处执行,并从parkAndCheckInterrupt() 方法中返回。

    这里还有个细节②:调用 Thread.interrupted()方法查询当前线程的中断标志后,会把中断标志清除。

  4. 继续 for(;;) 循环,也就是回到了步骤1。

至此,AQS中的 acquire 操作就基本梳理清楚了:

  • AQS中对state的实际修改如何在子类中实现的:通过 tryAcquire 抽象方法
  • AQS如何判断 acquire 操作是否成功:还是通过 tryAcquire 抽象方法
  • AQS如何通过消息队列管理线程,将线程挂起

release操作

release操作相对就简单多了。

ReentrantLockunlock() 方法开始追踪:

1
2
3
public void unlock() {
sync.release(1);
}
1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease 同样是抽象方法,由子类实现。这里是 ReentrantLock 的逻辑,比较好理解,就不多解释了。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

接下来回头继续看 release() 方法,判断 if (h != null && h.waitStatus != 0),这是在判断队列中是否有节点,如果有节点,就唤醒第一个节点(即头结点的后序节点):unparkSuccessor(h)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void unparkSuccessor(Node node) {	// 这里node就是head,头结点
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next; // 直接取出头结点的后序节点
if (s == null || s.waitStatus > 0) { // 如果head的后序节点无效,那么从tail尾节点往前找,找到队列最前面的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // ③
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

unparkSuccessor() 就是唤醒队列排最前面的线程,但是这里面也是有个细节③:为什么头节点的后序节点可能无效?为什么要从尾节点往前找?

细节:

① :AddWaiter() 方法中,将新节点插入队尾的时候,为何先将新节点的前序节点指向了tail,之后再通过CAS操作设置tail节点的值?

首先明确一下插入节点的三个动作:

  • A. 将原tail节点的 next 指向新节点
  • B. 将新节点的 prev 指向原tail节点
  • C. 将tail节点指向新节点,这一步是CAS操作。

其次,我们需要明白这个操作可能发生在高并发竞争下,即可能同时有多个线程调用AddWaiter()尝试添加新节点。

那么,A肯定不能在C之前执行,因为CAS是可能失败的,如果线程1和线程2同时调用AddWaiter(),线程1先执行了A,接着线程2执行了A,然后线程1执行C成功,而线程2执行C失败了,这样,原tail的next节点本应该指向线程1节点的,但实际上却指向了线程2节点。

于是只剩下了 C -> A -> B,C -> B -> A,A -> C -> B

其实C -> A -> B 和 C -> B -> A是一样的,所以一起讨论,这种顺序有没有问题呢?

乍一看,多个线程同时AddWaiter()肯定是没问题了,但是如果一个线程在AddWaiter()且恰好执行完C,但A和B还没有执行,而另一个线程在unparkSuccessor唤醒呢?这时候,tail节点已经指向新节点了,但是从head正序遍历是找不到新节点的,而新节点也找不到其前序节点,而唤醒条件waitState却需要从前序节点中取,因此,在高并发下也是有问题的。

而A -> C -> B的顺序,则至少保证了,只要tail节点的值更新了,那么其前序节点一定是有效的。实际上这也解释了细节③。

我们这就看看细节③:unparkSuccessor中为什么头节点的后序节点可能无效?为什么要从尾节点往前找?

正如细节①所述,A -> C -> B 的执行顺序下,高并发下,从head往后遍历不一定是可靠的,可能原tail节点的next还没有来得及指向新节点。但是prev节点一定是可靠的,所以优先尝试往后找,如果找不到,就从tail往前找。

②:parkAndCheckInterrupt为何要调用 Thread.interrupted()?该方法查询当前线程的中断标志后,会把中断标志清除。

同时,我们注意到,在acquire方法中,如果发现是中断唤醒的,还会用selfInterrupt()补上中断标志。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

这不是多此一举吗?明明可以调用不清中断标志位的isInterrupted()方法。

原因就在于,如果不把中断标志清除,调用park()方法是不会挂起线程的。由于park()是native方法,就需要去Hotspot代码中去求证了,这里就不赘述了。

以CountDownLatch为引分析AQS的共享同步

acquire操作

我们最开始就分析过,acquire操作失败会导致线程挂起,因此我们从CountDownLatch的await()方法开始追踪:

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

和ReentrantLock一样,sync的类型是Sync,也是一个内部类,继承自AQS:

1
2
3
4
5
6
7
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
...
}
private final Sync sync;
...
}

我们继续看 sync.acquireSharedInterruptibly(),这是个AQS中的方法:

1
2
3
4
5
6
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

和独占同步类似,tryAcquireShared()也是个抽象方法,由子类CountDownLatch的内部类Sync实现,其中定义了对共享变量state的操作,以及acquire的结果:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

但和独占同步的 tryAcquire()方法不同的是,tryAcquire() 方法返回Boolean类型,true是成功,false是失败;而 tryAcquireShared() 方法返回的是int,负数表示失败,0表示成功但已经没有可用资源,正数表示成功且仍有可用资源。

如果acquire成功的话,就会依次从acquireSharedInterruptibly()await()方法返回,线程也就继续往下运行了。

如果acquire失败,就会执行 doAcquireSharedInterruptibly() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 判断是否排在同步队列最前面
int r = tryAcquireShared(arg); // 尝试获取同步资源
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

和独占acquire的逻辑几乎一样:一个for(;;)死循环,循环中首先判断是否排在同步队列最前面,如果是,则尝试acquire,如果acquire成功了,那么就跳出循环,线程继续往下运行,如果acquire失败,就再次把线程挂起。

不同点就在于:

1
2
3
4
5
6
7
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}

setHeadAndPropagate() 方法中会判断传入的变量 r,也就是 tryAcquireShared() 方法的返回值,如果大于0(即仍然有共享资源可以acquire),就会再调用一次 doReleaseShared(),唤醒下一个线程尝试 acquire。

release操作

CountDownLatchcountDown() 方法追踪:

1
2
3
public void countDown() {
sync.releaseShared(1);
}

AQS中的releaseShared()

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

同样,tryReleaseShared()方法由子类实现:

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

最后来看 doReleaseShared() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}