Netty源码之新连接接入

引:之前对服务端和NioEventLoop都有了一定的分析,相信大家的服务端应该已经虚位以待了。好的,我们现在就开始分析新连接接入

新连接检测

用过NIO的人都会知道新连接检测应该在处理select出来的SelectedKey中出现,所以我们就从这开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// step 1
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
eventLoop = ch.eventLoop();
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 会进入到这里(接收连接)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// step 2
unsafe.read();
}
}
// step 2 AbstractNioMessageChannel
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 控制读取连接的速率,默认一次读取16个连接
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
do {
// step 3
int localRead = doReadMessages(readBuf);
// 对连接数增加
allocHandle.incMessagesRead(localRead);
} while ( allocHandle . continueReading ());

int size = readBuf.size();
for (int i = 0; i < size; i++) {
readPending = false;
// 触发读事件,绑定到NioEventLoop
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
// step 3
protected int doReadMessages(List < Object > buf) throws Exception {
// 熟悉的accept方法,熟悉的javaChannel()拿到serverSocketChannel
// step 4
SocketChannel ch = SocketUtils.accept(javaChannel());
// 封装成NioSocketChannel,进入创建部分
buf.add(new NioSocketChannel(this, ch));
}
// step 4
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
// 返回SocketChannel(新出来的AccessController还需要研究)
return AccessController.doPrivileged(new PrivilegedExceptionAction < SocketChannel > () {@Override public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
}

检测完毕!!!

创建Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// step 1 NioSocketChannel
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
// 同样创建一个config,这里设置了禁止Nagle算法(让数据尽快发出去,而不是让小数据包变大再发)
config = new NioSocketChannelConfig(this, socket.socket());
}
// step 2 AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 这里要准备绑定读事件了
// step 3
super(parent, ch, SelectionKey.OP_READ);
}
// step 3
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// step 4
super(parent);
this.ch = ch;
// 设定为对读事件感兴趣
this.readInterestOp = readInterestOp;
// 设置为非阻塞
ch.configureBlocking(false);
}
// step 4
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 也给自己分配这些东西
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

Channel注册绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// step 1 DefaultChannelPipeline
public final ChannelPipeline fireChannelRead(Object msg) {
// step 2
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// step 2
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 还是那个线程生成器
EventExecutor executor = next.executor();
// step 3
next.invokeChannelRead(m);
}
// step 3
private void invokeChannelRead(Object msg) {
// step 4 它会一直讲事件传播到再服务端启动过程中添加到Pipeline的ServerBootstrapAcceptor
((ChannelInboundHandler) handler()).channelRead(this, msg);
}
// step 4 ServerBootstrap里的ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 设置一系列信息
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry < AttributeKey < ?>, Object > e: childAttrs) {
child.attr((AttributeKey < Object > ) e.getKey()).set(e.getValue());
}
// 忽略细节,关注register方法(这里是childGroup)
// step 5
childGroup.register(child);
}
// step 5 MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
// 这里的next方法会去拿到之前ServerChannel初始化的EventLoop数组的一个
// step 6
return next().register(channel);
}
// step 6 SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
// 看到这里会发现注册流程基本和ServerSocketChanne是一样的
// step 7
return register(new DefaultChannelPromise(channel, this));
}
// step 7
public ChannelFuture register(final ChannelPromise promise) {
// 看到这里,剩下的基本上就可以参考服务端启动流程了
promise.channel().unsafe().register(this, promise);
}

总结

  1. ServerSocketChannel绑定的EventLoop轮询到有新的连接进入
  2. 通过封装jdk底层的channel创建 NioSocketChannel以及一系列的netty核心组件
  3. 将该cahnnel通过chooser,选择一个EventLoop绑定上去
  4. 注册读事件,开始新连接的读写(可以参考服务端启动流程)

参考

  1. netty源码分析之新连接接入全解析