Java并发_8_避免活跃性危险

引:在安全性和活跃性之间通常存在着某些制衡。我们使用加锁机制来确保线程安全,但如果过度地使用加锁,则可能导致顺序死锁。同样,我们使用线程池和信号量来限制对资源的使用,但这些被限制的行为可能会导致资源死锁。Java应用程序无法从死锁中恢复过来,因此在设计时一定要排除那些可能导致死锁出现的条件。

死锁

最简单的死锁:当线程A持有锁L并想获得锁M的同时,线程B持有锁M并尝试获得锁L,那么这两个线程将永远等待下去。

数据库解决死锁问题: 当它检测到一组事务发生了死锁时(通过在表示等待关系的有向图中搜索循环),将选择一个牺牲者并放弃这个事务。作为牺牲者的事务会释放它所持有的资源,从而使其他事务继续进行。应用程序可以重新执行被强制终止的事务,而这个事务现在可以成功完成,因为所有跟它竞争资源的事务都已经完成了。

JVM解决死锁问题:
当一组Java线程发生死锁时,“游戏”将到此结束——这些线程永远不能使用了。

锁顺序死锁

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 注意:容易发生死锁!

public class LeftRightDeadLock {
private final Object left = new Object();
private final Object right = new Object();

public void leftRight() {
synchronized (left) {
synchronized (right) {
doSomething();
}
}
}

public void rightLeft() {
synchronized (right) {
synchronized (left) {
doSomething();
}
}
}
}

解决方法: 如果所有线程以固定的顺序来获得锁,那么在程序中就不会出现锁顺序死锁问题。

动态死锁问题

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 注意:容易发生死锁!

public void transferMoney(Account fromAccount, Account toAccount, DollarAmount amount) throws InsufficientFundsException{
synchronized (fromAccount) {
synchronized (fromAccount) {
if (fromAccount.getBalance().compareTo(amount)) {
throw new InsufficientFundsException();
} else {
fromAccount.debit(amount);
toAccount.credit(amount);
}
}
}

}

解决方法: 通过一致哈希算法或者其它方式来统一锁顺序,使未知顺序变为已知顺序。对于极少数的哈希冲突,可以使用“加时赛”锁来解决。解决代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final Object tieLock = new Object();

public void transferMoney(final Account fromAcct,
final Account toAcct,
final DollarAmount amount)
throws InsufficientFundsException{
class Helper{
public void transfer throws InsufficientFundsException{
if(fromAcct.getBalance().compareTo(amount) < 0){
throw new InsufficientFundsException();
}else{
fromAcct.debit(amount);
toAcct.credit(amount);
}
}
}

int fromHash = System.identifyHashCode(fromAcct);
int toHash = System.identityHashCode(toAcct);

if(fromHash < toHash){
synchronized(fromAcct){
synchronized(toAcct){
new Helper.transfer();
}
}
}else if (fromHash > toHash) {
synchronized(toAcct){
synchronized(fromAcct){
new Helper().transfer();
}
}
} else {
synchronized(tieLock){//加时赛锁来解决问题
synchronized(fromAcct){
synchronized(toAcct){
new Helper().transfer();
}
}
}
}
}

在协作对象之间发生的死锁

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 注意:同步方法获得的是对象锁
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;

public Taxi(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}

public synchronized Point getLocation() {
return location;
}

public synchronized void setLocation(Point location) {
this.location = location;
if (location.equals(destination))
dispatcher.notifyAvailable(this);
}

public synchronized Point getDestination() {
return destination;
}

public synchronized void setDestination(Point destination) {
this.destination = destination;
}
}

class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;

public Dispatcher() {
taxis = new HashSet<Taxi>();
availableTaxis = new HashSet<Taxi>();
}

public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}

public synchronized Image getImage() {
Image image = new Image();
for (Taxi t : taxis)
image.drawMarker(t.getLocation());
return image;
}
}

尽管没有任何方法会显式地获得两个锁,但是setLocation和getImage等方法的调用者都会获得两个锁,所以有可能造成死锁。

注意: 如果在持有锁的情况下调用某个外部方法时,那么就需要警惕死锁。

开放调用

开放调用: 如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用。

