Netty源码之ChannelPipeline

引:其实前面就多次提到了ChannelPipeline,但是都没有详细说明ChannelPipeline是如何工作的,这里我们就具体看看这个管理处理逻辑的抽象以及处理逻辑的抽象ChannelHandler。

ChannelPipeline的创建

通过前面的讲解,我们应该还记得一个Channel对应了一个ChannelPipeline,而ChannelPipeline也就是在创建Channel的时候随之创建的。我们就从那里开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// step 1 AbstractChannel的创建
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// step 2 ChannelPipeline的创建
pipeline = newChannelPipeline();
}
// step 2
protected DefaultChannelPipeline newChannelPipeline() {
// step 3
return new DefaultChannelPipeline(this);
}
// step 3
protected DefaultChannelPipeline(Channel channel) {
// 绑定channel
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// 默认创建一个由头尾节点的双向链表,我们可以看一下节点的类型
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}

ChannelPipeline创建完毕!!

节点的添加

节点的添加我们必然要从我们见过的ChannelPipeline的addLast(handler)方法开始分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// step 1
public final ChannelPipeline addLast(ChannelHandler handler) {
// step 2
return addLast(null, handler);
}
// step 2
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
// step 3
return addLast(null, name, handler);
}
// step 3
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
// 同步
synchronized(this) {
// 检查是否有重复节点
checkMultiplicity(handler);
// 将handler包装成节点上下文 DefaultChannelHandlerContext
// step 4
newCtx = newContext(group, filterName(name, handler), handler);
// 添加节点 step 7
addLast0(newCtx);
}
// 回调处理器被添加的方法
callHandlerAdded0(newCtx);
return this;
}
// step 4
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// step 5
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// step 5
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// step 6 isInBound,isOutBount已经确定了
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
// 绑定handler
this.handler = handler;
}
// step 6
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) {
// 设定名字
this.name = ObjectUtil.checkNotNull(name, "name");
// 绑定pipeline
this.pipeline = pipeline;
// 绑定NioEventLoop
this.executor = executor;
// 表示自己是inbound处理器还是outbound处理器,后面根据这个值来判断
this.inbound = inbound;
this.outbound = outbound;
}
// step 7
private void addLast0(AbstractChannelHandlerContext newCtx) {
// 添加到Pipeline的那个双向链表中
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

节点的删除

和ChannelPipeline的addLast(handler)一样,我们可以找到ChannelPipeline的remove(handler)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// step 1
public final ChannelPipeline remove(ChannelHandler handler) {
// 我们知道节点是保证了handler的AbstractChannelHandlerContext
// step 2
remove(getContextOrDie(handler));
return this;
}
// step 2
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
// 同步
synchronized(this) {
// step 3
remove0(ctx);
}
// 回调节点的删除方法
callHandlerRemoved0(ctx);
return ctx;
}
// step 3
private static void remove0(AbstractChannelHandlerContext ctx) {
// 就是双向链表的删除
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}

入站事件的传播

其实我们应该已经经历过了,源头都是在从Unsafe对象(它负责最底层的IO操作)出发的,所以我们从NioEventLoop的这一段代码出发:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// step 1
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// ...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 接受IO事件,这里需要注意的是在SeverSocketChannel和SocketChannel处理的事件是不一样的
// 我们主要分析SocketChannel的读事件,对应的Unsafe对象是NioByteUnSafe
// step 2
unsafe.read();
}
}
// step 2 AbstractNioByteChannel的NioByteUnSafe
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 创建ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
// 控制连接数
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
do {
// 分配一个byteBuf
byteBuf = allocHandle.allocate(allocator);
// 读数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
allocHandle.incMessagesRead(1);
readPending = false;
// 触发读事件,这是我们要关注的重点
// step 3
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while ( allocHandle . continueReading ());

allocHandle.readComplete();
// 触发读完成事件
pipeline.fireChannelReadComplete();
}
// step 3
public final ChannelPipeline fireChannelRead(Object msg) {
// 出发头节点的读事件
// step 4
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// step 4
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 调用头节点的channelRead方法
// step 5
next.invokeChannelRead(m);
}
// step 5
private void invokeChannelRead(Object msg) {
// 调用头节点的channelRead方法
// step 6
((ChannelInboundHandler) handler()).channelRead(this, msg);
}
// step 6
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 继续传递读事件
// step 7
ctx.fireChannelRead(msg);
}
// step 7
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 找到下一个入站节点
// step 8 findContextInbound
// step 9 会重复这个过程,如果消息没有被释放,那么将一直传递到tail节点(最后一个inbound节点public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
})
invokeChannelRead(findContextInbound(), msg);
return this;
}
// step 8
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
// 可以看到是根据标志位来判定的
} while (! ctx . inbound );
return ctx;
}
// step 9 DefaultChannelPipeline的TailContext
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// step 10
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
// 如果消息一直没有被释放,将在tail节点被释放
ReferenceCountUtil.release(msg);
}

出站事件的传播

出站事件一般是写事件,有两种写需要区别一下:(来自netty源码中telnet例子)

1
2
ChannelFuture future = ctx.channel().write(response);   // 1
ChannelFuture future = ctx.write(response); // 2

我们先看第一种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// step 1
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// step 2
ChannelFuture future = ctx.channel().write(response);
}
// step 2 AbstractChannel
public ChannelFuture write(Object msg) {
// step 3
return pipeline.write(msg);
}
// step 3 DefaultChannelPipeline
public final ChannelFuture write(Object msg) {
// 直接调用tail的写方法
// step 4
return tail.write(msg);
}
// step 5 AbstractChannelHandlerContext
public ChannelFuture write(Object msg) {
// step 6
return write(msg, newPromise());
}
// step 6 AbstractChannelHandlerContext
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// 加上是否flush的信息
// step 7
write(msg, false, promise);
}
// step 7
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 和进站相识,这里找的出站节点
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
// 调用下一个出站节点的写方法
next.invokeWrite(m, promise);
}
}
// step 7
private void invokeWrite(Object msg, ChannelPromise promise) {
// step 8
invokeWrite0(msg, promise);
}
// step 8
private void invokeWrite0(Object msg, ChannelPromise promise) {
// step 9 会直接调用到HeadContext的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
}
// step 9
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 利用unsafe写入(明白流程就好,关于具体怎么写入将放到编码器的博文上)
unsafe.write(msg, promise);
}

第二种情况:

1
2
3
4
5
6
7
8
9
10
// step 1
public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception {
// step 2
ChannelFuture future = ctx.write(response);
}
// step 2
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
// 直接从当前节点写,后面就和第一种情况一样了
write(msg, false, promise);
}

参考

  1. netty源码分析之pipeline(一)
  2. netty源码分析之pipeline(二)