Netty源码之编码器

引:上一篇博文讲了解码器对应了读事件,这次就讲一下编码器,对应了写事件。

编码器基类

在Netty中解码器的基类是MessageToByteEncoder ,然后我们要明白的是MessageToByteEncoder其实是一个ChannelOutboundHandlerAdapter。

我们在使用的过程中主要就是覆写它的encode方法:

1
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

一个编码器编码的过程主要有如下6个步骤:

  1. 匹配对象
  2. 分配内存
  3. 编码实现
  4. 释放对象
  5. 传播数据
  6. 释放内存

当我们知道MessageToByteEncoder是一个Handler的时候,我们就会去找它对事件的处理方法,主要是写事件,所以我们找到写事件处理方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// step 1
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判断是否能够处理该对象
// step 2
if (acceptOutboundMessage(msg)) {
// 能处理
I cast = (I) msg;
// 内存分配(默认分配堆外内存 )
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 开始调用我们覆写的encode方法了
encode(ctx, cast, buf);
} finally {
// 释放对象
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
// 传播数据,一直传播到head节点处理
ctx.write(buf, promise);
} else {
// 如果编码没有成功,则是否内存
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 不能处理,则继续传播
ctx.write(msg, promise);
}
} catch(EncoderException e) {
throw e;
} catch(Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
// step 2
private final TypeParameterMatcher matcher;
public boolean acceptOutboundMessage(Object msg) throws Exception {
// step 3
return matcher.match(msg);
}
// step 3
public boolean match(Object msg) {
// 主要就是判断msg是否是MessageToByteEncoder<I>中的I类型
return type.isInstance(msg);
}

write()

之前自己在ChannelPipeline那章分析到HeadContext的write方法,没有继续往下分析,我们在这里分析:

它主要有这么几个过程:

  1. direct化ByteBuf
  2. 插入写链表
  3. 设置写状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// step 1
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// step 2
unsafe.write(msg, promise);
}
// step 2
public final void write(Object msg, ChannelPromise promise) {
// 该对象负责缓冲写进来的数据
估算出需要写入的ByteBuf的size outboundBuffer = this.outboundBuffer;
// ...
int size;
try {
// 将待写入的对象过滤,把所有的非直接内存转换成直接内存DirectBuffer
// step 3
msg = filterOutboundMessage(msg);
// 估算出需要写入的ByteBuf的size
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch(Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 将信息添加到估算出需要写入的ByteBuf的size
// step 4
outboundBuffer.addMessage(msg, size, promise);
}
// step 3
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
// 转换成直接内存DirectBuffer
return newDirectBuffer(buf);
}

if (msg instanceof FileRegion) {
return msg;
}
}
// step 4 ChannelOutboundBuffer
// 一个链表
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// 指向第一个已经flush的节点
private Entry flushedEntry;
// 指向第一个未flush的节点
private Entry unflushedEntry;
// 指向末尾节点
private Entry tailEntry;

public void addMessage(Object msg, int size, ChannelPromise promise) {
// 关注这三个指针tailEntry、tailEntry、unflushedEntry
// 先包装成一个Entry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 插入链表
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// 统计当前需要写入到Socket缓存区的字节
// step 5
incrementPendingOutboundBytes(entry.pendingSize, false);
}
// step 5
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 默认缓冲区不能超过64个字节
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
// 设置写状态
setUnwritable(invokeLater);
}
}
// step 6
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
// 传播不能写事件
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}

flush()

write不能写了就需要flush了,我们同样也从HeadContext的flush方法开始分析:

它主要有这么几个步骤:

  1. 添加刷新标志并设置写状态
  2. 遍历buffer队列,过滤ByteBuf
  3. 调用JDK底层的API进行自旋写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// step 1
public void flush(ChannelHandlerContext ctx) throws Exception {
// step 2
unsafe.flush();
}
// step 2
public final void flush() {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// step 3
outboundBuffer.addFlush();
// step 5
flush0();
}
// step 3
public void addFlush() {
// 指向unflushedEntry
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
// 设置flush的数量
flushed++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
// step 4
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while ( entry != null );
// 将unflushedEntry设置为null
unflushedEntry = null;
}
}
// step 4
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// 默认缓冲区低于32个字节,则可设置可写
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
// 设置可写状态
setWritable(invokeLater);
}
}
// step 5
protected void flush0() {
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
// ...
// step 6
doWrite(outboundBuffer);
}
// step 6 NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// 拿到底层的SocketChannel准备开始底层的操作
SocketChannel ch = javaChannel();
// 获取循环次数,相当于一个自旋锁,默认16
int writeSpinCount = config().getWriteSpinCount();
do {
// 如果buffer里空的
if ( in .isEmpty()) {
// 清理OP_WRITE,防止Reacotr线程再次处理这个Channel
clearOpWrite();
return;
}
// 获取聚合写的最大字节数,默认Integer.MAX_VALUE
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 把ByteBuf里的数据写到原生Buffer里
// step 7
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// 对非Buffer对象(FileRegion)的数据进行普通的读写
switch (nioBufferCnt) {
case 0:
// 没成功,自旋一次
writeSpinCount -= doWrite0( in );
break;
case 1:
// JDK NIO 支持一次写单个ByteBuffer 以及 一次写多个ByteBuffer的聚集写模式
// 如果只有一个buffer的情况下,直接把这个buffer写进去
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
// 调用JDK底层的API进行写,完毕
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 释放缓存对象
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default:
{
// 多个buffer的情况下,写nioBufferCnt个buffer进去
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in .removeBytes(localWrittenBytes); --writeSpinCount;
break;
}
}
} while ( writeSpinCount > 0 );

incompleteWrite(writeSpinCount < 0);
}

知道write和flush那么就很容易理解writeAndFlush啦!!

参考

  1. etty源码分析之writeAndFlush全解析
  2. Netty源码分析——flush流程
  3. Netty 源码解析 ——— writeAndFlush流程分析