Java并发_11_构建自定义同步工具

引:创建状态依赖的类的最简单的方法通常是在类库中现有状态依赖类的基础上进行构造。但如果类库没有提供你需要的功能,我们还可以使用Java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的Condition对象以及AbstractQueuedSynchronizer框架。

状态依赖性管理

程序在做某一个操作之前,需要依赖另一个操作的完成或者状态的就绪,这样的一种关系就叫做“状态依赖”。

状态依赖的实现类,例如FutureTask、Semaphore和BlockingQueue等。在这些类的一些操作中有着基于状态的前提条件,例如,不能从一个空队列删除元素,或者获取一个尚未结束的任务的计算结果,在这些操作可以执行之前,必须等待队列进入“非空”状态,或者任务进入“已完成”状态。

依赖状态的操作可以一直阻塞直到可以继续执行,这比使他们先失败再实现起来要更为方便且更不容易出错。而内置的条件队列就可以是线程一直阻塞,直到对象进入某个线程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒他们。

使用条件队列

条件队列:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变为真。传统的队列是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。

  • Object中的wait、notify和notifyAll方法构成了内部条件队列的API。
  • 对象的内置锁与内部条件是相互关联的,要调用对象X中的条件队列的任何一个方法,必须持有对象X上的锁。
  • Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其它线程能够获得这个锁并修改对象的状态。当被挂起的线路醒来时,它将在返回之前重新获取锁。(需要重新竞争,并没有优先获取权)

使用条件队列构造有界缓存示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//未加任何约束的缓冲队列
public abstract class BaseBoundedBuffer<V> {
private final V[] buf;//缓存
private int tail;//队尾
private int head;//队首
private int count;//元素个数

protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}

protected synchronized final void doPut(V v) {//入队
buf[tail] = v;//在队尾添加
if (++tail == buf.length)//如果满了,从头开始
tail = 0;
++count;
}

protected synchronized final V doTake() {//出队
V v = buf[head];//从队首取出
buf[head] = null;//GC
if (++head == buf.length)//如果到尾了,则从头开始
head = 0;
--count;
return v;
}

public synchronized final boolean isFull() {//队列是否满
return count == buf.length;
}

public synchronized final boolean isEmpty() {//队列是否空
return count == 0;
}
}

@ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
// 条件谓词:not-full(!isFull())
// 条件谓词:not-empty(!isEmpty())

public BoundedBuffer(int size) { super(size); }

// 阻塞并直到: not-full
public synchronized void put(V v) throws InterruptedException {
while (isFull())//如果满,则等待
wait();
doPut(v);
notifyAll();//并在放入后马上通知其他线程
}

// 阻塞并直到: not-empty
public synchronized V take() throws InterruptedException {
while (isEmpty())//如果为空,则等待
wait();
V v = doTake();
notifyAll();
return v;
}
}

条件谓词

条件谓词是使某个操作成为状态依赖操作的前提条件。在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。

条件等待存在的三元关系: 包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁,锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。

每一次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。

过早唤醒

wait方法的返回并不一定意味着线程正在等待的条件谓词已经变真了,因为也许是因为与同一条件队列相关的另一个条件谓词变成了真。

当使用条件等待时要满足的条件(Object.wait或Condition.wait)

  • 通常都有一个条件谓词——包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
  • 在一个循环中调用wait。
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
  • 当调用wait/notify/notifyAll等方法时,一定要持有与条件队列相关的锁。
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

丢失的信号

丢失的信号: notify或者notifyAll操作发生在wait之前,就会造成通知信号的丢失,最终wait永远都得不到恢复或者不得不等待下一次重新通知而延迟了恢复时间。

通知

每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知

发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。如果这些等待中线程此时不能重新获得锁,那么无法从wait返回。

只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在wait返回后将执行相同的操作。
  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。

显式的Condition对象

内置条件队列的局限性:每个内置锁都只能有一个相关联的条件队列,因而在像BoundBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。

显示条件队列的优势:可以编写一个带有多个条件谓词的并发对象,或者获得除了条件队列可见性之外的更多控制权,这是一种灵活的选择;对于每个Lock,可以有任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会依照FIFO顺序从Condition await中释放。

特别注意:Condition对象中,三个与条件队列相关的API是:await,signal,signalAll。

下面是使用显示的Condition对象实现的有界缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ConditionBoundedBuffer<T> {
protected final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();//条件:count < items.length
private final Condition notEmpty = lock.newCondition();//条件:count > 0
private final T[] items = (T[]) new Object[100];
private int tail, head, count;

public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();//等到条件count < items.length满足
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();//通知读取等待线程
} finally {
lock.unlock();
}
}

public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();//等到条件count > 0满足
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();//通知写入等待线程
return x;
} finally {
lock.unlock();
}
}
}

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。例如:ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。

下面是AQS中获取操作和释放操作的标准形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
boolean acquire() throws InterruptedException
{
while (当前状态不允许获取操作)
{
if (需要阻塞获取请求)
{
如果当前线程不在队列中,则将其插入队列
阻塞当前线程
}
else
返回失败
}
可能更新同步器的状态
如果线程位于队列中,则将其移出队列
返回成功
}

void release()
{
更新同步器的状态
if (新的状态允许某个被阻塞的线程获取成功)
解除队列中一个或多个线程的阻塞状态
}

总结

要实现一个依赖状态的类——如果没有满足依赖状态的前提条件,那么这个类的方法必须阻塞,那么最好的方式是基于现有类库来构建,例如Semaphore.BlockingQueue或CountDownLatch。然而,有时候现有的类库不能提供足够的功能,在这种情况下,可以使用内置的条件队列、显式的Condition对象或者AbstractQueuedSynchronizer来构建自己的同步器。内置条件队列与内置锁是紧密绑定在一起的,这是因为管理状态依赖性的机制必须与确保状态一致性的机制关联起来。同样,显式的Condition与显式地Lock也是紧密地绑定在一起的,并且与内置条件队列相比,还提供了一个扩展的功能集,包括每个锁对应于多个等待线程集,可中断或不可中断的条件等待,公平或非公平的队列操作,以及基于时限的等待。

参考

  1. 《Java并发编程实战》