JDK源码分析——ThreadPoolExecutor

引:相信大家已经收到了Executor框架以及线程池带来的好处,它有着神奇的能力,那我们来看看它是怎么实现这种神奇的能力的!

详细注释:源码分析地址

概述

用过的人应该都很熟悉了,不过还想回顾一下的还是可以看看自己之前看《Java并发实战编程》总结的几章笔记:

  1. Java并发5任务执行
  2. Java并发7线程池的使用

例子

JDK自带的哪几种线程池类型,就不多展示了,这里我们来看看自定义线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ExecutorTest {
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(5,20, 0, TimeUnit.MILLISECONDS
,new LinkedBlockingDeque<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 10; i++) {
executorService.execute(new MyTask());
}
}

}
class MyTask implements Runnable {

@Override
public void run() {
System.out.println("Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出:

1
2
3
4
5
6
7
8
9
10
Thread ID:10
Thread ID:12
Thread ID:11
Thread ID:14
Thread ID:13
Thread ID:10
Thread ID:11
Thread ID:12
Thread ID:14
Thread ID:13

结合设置的参数,大家是不是隐隐发现了什么?模糊的请往下看。

构造

我们主要还是记住这7个参数的含义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 保障线程池运行的最少的线程数
private volatile int corePoolSize;
// 最大线程数,它收到CAPACITY((2^29)-1)的限制
private volatile int maximumPoolSize;
// 当线程总数超多corePoolSize时,如果线程空闲超过这个时间(结合unit时间单位),将被回收
private volatile long keepAliveTime;
// 当线程池来不及执行任务时,会将任务暂时放在队列中,等待后续处理
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 当线程池饱和或者优雅关闭的时候调用
private volatile RejectedExecutionHandler handler;

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

execute

最为一个Executor,execute是他最核心的方法,我们看看它是如何实现的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/**
* ctl(线程池控制状态)包含了两个概念
* 1. workerCount:代表有效的线程数
* 2. runState:代表了运行状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 保存工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 主要有下面三个步骤:
*
* 1. 判断当前的线程数是否小于corePoolSize
* 如果是,使用传进来的任务通过addWord方法创建一个新线程,如果能完成新线程创建exexute方法结束,成功提交任务
*
* 2. 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check
* 如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
*
* 3. 如果不能加入任务到工作队列,将尝试使用任务新增一个线程
* 如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject这个他任务
*/
int c = ctl.get();
// 实际线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 增加一个线程
if (addWorker(command, true))
return;
// 更新线程池状态
c = ctl.get();
}
// 当线程池处于运行状态 且 添加进队列成功
if (isRunning(c) && workQueue.offer(command)) {
// 再次对线程池状态检查
int recheck = ctl.get();
// 线程池状态不是运行状态,且从队列删除该任务成功
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// 如果当前工作线程数量为0(线程池已关闭)
else if (workerCountOf(recheck) == 0)
// 添加一个 null 到队列中
addWorker(null, false);
}
// 如果添加队列失败,则创建一个任务线程
else if (!addWorker(command, false))
// 如果失败(饱和),则拒绝
reject(command);
}

// 新增线程
private boolean addWorker(Runnable firstTask, boolean core) {
// Java标签
retry:
// 死循环
for (;;) {
int c = ctl.get();
// 获取当前线程池状态
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 下面的逻辑可以改为这样
// rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
// 表示下面这几种情况均不接受新任务
// 1. rs > shutdown
// 2. rs >= shutdown && firstTask != null
// 3. rs >= shutdown && workQueue.isEmppty
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 获取线程池中线程数量
int wc = workerCountOf(c);
// 如果超出容量或者超出核心线程数或最大线程数(由core决定)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程安全增加工作线程数
if (compareAndIncrementWorkerCount(c))
// / 跳出retry
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池状态发生变化,重新循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 线程添加成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Worker代理了任务对象,可以看其构造方法
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 拿到线程池状态
int rs = runStateOf(ctl.get());
// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新创建的线程放进线程Set里
workers.add(w);
// 更新线程池线程数且不超过最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
if (workerAdded) {
// 在下一节worker中分析
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除w并递减wokerCount
if (! workerStarted)
// 递减wokerCount会触发tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}

worker

worker对象代理了任务,我们看看它的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 代理任务执行的线程
final Thread thread;
// 第一个任务
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这里就用到我们在例子中的传入的工厂(注意是这个this,将worker自己作为一个Runnabel)
this.thread = getThreadFactory().newThread(this);
}

// DefaultThreadFactory
public Thread newThread(Runnable r) {
// 创建一个线程,后面就是线程名了,在构造工厂的时候就确定了
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}

// work执行任务
public void run() {
// 这个方法太核心,单独拿出来
runWorker(this);
}

runWorker

主要的工作就是第一次启动会执行初始化传进来的任务firstTask;然后会循环从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 拿到firstTask
Runnable task = w.firstTask;
// 等待gc
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task不为空 或 队列不为空 关注getTask方法解析
while (task != null || (task = getTask方法解析()) != null) {
w.lock();
// 中断处理
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();

try {
// 执行前钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 调用任务的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行后钩子
afterExecute(task, thrown);
}
} finally {
// 等待gc
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// Worker的善后,从线程池中移除超时或者出现异常的线程
processWorkerExit(w, completedAbruptly);
}
}

// 取任务,这里关注超时问题以及keepAliveTime起作用的代码段
private Runnable getTask() {
// 超时标志位
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 有下面两种情况成立
// 1. rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
// 2. rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 介绍之前,递减workerCount值
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 标记从队列中取任务时是否设置超时时间
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 1. 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量
// 2. 如果设置有核心线程有超时时间要求或者线程数远大于核心线程数 且 缓存队列已经空了这时递减worker数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 没有设置核心线程有超时时间要求或者线程数远小于核心线程数就take,否则就带keepAliveTime得poll
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 超时
timedOut = true;
} catch (InterruptedException retry) {
// 线程被中断重试
timedOut = false;
}
}
}

