JDK源码分析——ReentrantReadWriteLock

引:读写分离是解决并发瓶颈的常用策略,在Java中也有其实现——ReentrantReadWriteLock,它能够有效的提高读比写多的场景下的程序性能。

概览

ReentrantReadWriteLock是通过两把锁实现读写分离的,分别是读锁和写锁。它的源码如下:

1
2
3
4
5
6
7
8
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {

// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步器
final Sync sync;

写锁

写锁主要是通过重写同步器的tryAcquire和tryRelease实现,其他逻辑都可以参考AQS的解析。

tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Sync
static final int SHARED_SHIFT = 16;
// 重入最大次数
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 独占锁(写锁)掩码
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
// 用 state & 65535 得到低 16 位的值
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果 state 不是0,且低16位是0,说明写锁是空闲的,读锁被霸占了。那么也不能拿锁,返回 fasle。保证了读的时候不能写。
// 如果低 16 位不是0,说明写锁被霸占了,此时如果持有锁的不是当前线程,那么这次拿锁是失败的。返回 fasle。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 这里时候应该是写重入锁,如果写重入次数超过最大值 65535,就会溢出
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// writerShouldBlock 判断是否应该阻塞
// 1. 公平锁情况下,如果队列中有等待锁的线程,则返回ture,应该阻塞
// 2. 非公平锁情况下,返回false,不应该阻塞,直接参与竞争
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 竞争到锁,设置独占线程
setExclusiveOwnerThread(current);
return true;
}

tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) {
// 是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 设置state状态
int nextc = getState() - releases;
// 计算写锁的状态(低16位),如果是0,说明是否成功
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

读锁

写锁主要是通过重写同步器的tryAcquireShared和tryReleaseShared实现,其他逻辑都可以参考AQS的解析。

tryAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Sync
// 共享(读锁)重入次数基数
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 第一个获得读锁的线程
private transient Thread firstReader = null;
// 第一个获得读锁的线程的重入次数计数器
private transient int firstReaderHoldCount;
// 最后一个获取读锁的线程的计数器,存放在ThreadLocal中
private transient HoldCounter cachedHoldCounter;

protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
// 用 state & 65535 得到低 16 位的值 不等于0,写锁被霸占了
// 且
// 持有锁的不是当前线程
// 保证了写的时候不能读
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 获取锁失败
return -1;
// 如果写锁没有被霸占,则将高16位移到低16位
int r = sharedCount(c);
// readerShouldBlock判断是否应该阻塞 和写锁逻辑一致
// 写锁重入次数不超过最大值 65535
// 设置state成功 (相当于高16位加1)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁空闲
if (r == 0) {
firstReader = current;
// 计数器为1
firstReaderHoldCount = 1;
// 第一个读线程是当前线程
} else if (firstReader == current) {
// 计数器加1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
// 如果最后一个线程计数器是 null 或者不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 新建一个 HoldCounter 对象
cachedHoldCounter = rh = readHolds.get();
// 如果不是 null,且 count 是 0
else if (rh.count == 0)
// 就将上个线程的 HoldCounter 覆盖本地的(性能考虑,相当于缓存)
readHolds.set(rh);
// 计数器加1
rh.count++;
}
// 获得锁成功
return 1;
}
// 死循环获取读锁,和tryReleaseShared逻辑类似,只有多了死循环
return fullTryAcquireShared(current);
}

锁降级

在tryAcquireShared还体现了锁降级的概念。概念如下:


重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

体现在代码中如下:

1
2
3
4
// tryAcquireShared 或者 fullTryAcquireShared中
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;

上面的代码就体现出:当写锁被持有的时候,如果当前是线程是持有写锁的那个线程,可以继续获得读锁。

总得来说就提高了性能。

tryReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 当前线程重入次数计数器
private transient ThreadLocalHoldCounter readHolds;

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果是第一个线程
if (firstReader == current) {
// 如果是 1,将第一个线程设置成 null
if (firstReaderHoldCount == 1)
firstReader = null;
// 如果不是 1,减一操作
else
firstReaderHoldCount--;
// 如果不是当前线程
} else {
HoldCounter rh = cachedHoldCounter;
// 如果最后一个线程计数器是 null 或者缓存所属线程不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程的计数器
rh = readHolds.get();
int count = rh.count;
// 如果计数器小于等于一,就直接删除计数器
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
// 对计数器减一
--rh.count;
}
// 死循环使用 CAS 修改状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 修改成功后,如果是 0,表示读锁和写锁都空闲,则可以唤醒后面的等待线程
return nextc == 0;
}
}

总结

关于读写锁,它其实就是一个读锁一个写锁,读锁是共享的,写锁是独占的。然后我们再理解锁降级的相关概念就行了,当然这一切都是需要建立在读AQS的理解之上。

参考

  1. JDK源码分析——AbstractQueuedSynchronizer
  2. 并发编程之——写锁源码分析
  3. 并发编程之——读锁源码分析(解释关于锁降级的争议)