引:java.util.concurrent.locks包中有很多Lock的实现类,内部实现都依赖AbstractQueuedSynchronizer(AQS)类,今天我们就看看AQS如何完成代码块的并发访问控制。
概览
抽象队列同步器AQS是用来构建锁或其他同步组件的基础框架,内部使用一个int成员变量state表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,其中内部同步状态state,等待队列的头节点head和尾节点head,都是通过volatile修饰,保证了多线程之间的可见,同时这些状态上的操作都用通过CAS操作来保持同步。
核心成员变量如下:1
2
3
4
5
6// 等待队列头节点,懒加载
private transient volatile Node head;
// 等待队列尾节点,懒加载
private transient volatile Node tail;
// 同步状态
private volatile int state;
整个AQS的示意图如下:
等待队列
上面说到AQS是通过内置的队列(LH lock实现,不懂的在后面参考中找到解释)来完成同步的,我们就先看看队列的定义:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30/**
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
*/
static final class Node {
// 标记表示节点正在共享模式中等待
static final Node SHARED = new Node();
// 标记表示节点正在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus值表示线程取消
static final int CANCELLED = 1;
// waitStatus值表示当前线程的后继线程需要被唤醒
// 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel
static final int SIGNAL = -1;
// waitStatus值表示线程等待Condition唤醒
static final int CONDITION = -2;
// waitStatus值表示下一个acquireShared应该无条件传播,都获得共享锁
static final int PROPAGATE = -3;
// 表示节点状态值,SIGNAL,CANCELLED,CONDITION,PROPAGATE,0
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 对应线程
volatile Thread thread;
// 资源共享方式:SHARED、独占模式
Node nextWaiter;
}
使用
不同的自定义同步器(后面会挑几种进行源码分析)争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
1 | // 该线程是否正在独占资源。只有用到condition才需要去实现它 |
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。下面我将分别对独占锁的管理流程 acquire-release 和 共享锁管理流程 acquireShared-releaseShare 进行分析。
acquire-release
acquire
acquire方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。它将被Lock接口的lock方法调用。源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 1. 当前线程通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待。
// 交由自定义同步器去实现,主要是设置同步State,获取成功返回true,失败返回false
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 2. 当前线程在尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将当前线程加入到CLH队列末尾。
// 根据当前线程和模式创建节点并入队
private Node addWaiter(Node mode) {
// 根据当前线程和模式创建节点
Node node = new Node(Thread.currentThread(), mode);
// tail节点不为空,尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 利用Unsafe的CAS操作,下面将不再提
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// tail节点为空,调用enq方法
enq(node);
return node;
}
// 将节点入队,如果有必要初始化队列
private Node enq(final Node node) {
// CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
// 末端节点为空
if (t == null) { // Must initialize
// 创建头节点,并利用CAS操作设置头节点
if (compareAndSetHead(new Node()))
// 将尾节点指针也指向头节点
tail = head;
} else {
// 正常入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 3. 当前线程执行完addWaiter(Node.EXCLUSIVE)之后,调用acquireQueued()来获取锁,因为这个时候可能前面的线程已经释放了锁。
// 如果当前线程获取到了锁,则返回;否则,当前线程被挂起,直到唤醒并重新获取锁了才返回。
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 如果前驱节点是head节点 并且 获取锁成功
// 这里涉及到锁的公平性问题,体现了非公平性
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为head节点
// head所指的结点,就是当前获取到锁的那个结点或null(初始化时)
setHead(node);
p.next = null; // help GC
failed = false;
// 返回等待过程中是否被中断过
return interrupted;
}
// 如果上面的条件不成立,则挂起当前线程,等待被唤醒或者中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果获取锁出错,则设置为取消状态
if (failed)
cancelAcquire(node);
}
}
// 在获取锁失败之后检查和更新节点的状态,返回当前线程是否应该阻塞(被唤醒)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 拿到前驱节点的等待状态
int ws = pred.waitStatus;
// 等待状态为SIGNAL(-1)
if (ws == Node.SIGNAL)
return true;
// 等待状态为CANCELLED(1)
if (ws > 0) {
// 往前找到最近有效的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 等待状态为其他情况
} else {
// 将状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 4. 当前线程在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!
// 阻塞并检查是否中断,直到当前线程被唤醒才从parkAndCheckInterrupt()中返回
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 返回中断状态,并清除中断标志,中断并不会取消锁,如果中断会再次被阻塞
return Thread.interrupted();
}
// 5. 如果当前线程在休眠等待过程中被中断过,acquireQueued会返回true,此时当前线程会调用selfInterrupt()来自己给自己产生一个中断。
static void selfInterrupt() {
// 记录中断标志
Thread.currentThread().interrupt();
}
release
release方法是独占模式线程释放资源的入口。它将被Lock接口的release方法调用。源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final boolean release(int arg) {
if (tryRelease(arg)) {
// 找到头节点
Node h = head;
// 头节点不为空 且 头节点等待状态不为0
if (h != null && h.waitStatus != 0)
// 唤醒等待队列的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
// 1. 当前线程通过tryRelease(arg)尝试释放锁,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 2. 当前线程释放锁成功后,唤醒等待队列的下一个线程
private void unparkSuccessor(Node node) {
// 获取当前线程状态
int ws = node.waitStatus;
// 如果状态 < 0,则设置状态为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 节点为空 或者 waitStatus > 0 ,从最后开始向前寻找,找到waitStatus小于等于0的节点
// 涉及到了弱一致性的问题(next指针在有个短暂瞬间不一定存在)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
acquireShared-releaseShare
acquireShared
acquireShared(int arg)是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 1. 当前线程通过tryAcquireShared(int arg)尝试获取锁,负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取
// 交由自定义同步器去实现,主要是设置同步State
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 2. 当前线程通过doAcquireShared(int arg)将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回
private void doAcquireShared(int arg) {
// 加入队列尾部
final Node node = addWaiter(Node.SHARED);
// 标记是否成功
boolean failed = true;
try {
// 标记是否被中断
boolean interrupted = false;
// 自旋
for (;;) {
// 拿到前驱节点
final Node p = node.predecessor();
// 此时node节点被唤醒
if (p == head) {
// 尝试获得资源
int r = tryAcquireShared(arg);
if (r >= 0) {
//将head指向自己,还有剩余资源可以再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果等待过程中被中断过,记录中断标记
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 与独占模式类似
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 将head指针指向自己
Node h = head;
setHead(node);
// 唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 关注这里
doReleaseShared();
}
}
releaseShare
doReleaseShared()是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 1. 当前线程通过tryReleaseShared(int arg)尝试释放锁,成功返回true,失败返回false
// 交由自定义同步器去实现,主要是设置同步State
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 2. 当前线程通过doReleaseShared()唤醒后继节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
总结
知道了AQS的原理,锁的原理应该也就不难了。这次我们再一次见到CAS的威力。当然这次有些还是有一点迷惑,后面有机会再来补充!