submit

当我们对于线程执行不需要返回结果时,直接调用线程的execute方法来提交任务就好了。然而很多时候我们需要线程执行的返回结果,这个时候就需要调用submit方法来提交任务。虽然它最后也会调用到execute方法,但是他们具体不同在哪里,我们来看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// submit方法来自于AbstractExecutorService
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 将task进行了封装
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 剩下的就一样了
execute(ftask);
return ftask;
}

// AbstractExecutorService
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
// 将Runnable封装成FutureTask
return new FutureTask<T>(runnable, value);
}

// FutureTask
public FutureTask(Runnable runnable, V result) {
// 将Runnable封装成callable
this.callable = Executors.callable(runnable, result);
// 确保callable的可见性
this.state = NEW;
}

// 根据前面的execute方法的分析,我们知道最后的执行会调用到FutureTask的run方法
public void run() {
// 状态不为NEW或者UNSAFE不成功,则运行失败
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
// 最终又调用到了FutureTask保证的Callable对象
Callable<V> c = callable;
// callable不为空且状态为NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 得到返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 如果有异常,设置异常
setException(ex);
}
if (ran)
// 设置返回值,
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

protected void set(V v) {
// 设置返回值,也将状态NEW变为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
// 完成之后将状态设置为NORMAl
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 进行变量清理工作
finishCompletion();
}
}

在我们submit之后会得到一个Future,我们要想到返回值,我们只要调用get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public V get() throws InterruptedException, ExecutionException {
// 拿到状态
int s = state;
// 未完成,则阻塞等待完成,这里可以设置超时时间
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 还需要根据状态判断是否返回值
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
// 如果状态为NORMAL,则返回值
if (s == NORMAL)
return (V)x;
// 如果状态为CACELLED,则代表任务呗取消,抛出异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

这里基本就覆盖了运行时的过程,我们就可以很好得理解我们,当任务未完成的时候,会发生阻塞等待的情况。

总结

通过上面我们基本知道了线程池的实现原理。然后就是运用这些原理对线程池进行调优了。这里关键还是调整哪些构造时的参数。

参考

  1. Java并发5任务执行
  2. Java并发7线程池的使用