Java并发_4_基础构建模块

引:JDK提供的东西效率应该是可以保证的,所以我们要学会去使用JDK自带的并发基础构建模块,以及理解在使用这些模块来构建应用程序时的一些常用模式。

同步容器类

同步容器类包括Vector和Hashtable。这些类实现线程安全的方式是:将他们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

先看下面的代码:

1
2
3
4
5
6
7
8
9
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}

public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}

这些方法看起来没有问题,但是如果线程A在执行deleteLast, 线程B在执行getLast,list中有10个元素,刚好B在执行list.size()和get(lastIndex)之间,线程A执行完了remove(lastIndex), 那么线程B在执行get(lastIndex)时就会抛出ArrayIndexOutOfBoundsException。

由于同步容器类要遵守同步策略,即客户端加锁,因此在创建一些新的操作时,只要我们知道应该使用哪一个锁,那么这些新操作就与容器的其他操作一样都是原子操作。如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Object getLast(Vector list) {
synchronized(list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}

public static void deleteLast(Vector list) {
synchronized(list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}

在调用size和相应的get之间,Vector的长度可能会发生变化,这种风险在对Vector中的元素进行迭代时仍然会出现,如下面的代码:

1
2
3
4
5
// 可能抛出ArrayIndexOutOfBoundsException的迭代操作

for (int i = 0; i < vector.size(); i++) {
doSomething(vetor.get(i));
}

我们可以通过在客户端加锁来解决不可靠迭代的问题,但是要牺牲一些伸缩性。通过在迭代期间持有Vector的锁,可以防止其他线程在迭代期间修改Vector。如下面的代码:

1
2
3
4
5
synchronized (vector) {
for (int i = 0; i < vector.size(); i++) {
doSomething(vetor.get(i));
}
}

迭代器与ConcurrentModificationException

对容器类进行迭代的标准方式是使用Iterator,然而,如果有其他线程并发地修改容器,那么即使是使用迭代器也无法避免地需要在同步容器上加锁。在设计同步容器类的迭代器时并没有考虑到并发修改的问题,它们的迭代器是“及时失败”的,所以当它们发现容器在迭代过程中发生变化,就会抛出一个ConcurrentModificationException异常。这种fail-fast机制并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。它们采取的实现方式是将计数器变化与容器关联起来:如果在迭代期间计数器被修改,那么hasNext或next将抛出ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的值,而迭代器可能并没有意识到已经发生了修改。要想避免出现ConcurrentModificationException,就必须在迭代过程中持有容器的锁。

然而,有时候开发人员并不希望在迭代器间对容器加锁。例如,某些线程在可以访问容器之前,必须等待迭代过程结束,如果容器规模很大,或者在每个元素上执行操作的时间很长,那么这些线程就需要长时间等待。持有锁的时间越长,那么在锁上的竞争就越激烈,如果许多线程都在等待锁被释放,那么将极大地降低吞吐量和CPU的利用率。

另一种替代方法是“克隆”容器,并在副本上进行迭代。由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改,这样就避免了抛出ConcurrentModificationException,不过在克隆过程中仍然要加锁(以防在此期间被克隆容器被其他线程修改,那样克隆出来的容器就是失效的容器),所以也会增加性能开销。所以这种方法的好坏取决于多个因素:容器的大小,在每个元素上执行的操作,迭代操作相对于容器上其他操作被调用的频率,以及在响应时间和吞吐量等方面的需求。

隐藏迭代器

虽然加锁可以防止迭代器抛出ConcurrentModificationException,但必须记住在所有对共享容器进行迭代的地方都需要加锁。实际情况更复杂,因为在某些情况下,迭代器会隐藏起来。

如下例,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HiddenIterator {
@GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}

public synchronized void remove(Integer i) {
set.remove(i);
}

public void addTenThing() {
Random r = new Random();
for (int i=0; i< 10; i++) {
set.add(r.nextInt());
}
System.out.println("DEBUG: added ten elements to " + set);
}
}

addTenThings方法可能会抛出ConcurrentModificationException,因为toString对set进行了迭代,而且没加锁。如果状态与保护它的同步代码之间相隔越远,那开发人员就越容易忘记在访问状态时使用正确的同步。如果HiddenIterator用
synchronizedSet来包装HashSet,并且对同步代码进行封装,那么就不会抛出异常了。容器的hashCode和equals等方法也会间接地执行迭代操作,同样,containsAll, removeAll和retainAll等方法,以及把容器作为参数的构造函数,都会对容器进行迭代,所有这些间接的迭代操作都可能抛出ConcurrentModificationException。

并发容器

同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性,这样的代价就是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。Java 5.0提供了多种并发容器类来改进同步容器的性能。通过并发容器来代替同步容器,可以极大地提高伸缩性并降低风险。

Java 5.0增加了ConcurrentHashMap,用来替代基于hash的同步map,增加了CopyOnWriteArrayList,用来替代以遍历操作为主要操作的同步List。在新的ConcurrentMap接口中增加了一些常用的复合操作,例如“putIfAbsent”,replace, 和 conditional remove。

Java 5.0还增加了两个新的集合类型,Queue和BlockingQueue。

Java 6.0增加了ConcurrentSkipListMap来替换同步的SortedMap,增加了ConcurrentSkipListSet替换SortedSet(例如TreeMap和TreeSet)

ConcurrentHashMap

与HashMap一样,ConcurrentHashMap也是一个基于HashCode的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。ConcurrentHashMap并不是将每个方法都在同一把锁上同步并使得每次只有一个线程访问容器,而是使用一个种粒度更细的加锁机制来时间共享,叫做分段锁。在这种机制下,任意数量的读取线程可以并发地访问这个map,执行读取操作的线程和执行写入操作的线程可以并发地访问map,并且一定数量的写入线程可以并发地修改Map。

而且ConcurrentHashMap提供的迭代器不会抛出ConcurrentModificationException,因此不需要再迭代过程中对容器加锁。它返回的迭代器具有弱一致性,而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以在迭代器被构造后将修改操作反映给容器。

与Hashtable和synchronized-Map相比,ConcurrentHashMap有着更多的优势以及更少的劣势。因此在大多数情况下,用ConcurrentHashMap来代替同步Map能进一步提高代码的可伸缩性,只有当应用程序需要给map加锁以进行独占访问时,才应该放弃使用ConcurrentHashMap。

额外的原子Map操作

由于ConcurrentHashMap不能被加锁来执行独占访问,因此也无法使用客户端加锁来创建新的原子操作。但是一些常见的复合操作,如“如没有则添加(put-if-absent)”,”若相等则移除(remove-if-equals)”,”若相等则替换(replace-if-equals)”等,都已经在ConcurrentMap接口中有声明,所以如果需要为现有的同步Map添加这样的功能,就应该考虑使用ConcurrentMap了。

CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步list,在某些情况下提供了更好的并发性能,并且在迭代器间不需要对容器进行加锁或复制。(类似地,CopyOnWriteArraySet的作用是替代同步set)

Copy-On-Write容器的线程安全性在于,只要正确地发布一个实际不可变的对象,那么在访问该对象时就不需要进一步的同步了。

Copy-On-Write从字面上看就是,Write的时候总是要Copy,所以在每次修改时,都会创建并重新发布一个新的容器副本。而CopyOnWriteArrayList容器的迭代器会保留一个指向原始数组的引用,遍历的也是原始数组,而其他线程修改的是这个原始数组的副本,所以也不会影响原始数组,原始数组不会改变,也就不会有ConcurrentModificationException了,并且返回的元素和迭代器创建时的元素完全一致。

显然,每当修改容器时都会复制原始数组,这需要一定开销,特别是当容器的规模较大时。仅当迭代器操作多于修改操作时,才应该使用“写入时复制”容器。

阻塞队列和生产者-消费者模式

基于阻塞队列构建的生产者-消费者设计中:当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。阻塞队列简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

BlockingQueue有多种实现:LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,与LinkedList和ArrayList相似,但比同步list有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列非常有用,PriorityBlockingQueue既可以根据元素的自然顺序来比较元素,也可以使用Comparator来比较。最后一个BlockingQueue是SynchronousQueue,它并不是一个真正的队列,因为它不会为队列中元素维护存储空间。它维护的是一组线程,这些线程在等待着把元素加入或移出队列。以洗盘子为例,相当于没有盘架,直接将洗好的盘子放入下一个空闲的烘干机中,它可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到有另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总有一个消费者准备好获取交付的工作时,才适合使用同步队列。

串行线程封闭

对于可变对象,生产者-消费者这种设计与阻塞队列组合在一起使得把对象从生产者转移给消费者变得容易。线程封闭对象只能由单个线程拥有,但可以通过安全地发布该对象来转移所有权。在所有权转移后,就只有新线程能获得这个对象的访问权限,并且发布对象的线程不会再访问它。这种安全的发布确保了对象状态对于新的所有者来说是可见的,并且由于最初的所有者不会再访问它,所以这个对象又被封闭在新的线程中,新线程可以对该对象做任意修改,因为它具有独占的访问权。

对象池利用了串行线程封闭,将对象借给一个请求线程。只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之间传递所有权。

双端队列与工作密取

Java 6增加了两种容器类型,Deque&BlockingQueue。Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。

双端队列适用于另一种相关模式,即工作密取(Work Stealing)。(不懂)

同步工具类

同步器可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流就可以叫同步器。阻塞队列可以作为同步器,其他类型的同步器还包括信号量(Semaphore)/栅栏(Barrier)以及闭锁(Latch)。

所有的同步器都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定使用同步器的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步器进入到预期状态。

闭锁

闭锁时一种同步器,可以延迟线程的进度直到线程到达终止状态。闭锁的作用相当于一扇门:在闭锁到达terminal状态前,这扇门一直是关闭的,没有任何线程通过,而当到达terminal状态时,这扇门就会打开允许所有线程通过。当闭锁达到terminal状态,它的状态就不会再改变,因此这扇门会永远打开。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。例如:

  • 确保某个计算在其需要的所有资源都初始化之后才继续执行
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动
  • 等待某个操作的所有参与者都就绪再继续执行

CountDownLatch是一种灵活的闭锁,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。例如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for(int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end-start;
}

public static void main(String[] args) {
Runnable task = new Runnable() {

@Override
public void run() {
System.out.println("ing");

}
};

try {
long interval = new TestHarness().timeTasks(3, task);
System.out.println(interval);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

启动门将使得主线程能够同时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成。

FutureTask

FutureTask也可以用作闭锁,它可以处于下面3种状态:等待运行、正在运行和运行完成。Future.get的行为取决于任务的状态,如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进行完成状态。

信号量

计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore管理着一组虚拟的permits,许可的初始数量通过构造函数指定。在执行操作前先acquire permits(只要有剩余的许可就可以),在使用完后会release这个许可。如果没有获得permit,acquire方法将一直阻塞到有许可或指导被中断或超时。release方法将返回一个permit给信号量。

Semaphore也可以将任何一种容器变成有界阻塞容器。信号量的计数值会初始化为容器容量的最大值,add操作在向容器添加一个元素之前,首先获取一个permit,然后再添加,如果添加失败,那么会释放许可,如果成功就不释放了。同样,remove操作会释放一个许可,来使更多的元素能够添加到容器中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class BoundedHashSet<T> {

private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
} finally {
if (!wasAdded) {
sem.release();
}
}
return wasAdded;
}

public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}

栅栏

闭锁可以启动一组相关的操作,或者等待一组相关的操作结束。闭锁时一次性对象,一旦进入终止状态,就不能被重置。

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件发生,而栅栏用于等待其他线程

构建高效且可伸缩的结果缓存

下面的几个代码段将逐步构架一个高效且可伸缩的缓存:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Memoizer1存在一个可伸缩的问题,每次只有一个线程能够执行compute。

public interface Computable<A,V> {
V compute(A arg) throws InterruptedException;
}

public class ExpensiveFunction implements Computable<String, BigInteger>{
@Override
public BigInteger compute(String arg) throws InterruptedException {
//在经过长时间的计算后
return new BigInteger(arg);
}
}

public class Memoizer1<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;


public Memoizer1(Computable<A, V> c) {
this.c = c;
}

@Override
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 会存在重复计算的问题
public class Memoizer2<A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;


public Memoizer2(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*利用FutureTask来减少重复计算的问题,但是由于if判断中依然存在非原子的“先检查再执行”的操作,
所以还是会存在重复计算的问题*/
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;


public Memoizer3(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(final A arg) throws InterruptedException {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg,ft);
ft.run();
}
try {
return f.get();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}

/**
* 强制将未检查的Throwable转化为RuntimeException
* @param t
* @return
*/
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new IllegalStateException("Not unchecked",t);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// **最终版**
// 利用复合操作“若没有则添加”可以解决Memoizer3的问题

public class Memoizer4<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;


public Memoizer4(Computable<A, V> c) {
this.c = c;
}

@Override
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.putIfAbsent(arg, ft);
ft.run();
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f); //为了解决缓存污染问题,当计算被取消或者失败时,就从缓存中remove
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
}

总结

构建一个高效且具有伸缩性的基础模块还是有点难度的,我们要考虑的东西比较多,我们要利用已有的基础模块合理构建。

参考

  1. 《Java并发编程实战》
  2. 《Java并发编程实践》(四)—- 构建阻塞
  3. 图解集合3:CopyOnWriteArrayList