JDK源码分析——ReentrantLock

引:虽然有了synchronized这种内置的锁功能,但是在JDK5之后又新增了Lock接口,它的实现类可比内置的锁强大多了。今天我们主要看看它的实现类之一ReentrantLock。

概览

在看之前,我们一定要对AbstractQueuedSynchronizer熟悉,不熟悉的可以参考我这篇文章——JDK源码分析——AbstractQueuedSynchronizer

我们先看看他的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ReentrantLock implements Lock, java.io.Serializable {
// 同步器(可选择公平同步器或者非公平同步器)
private final Sync sync;

// 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// fair为true时,采用公平锁策略
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}

}

同步器抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
// 非公平的TryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 为0,则可获得锁,进行CAS操作设置state,并设置当前线程为独占线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁的体现
// 如果当前线程为独占线程,则可重入
else if (current == getExclusiveOwnerThread()) {
// 重入次数增加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败
return false;
}

// 释放锁
protected final boolean tryRelease(int releases) {
// 截取释放次数
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 重入次数要减到0,才是真正得释放了锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 维护了一个条件队列
final ConditionObject newCondition() {
return new ConditionObject();
}
}

公平锁

公平锁是指如果同步器的队列中有线程在等待,后来的线程则直接加入到队列中
公平同步器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static final class FairSync extends Sync {
final void lock() {
// AQS分析逻辑
acquire(1);
}
// 公平实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 可获得锁
if (c == 0) {
// 如果同步器的队列中没有线程在等待 或者 CAS成功 或者设置独占线程成功
// 公平的体现
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

非公平锁

非公平锁是指后来的线程也有同样的优先级。

非公平同步器实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static final class NonfairSync extends Sync {

final void lock() {
// 利用CAS把state从0设置为1
// 成功则获得锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 进入AQS分析逻辑
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
// 非公平获取
return nonfairTryAcquire(acquires);
}
}

条件变量Condition

Synchronized中,所有的线程都在同一个object的条件队列上等待。而ReentrantLock中,每个condition都维护了一个条件队列。我们先看看在同步器定义的ConditionObject对象。

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的头节点
private transient Node firstWaiter;
// 条件队列的尾节点
private transient Node lastWaiter;
}

我们关注它的两个核心方法(条件变量Condition为了解决Object.wait/notify/notifyAll难以使用的问题):

1
2
void await() throws InterruptedException;
void signal();

await

阻塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public final void await() throws InterruptedException {
// 如果线程被中断,则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将线程添加到条件等待队列
Node node = addConditionWaiter();
// 2. 释放持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 是否在AQS的同步队列中
while (!isOnSyncQueue(node)) {
// 3. 不是则挂起该线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 被唤醒后,通过acquireQueued方法重新竞争锁,参考AQS
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理取消节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 报告中断信息
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// 1. 将线程添加到条件等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个节点是取消状态
if (t != null && t.waitStatus != Node.CONDITION) {
// 从队列删除取消节点
unlinkCancelledWaiters();
t = lastWaiter;
}
// 线程包装成节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 2. 释放持有的锁
final int fullyRelease(Node node) {
// 标记是否失败,失败则将节点设为取消状态
boolean failed = true;
try {
int savedState = getState();
// release可以参考AQS的解析
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

signal

唤醒线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void signal() {
// 当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 找到底部个非取消节点,遇到取消节点就进行删除
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}


final boolean transferForSignal(Node node) {
// 将状态设置为0,不行就取消该节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 将节点放入到同步队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒线程
LockSupport.unpark(node.thread);
return true;
}

总结

ReetrantLock还提供了其它功能,包括定时的锁等待、可中断的锁等待、公平性、以及实现非块结构的加锁、Condition,对线程的等待和唤醒等操作更加灵活。但是内置锁(Synchronized)与ReentrantLock相比有例外一个优点就是在线程转储中能给出在哪些调用帧中获得了哪些锁,并能够检测和识别发生死锁的线程。因为Reentrant的非块状特性意味着获取锁的操作不能与特定的栈帧关联起来。相比之下内置锁是JVM的内置属性,所以未来更可能提升synchronized而不是ReentrantLock的性能,而照目前的趋势来看确实如此。

参考

  1. JDK源码分析——AbstractQueuedSynchronizer
  2. 深入浅出ReentrantLock