Netty源码之NioEventLoop

引:谈到Netty就避免不了去谈Reactor模式,不懂的同学同自行面向搜索引擎。在Netty中的Reactor线程的具体实现就是NioEventLoop。

NioEventLoop创建

我们先总结这个过程的流程:

  1. 设置EventLoop数量
  2. 创建线程创建器
  3. 创建NioEventLoop(配置selector和taskQueue等参数)
  4. 创建线程选择器

我们回到上一个例子的:

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 用于处理serverChannel
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理channel

我们进入到NioEventLoopGroup的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// step 1
public NioEventLoopGroup(int nThreads) {
// 没有传递线程数,默认为0
// step 2
this(nThreads, (Executor) null);
}
// step 2
public NioEventLoopGroup(int nThreads, Executor executor) {
// 绑定默认的SelectorProvider
// step 3
this(nThreads, executor, SelectorProvider.provider());
}
// step 4
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// 绑定默认的Selector策略工厂,可能select()阻塞或者重试
// step 5
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// step 5
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
// 调用父类MultithreadEventLoopGroup构造方法,绑定RejectedExecutionHandler(??)
// step 6
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
// step 6 MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object...args) {
// step 7 查看默认线程个数
// step 8
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS: nThreads, executor, args);
}
// step 7
static {
// 我们发现是两倍的CPU核数(当然这里是workGroup)
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
// step 8
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object...args) {
// 绑定了DefaultEventExecutorChooser工厂(该工厂默认轮询策略)
// step 9
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// step 9
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object...args) {
// ** 创建线程器生成器
// step 10
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// 事件执行器数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
// ** 创建 NioEventLoop
// step 11
children[i] = newChild(executor, args);
}
// ** 创建线程选择器
// step 15
chooser = chooserFactory.newChooser(children);
Set < EventExecutor > childrenSet = new LinkedHashSet < EventExecutor > (children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
// step 10
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
// 绑定线程工厂
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
// step 11
protected EventLoop newChild(Executor executor, Object...args) throws Exception {
// 绑定SelectStrategy和RejectedExecutionHandler
// step 12
return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
// step 12 NioEventLoop
private volatile int ioRatio = 50;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
// 保存selectorProvider
provider = selectorProvider;
// 这里创建selector,里面创建了SelectedSelectionKeySet
final SelectorTuple selectorTuple = openSelector();
// 保存selector 一个selector会与一个EventLoop做唯一的绑定
selector = selectorTuple.selector;
// 保存unwrappedSelector(这个在前一篇博文的时候用到了)
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// step 13 SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
// step 14
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 保存了一个任务队列
tailTasks = newTaskQueue(maxPendingTasks);
}
// step 14 SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 保存 线程器生成器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
// step 15
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 这里根据eventLoop的个数进行了优化(自行了解)
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

创建完成!!!

NioEventLoop启动

它的第一次启动还是要回到前一篇博文的这一个步骤中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// step 1 AbstractBootstrap
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 拿到之前绑定的eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// step 2 进入inEventLoop方法中
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 所以进入这里了
// step 4
eventLoop.execute(new Runnable() {@Override public void run() {
register0(promise);
}
});
}
}
// step 2 AbstractEventExecutor
public boolean inEventLoop() {
// 这个时候当前线程为主线程
// step 3
return inEventLoop(Thread.currentThread());
}
// step 3 SingleThreadEventExecutor
public boolean inEventLoop(Thread thread) {
// 该类中的thread为null,所有返回false
return thread == this.thread;
}
// step 4
public void execute(Runnable task) {
// 还是主线程,还是false
boolean inEventLoop = inEventLoop();
// 将任务添加到队列
addTask(task);
if (!inEventLoop) {
// 开启线程
// step 4
startThread();
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
// step 4
private void startThread() {
// state 默认为ST_NOT_STARTED没有启动
if (state == ST_NOT_STARTED) {
// 原子更新为开始
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
// step 5
doStartThread();
}
}
}
// step 5
private void doStartThread() {
// 还记得那个executor吗,就是那个线程生成器
// step 6
executor.execute(new Runnable() {@Override public void run() {
thread = Thread.currentThread();
// 执行自己的run方法
// step 8
SingleThreadEventExecutor.this.run();
});
}
// step 6 ThreadPerTaskExecutor
public void execute(Runnable command) {
// 创建线程并开启线程
// step 7 newThread()
// start() 返回到run() step 6
threadFactory.newThread(command).start();
}
// step 7
public Thread newThread(Runnable r) {
// 这里可以看出两点,1,返回了一个优化过的FastThreadLocalRunnable;2,看到了线程名
// 线程名的由来可以百度
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}

启动完毕了!!!

NioEventLoop执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// step 1 这里就是真的reactor线程真正开始干活的地方
protected void run() {
// 死循环,干三件事
// 1. select
// 2. process selected keys
// 3. run tasks
for (;;) {
try {
// 还有任务在执行等着,没有了,就开始select
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// 1. 轮询注册到EventLoop的Selector上的所有channelIO事件
// wakenUp 表示是否应该唤醒正在阻塞的select操作,每次开始都置为false
// step 2
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
// 2. 处理产生IO事件的channel
// step 3
processSelectedKeys();
// 3. 处理任务队列,但是不能超过一个时间
// step 8
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}

// step 2 轮询注册到EventLoop的Selector上的所有channelIO事件
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 设置截止时间(当前时间 + 定时任务的将要截止的时间)
// netty里面定时任务队列是按照延迟时间从小到大进行排序
// delayNanos(currentTimeNanos)方法即取出第一个定时任务的延迟时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超出截止时间0.5ms
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 如果在跳出之前发现还没有进行过select操作就执行一次selectNow()(不会阻塞)
selector.selectNow();
selectCnt = 1;
}
break;
}
// 轮询过程中发现有任务加入,中断本次轮询
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞时轮询
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// 这里还提供了4种方式来中断轮询
// - 轮询到io事件,
// - 用户主动唤醒
// - 任务队列有任务
// - 有定时任务需要被处理
break;
}
// 这里就很重要了,看看Netty是怎么解决JDK NIO的空轮训bug的
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 有效轮询,重置标志位
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 当空轮训的次数超过SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时,就开始重建Selector
// step 3
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}

