AQS

AQSAbstractQueuedSynchronizer 的简称,即 抽象队列同步器,从字面意思上理解:

  • 抽象:抽象类,只实现一些主要逻辑,有些方法由子类实现;
  • 队列:使用先进先出(FIFO)队列存储数据;
  • 同步:实现了同步的功能。

AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的同步器。

资源共享模式

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如 ReentrantLock。
  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如 Semaphore/CountDownLatch。

源码解析

基本结构

AQS 类本身实现的是一些排队和阻塞的机制,比如具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)。它内部使用了一个先进先出(FIFO)的双端队列,并使用了两个指针 head 和 tail 用于标识队列的头部和尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/** CLH Nodes */
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others

// methods for atomic operations
final boolean casPrev(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, PREV, c, v);
}
final boolean casNext(Node c, Node v) { // for cleanQueue
return U.weakCompareAndSetReference(this, NEXT, c, v);
}
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
final void setPrevRelaxed(Node p) { // for off-queue assignment
U.putReference(this, PREV, p);
}
final void setStatusRelaxed(int s) { // for off-queue assignment
U.putInt(this, STATUS, s);
}
final void clearStatus() { // for reducing unneeded signals
U.putIntOpaque(this, STATUS, 0);
}

private static final long STATUS
= U.objectFieldOffset(Node.class, "status");
private static final long NEXT
= U.objectFieldOffset(Node.class, "next");
private static final long PREV
= U.objectFieldOffset(Node.class, "prev");
}

状态

AQS 使用 int 成员变量 state 表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。

state 变量由 volatile 修饰,用于展示当前临界资源的获锁情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

获取资源

获取资源的入口是 acquire(int arg) 方法。arg 是要获取的资源的个数,在独占模式下始终为 1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
  • 首先调用 tryAcquire(arg) 尝试去获取资源。前面提到了这个方法是在子类具体实现的。
  • 如果获取资源失败,就通过 addWaiter(Node.EXCLUSIVE) 方法把这个线程插入到等待队列中。其中传入的参数代表要插入的 Node 是独占式的。
  • 处于等待队列的结点是从头结点一个一个去获取资源的。
  • 结点进入等待队列后,是调用 park 使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。

释放资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
// 如果状态是负数,尝试把它设置为0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 得到头结点的后继结点head.next
Node s = node.next;
// 如果这个后继结点为空或者状态大于0
// 通过前面的定义我们知道,大于0只有一种可能,就是这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 等待队列中所有还有用的结点,都向前移动
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为空,
if (s != null)
LockSupport.unpark(s.thread);
}