通过开放调用解决在协作对象之间发生的死锁,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Taxi {
@GuardedBy("this") private Point location, destination;
private final Dispatcher dispatcher;
...

public synchronized Point getLocation() {
return location;
}

public void setLocation(Point location) {
boolean reachedDestination;
synchronized (this) {
this.location = location;
reachedDestination = location.equals(destination);
}
if (reachedDestination)
dispatcher.notifyAvailable(this);
}
}

class Dispatcher {
@GuardedBy("this") private final Set<Taxi> taxis;
@GuardedBy("this") private final Set<Taxi> availableTaxis;
...

public synchronized void notifyAvailable(Taxi taxi) {
availableTaxis.add(taxi);
}

public synchronized Image getImage() {
Set<Taxi> copy;
sychronized (this) {
copy = new HashSet<Taxi>(taxis);
}
Image image = new Image();
for (Taxi t : copy)
image.drawMarker(t.getLocation());
return image;
}
}

解决的方法思路就是:缩小锁的粒度。

缺点:可能丢失操作原子性,此时需要通过协议来实现原子性,而不是通过加锁。

资源死锁

独占类型的访问都可以和加锁操作类比,看起来就像需要获得锁才能访问。

  • 如果一个任务需要连接两个数据库,并且在请求这两个资源时不会始终遵循相同的顺序,那么线程A可能持有与数据库D1的连接,并等待与数据库D2的连接,而线程B持有D2的连接并等待与D1的连接。资源池越大,就越不容易出现这种类型的死锁。
  • 线程饥饿死锁。如果某些任务需要等待其它任务的结果,那么这些任务往往是产生线程饥饿死锁的主要来源,有界线程池/资源池与相互依赖的任务不能一起使用。

死锁的避免与诊断

如果必须获取多个锁,那么在设计时必须考虑锁的顺序:尽量减少潜在的加锁交互数量,将获取锁时需要遵循的协议写入正式文档并始终遵循这些协议。

在细粒度锁的程序中,可以通过一种两阶段策略来检查代码中的死锁:首先,找出在什么地方将获取多个锁(使这个集合尽量小),然后对所有这些实例进行全局分析,从而确保他们在整个程序中获取锁的顺序保持一致。尽可能使用开发调用。

支持定时的锁

当定时锁失败时,你并不需要知道失败的原因。至少你能记录所发生的失败,以及关于这次操作的其它有用信息,并通过一种更平缓的方式来重新启动计算,而不是关闭整个进程。

如果在获取锁时超时,那么可以释放这个锁,然后后退并在一段时间后并再次尝试,从而消除了死锁发生的条件,使程序恢复过来。(这项技术只有在同时获取两个锁时才有效,如果在嵌套的方法调用中请求多个锁,那么即使你知道已经持有了外层的锁,也无法释放它。)

通过线程转储信息来分析死锁

JVM会通过线程转储来帮助是被死锁的发生。在生成线程转储信息之前,JVM将在等待关系图中通过搜索循环来找出死锁。如果发现了一个死锁,则获取相应的死锁信息,理由在死锁中涉及哪些锁和线程,以及这个锁的获取操作位于程序的哪些位置。

其他活跃危险

饥饿

当线程由于无法访问它所需要的资源而不能继续执行时,就发生了“饥饿”,引发饥饿的最常见资源就是CPU时钟周期。

要避免使用线程优先级,因为这会增加平台依赖性,并可能导致活跃性问题,在大多数并发应用程序中,都可以使用默认的线程优先级。

活锁

活锁是另一种形式的活跃性问题,该问题尽管不会阻塞线程,但也不能继续执行,因为线程将不断重复执行相同的操作,而且总会失败。

活锁通常发生在处理事务消息的应用程序中:如果不能成功地处理某个消息,那么消息处理机制将会回滚整个事务,并将它重新放到队列的开头。如果消息处理器在处理某种特定类型的消息时存在错误并导致它失败,那么会出一直存在“处理-出错-回滚-处理”的循环中。

解决方法:在重试机制中引用随机性。

总结

活跃性故障是一个非常严重的问题,因为当出现活跃性故障时,除了中止应用程序之外没有其他任何机制可以帮助从这种故障恢复过来。最常见的活跃性故障是锁顺序死锁。在设计时应该避免锁顺序死锁:确保线程在获取多个锁采用一致的顺序。最好的解决方法是在程序中始终使用开放调用。这将大大减少需要同时持有多个锁的地方,也更容易发现这些地方。

参考

  1. 《Java并发编程实战》