Netty源码之服务端启动

引:Netty启动啦!开心脸!但是这又可能是一篇又臭又长由绕的文章,需要点耐心,需要点动手能力。无奈脸!!!

Netty Example

我选择了Netty源码中的EchoServer来作为例子,它在io.netty.example.echo包下,代码如下所示(忽略细节,专注核心):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final class EchoServer {

public static void main(String[] args) throws Exception {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
// 配置Server端
// AbstractBootstrap的group
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// ServerBootstrap的childGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 在它创建的过程中会绑定一个ServerBootstrapConfig保存它的属性
ServerBootstrap b = new ServerBootstrap();
// 方法链配置类,可以学习
// 服务端的Bootstrap才有两个参数的group方法
b.group(bossGroup, workerGroup)
// 设置AbstractBootstrap的channelFactory参数
.channel(NioServerSocketChannel.class)
// 设置AbstractBootstrap的options参数(常量池实现,ConcurrentMap),可以学习
.option(ChannelOption.SO_BACKLOG, 100)
// 设置AbstractBootstrap的handler参数,这里配置了一个日志处理类
.handler(new LoggingHandler(LogLevel.INFO))
// 设置ServerBootstrap的childHandler参数
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// 服务端启动(一切分析的源头)
ChannelFuture f = b.bind(PORT).sync();
// 等待服务端关闭socket
f.channel().closeFuture().sync();
} finally {
// 优雅关闭两组死循环
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

源码分析

上面的例子的注释已经很清楚了,在启动之前主要就是在配置启动类ServerBootstrap。现在我就下面的启动方法开始深入分析(debug,代码展示会忽略细节):

1
ChannelFuture f = b.bind(PORT).sync();

Bootstrap相关

我们可以看看这个类干了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// step 1
public ChannelFuture bind(int inetPort) {
// 通过端口生成一个InetSocketAddress
// step 2
return bind(new InetSocketAddress(inetPort));
}
// step 2
public ChannelFuture bind(SocketAddress localAddress) {
// 验证group和channelFactory是否传进来了
validate();
// 非空判断,这里只说一次,大的工程项目,必须通过防御性编程来增强鲁棒性
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// step 3
return doBind(localAddress);
}
// step 3
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化serverChannel,并将eventloop注册到serverChannel上
// step 4
final ChannelFuture regFuture = initAndRegister();
// 获取刚刚创建的serverChannel,由于之后都不会改变,所以这里可以用final修饰
final Channel channel = regFuture.channel();
// 绑定地址
doBind0(regFuture, channel, localAddress, promise);
}

// step 4
final ChannelFuture initAndRegister() {
Channel channel = null;
// 生成一个serverChannel
// 这里的channelFactory我们在例子介绍过,其实就是在工厂中利用反射生成对应的channel
// 这里我们会在NioServerSocketChannel类中分析它构造时会发生什么(可以直接跳过去看)
channel = channelFactory.newChannel();
// 初始化serverChannel
// step 5
init(channel);
// config().group()就是通过ServerBootstrapConfig拿到bossGroup没什么好说的,我们重点关注register方法,我们可以进入EventLoopGroup分析(可以直接跳过去看)
// MultithreadEventLoopGroup的 step 1
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}

// step 5 ServerBootstrap具体实现
private final Map < ChannelOption < ?>,Object > childOptions;
private final Map < AttributeKey < ?>,Object > childOptions;
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
void init(Channel channel) throws Exception {
// 拿到设置的AbstractBootstrap的options
final Map < ChannelOption < ?>,
Object > options = options0();
// 同步
synchronized(options) {
// 设置options,跟下去我们会发现将options放入NioServerSocketChannelConfig中
setChannelOptions(channel, options, logger);
}
// 同options设置
final Map < AttributeKey < ?>,
Object > attrs = attrs0();
synchronized(attrs) {
for (Entry < AttributeKey < ?>, Object > e: attrs.entrySet()) {@SuppressWarnings("unchecked") AttributeKey < Object > key = (AttributeKey < Object > ) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 拿到ChannelPipeline
ChannelPipeline p = channel.pipeline();
// 拿到之前注入的childGroup和childHandler
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
// 设置childOptions和childOptions,这些是应用于以后新接入的channel
final Entry < ChannelOption < ?>,
Object > [] currentChildOptions;
final Entry < AttributeKey < ?>,
Object > [] currentChildAttrs;
synchronized(childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized(childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 加入新连接处理器
p.addLast(new ChannelInitializer < Channel > () {@Override public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 将会获得最早之前设置的new LoggingHandler(LogLevel.INFO)
ChannelHandler handler = config.handler();
// 新增一个新连接接入器ServerBootstrapAcceptor,具体的可以看之后的博文
ch.eventLoop().execute(new Runnable() {@Override public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
// step 6
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// 又是异步任务,Netty随处可见
channel.eventLoop().execute(new Runnable() {@Override public void run() {
if (regFuture.isSuccess()) {
// AbstractChannel step 11
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

Channel相关

创建一个NioServerSocketChannel对象会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// ServerSocketChannel的一系列配置属性
private final ServerSocketChannelConfig config;
// step 1(默认构造函数)
public NioServerSocketChannel() {
// step 2,step 3
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// step 2
private static ServerSocketChannel newSocket(SelectorProvider provider) {
// 利用JDK NIO 创建的SeveSocketrChannel
return provider.openServerSocketChannel();
}
// step 3
public NioServerSocketChannel(ServerSocketChannel channel) {
// step 4
super(null, channel, SelectionKey.OP_ACCEPT);
// 配置NioServerSocketChannelConfig(初始化会用到)
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// step 4 AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// step 5
super(parent, ch, readInterestOp);
}
// step 5 AbstractNioChannel
private final SelectableChannel ch;
protected final int readInterestOp;
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// step 6
super(parent);
// 保存原生的ServerSocketChannel
this.ch = ch;
// 保存感兴趣的事件(SelectionKey.OP_ACCEPT,1<<4)
this.readInterestOp = readInterestOp;
// 将Channel设置为非阻塞
ch.configureBlocking(false);
}
// step 6 AbstractChannel
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 生成channel的唯一标识,自己可以去看看实现
id = newId();
// 生成unsafe对象(进行底层的IO操作),一个抽象方法,需要具体实现,它的抽象类也在AbstractChannel中
unsafe = newUnsafe();
// 生成ChannelPipeline
// step 7
pipeline = newChannelPipeline();
}
// step 7
protected DefaultChannelPipeline newChannelPipeline() {
// 我们将在DefaultChannelPipeline分析
return new DefaultChannelPipeline(this);
}

// step 8 AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 绑定了eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// ... 专注于核心
// step 9
register0(promise);
// ...
}
// step 9
private void register0(ChannelPromise promise) {
// 当然是第一次注册,true
boolean firstRegistration = neverRegistered;
// step 10
doRegister();
// 注册之后设置标志位
neverRegistered = false;
registered = true;
// pipeline之后的博文会说
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 触发注册完成事件,还会传播pipeline.fireChannelActive();
pipeline.fireChannelRegistered();
// 判断是否绑定,会调用底层NIO创建的channel,应该为false
// 那么什么时候建立连接的呢,怎么找,可以参考后面列出的闪电侠的博文,这个函数分析完了
// 我们应该回到AbstractBootstrap的 step 6 继续分析
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
// step 10
protected void doRegister() throws Exception {
// 利用jdk底层将channel注册到selector上,0表示不关心任何事件,this是为了当NIO检测到事件时可以对netty的channel进行操作
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
}
// step 11
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// DefaultChannelPipeline step 2
return pipeline.bind(localAddress, promise);
}
// step 12
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 肯定还没有建立连接
boolean wasActive = isActive();
// step 13
doBind(localAddress);
// 这里isActive()应该要返回true了
if (!wasActive && isActive()) {
invokeLater(new Runnable() {@Override public void run() {
// 触发端口绑定事件,我们将会触发HeadContext的channelActive方法
// ChannelPipeline step 5
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}
// step 13
protected void doBind(SocketAddress localAddress) throws Exception {
// 到这里就是进行底层的绑定
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
// step 14 AbstractChannel
public Channel read() {
// DefaultChannelPipeline step 7
pipeline.read();
return this;
}
// step 16 AbstractNioChannel的AbstractNioUnsafe
protected void doBeginRead() throws Exception {
// 还记得在初始化AbstractNioChannel时保存的readInterestOp和在channel注册过程中的selectionKey吗,不记得往上看一看
final SelectionKey selectionKey = this.selectionKey;
// 需要读
readPending = true;
// 拿到具体的值,1<<4=16
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 准备告诉Selector需要关注SelectionKey.OP_ACCEPT事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}

ChannelPipeline相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// step 1 DefaultChannelPipeline
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
protected DefaultChannelPipeline(Channel channel) {
// 绑定channel(如果为空则抛出异常,学习写工具类)
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// 默认的两个节点
tail = new TailContext(this);
head = new HeadContext(this);
// 形成双向链表
head.next = tail;
tail.prev = head;
}

// step 2 DefaultChannelPipeline
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 这个tail之前就设置过了
// step 3
return tail.bind(localAddress, promise);
}

// step 3 AbstractChannelHandlerContext
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 寻找tail之前的一个节点
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 调用下一个AbstractChannelHandlerContext的invokeBind方法,一直到找到HeadContext
// step 4
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {@Override public void run() {
next.invokeBind(localAddress, promise);
}
},
promise, null);
}
return promise;
}
// step 4
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
// 看到unsafe就知道要进行底层操作的
// 进入 AbstractChannel的Unsafe step 12
unsafe.bind(localAddress, promise);
}

// step 5 DefaultChannelPipeline的HeadContext
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 绑定时间将会一直传播下去
ctx.fireChannelActive();
// step 6 深入分析
readIfIsAutoRead();
}

// step 6 DefaultChannelPipeline的HeadContext
private void readIfIsAutoRead() {
// 我们很容易看到isAutoRead()方法默认返回为1
if (channel.config().isAutoRead()) {
// 进入 AbstractChannel step 14
// 对于服务端channel的读来说就说可以绑定连接了
channel.read();
}
}
// step 7 DefaultChannelPipeline
public final ChannelPipeline read() {
// 从tail节点一直追寻到head,找到 unsafe.beginRead()
// step 8
tail.read();
return this;
}
// step 8 DefaultChannelPipeline的HeadContext
public final void beginRead() {
// 会一直找到AbstractNioChannel的AbstractNioUnsafe的doBeginRead step 16
doBeginRead();
}

EventLoop相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// step 1 MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
// next()将会通过选择器(之后的博文会讲到)拿到一个EventLoop
// step 2
return next().register(channel);
}

// step2 SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
// step 3
return register(new DefaultChannelPromise(channel, this));
}
// step 3
public ChannelFuture register(final ChannelPromise promise) {
// 通过unsafe()方法得到Unsafe对象(绑定在channel上的那个)我们可以知道,它要开始底层操作注册了
// 我们我们又要回到AbstractChannel去了,看看它的register注册方法
// Channel step 8
promise.channel().unsafe().register(this, promise);
return promise;
}

总结

  1. 设置启动类参数
  2. 创建服务端Channel,同时创建ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
  3. 初始化服务端Channel,设置一些attr,option,以及设置子channel的attr,option,给服务端channel添加新channel接入器
  4. 将Channel注册到Selector,并触发addHandler,register等事件
  5. 进行端口绑定,并触发active事件,同时注册ACCEPT事件

参考

  1. netty源码分析之服务端启动全解析