currentTimeNanos = time;
}
}
}

// step 3
public void rebuildSelector() {
// step 4
rebuildSelector0();
}
// step 4
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
// 创建一个新的selector
newSelectorTuple = openSelector();
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
int interestOps = key.interestOps();
// 取消key在旧的selector上的事件注册
key.cancel();
// 将所有channel重新注册到新的selector上
SelectionKeynewKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
nChannels++;
}
// eventLoop绑定新的seletor
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
}
// step 5
private void processSelectedKeys() {
// 可定不为空,因为在NioEventLoop 的 openSelector方法中创建了
if (selectedKeys != null) {
// 为什么说SelectedKeys是优化过的,是因为原生的SelectedKeys是Set,添加一个事件为O(logn)
// 优化过后使用数组来操作,添加一个事件为O(1)
// 具体的可以参考闪电侠的博客,我这里就不多说了
// step 6
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
// step 6
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 让对象可以被GC
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 拿到携带的channel
// step 7
processSelectedKey(k, (AbstractNioChannel) a);
} else {@SuppressWarnings("unchecked") NioTask < SelectableChannel > task = (NioTask < SelectableChannel > ) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}
// step 7
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 拿到NioEventLoop的Unsafe对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 拿到NioEventLoop的eventLoop对象
eventLoop = ch.eventLoop();
// 拿到事件
int readyOps = k.readyOps();
// 连接事件处理
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
// 获得感兴趣的事件
ops &= ~SelectionKey.OP_CONNECT;
// 重新注册
k.interestOps(ops);
// 完成链接
unsafe.finishConnect();
}
// 写事件处理
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 读事件(work)或者建立连接事件(boss)处理
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 都是read方法,只是实现不同
unsafe.read();
}
}

// step 8 处理任务队列
protected boolean runAllTasks(long timeoutNanos) {
// 将定时任务放入普通任务队列(它是一个多生产者单消费者队列)
// step 9
fetchFromScheduledTaskQueue();
// 拿出一个普通任务
Runnable task = pollTask();
if (task == null) {
// 运行收尾任务队列tailTasks
afterRunningAllTasks();
return false;
}
// 计算出本次循环的能够运行的最多时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 执行任务(这里一定要多调试几次才会有感觉,任务队列的感觉一下子就出来了)
// step 10
safeExecute(task);
runTasks++;
// 由于ScheduledFutureTask.nanoTime()比较耗时,所以没64次比较一次是否超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
// step 9
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 定时任务队列是一个优先级队列,并且任务实现了compareTo方法,可以返回快需要被执行的定时任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// 如果普通队列满了,就又放回去
scheduledTaskQueue().add((ScheduledFutureTask < ?>) scheduledTask);
return false;
}
// 拿到定时任务
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
// step 10
protected static void safeExecute(Runnable task) {
// 就只是运行这个任务,但是一个出错了,也会继续进行下去
task.run();
}

执行完毕!!!

总结

虽然还是有很多细节没有调试到,但是大概的流程应该都走了一遍,希望多调试,然后才会深感EventLoop的吊!!

参考

  1. netty源码分析之揭开reactor线程的面纱(一)
  2. netty源码分析之揭开reactor线程的面纱(二)
  3. netty源码分析之揭开reactor线程的面纱(三)