Netty源码之解码器

引:Netty在帮我们解决性能的同时,也提供了丰富的编解码器来为我们业务上提供便利,这次我们就来看看Netty的解码器。

解码器基类

解码:就是将二进制数据流解码为自定义数据包。

在Netty中解码器的基类是ByteToMessageDecoder,然后我们要明白的是ByteToMessageDecoder其实是一个ChannelInboundHandlerAdapter。

一个解码器解码的过程主要有如下三个步骤:

  1. 累加字节流
  2. 调用子类的decode方法进行解析
  3. 将解析到的ByteBuf向下传播

当我们知道ByteToMessageDecoder是一个Handler的时候,我们就会去找它对事件的处理方法,主要是读事件,所以我们找到读事件处理方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// step 1
ByteBuf cumulation;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 解码器是相对于ByteBuf进行解码的(在入站的时候已经封装好了,可以参考自己之前关于Pipeline的博文)
if (msg instanceof ByteBuf) {
// 存储解码后的数据 List
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// 是否是第一次累加
first = cumulation == null;
if (first) {
// 第一次
cumulation = data;
} else {
// 不是第一次,则进行累加
// 我们看看这个累加器的cumulate方法 step 2
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 调用子类的decode方法进行解析
// step 3
callDecode(ctx, cumulation, out);
} catch(DecoderException e) {
throw e;
} catch(Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 将解析到的对象向下继续传播
// step 5
fireChannelRead(ctx, out, size);
// 回收list
out.recycle();
}
} else {
// 不是ByteBuf就直接向下传播
ctx.fireChannelRead(msg);
}
}
// step 2
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in ) {
final ByteBuf buffer;
// 当buf的数据大于cumulation的剩余容量时
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// 进行扩容(扩容其实就是一个内存拷贝操作)
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
// 不然就不扩容
buffer = cumulation;
}
// 将数据写入buf
buffer.writeBytes(in);
// 释放传入的buf
in.release();
return buffer;
}
// step 3
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) {
while ( in .isReadable()) {
int outSize = out.size();
// 看list是否有对象
if (outSize > 0) {
// 有对象则将list传播出去
fireChannelRead(ctx, out, outSize);
// 并清空list
out.clear();
outSize = 0;
}
// 记录可读长度
int oldInputLength = in.readableBytes();
// 开始调用decode方法了
// step 4
decodeRemovalReentryProtection(ctx, in, out);
// 什么都没有解析出来时
if (outSize == out.size()) {
// 代表当前的数据不能拼装成一个完整的数据包,break准备等待下一次数据包
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
}
}
// step 4
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception {
// ...
// 开始调用解码器子类的decode方法了
decode(ctx, in, out);
// ...
}


// step 5
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i++) {
// msgs.getUnsafe(i)保证成byteBuf
// 这里可能就会传播到业务的处理器
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}

LineBasedFrameDecoder

我们挑选了基于分隔符解码器LineBasedFrameDecoder来分析,它可以同时处理 \n以及\r\n两种类型的行分隔符,我们看看它的decode方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// step 1 (解码器模板)
protected final void decode(ChannelHandlerContext ctx, ByteBuf in , List < Object > out) throws Exception {
// step 2
Object decoded = decode(ctx, in);
if (decoded != null) {
// 拆除一个数据包就放进List里
out.add(decoded);
}
}
// step 2 LineBasedFrameDecoder
private boolean discarding;
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 找到换行符位置
final int eol = findEndOfLine(buffer);
// 第一次拆包不在丢弃模式下
if (!discarding) {
// 非丢弃模式下
// 找到分隔符
if (eol >= 0) {
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1;
// 判断拆包的长度是否大于允许的最大长度,maxLength构造时传入
if (length > maxLength) {
// 超出最大长度则丢弃这段数据,抛出异常
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 解码数据是否要包含分隔符
if (stripDelimiter) {
// 不包含
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
// 包含
frame = buffer.readRetainedSlice(length + delimLength);
}
// 返回拆包后封装成的帧
return frame;
} else {
// 没找到分隔符
final int length = buffer.readableBytes();
if (length > maxLength) {
// 如果大于最大长度,进入丢弃模式
// 保存丢弃长度
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
offset = 0;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
// 不大于,则返回null
return null;
}
} else {
// 丢弃模式下
if (eol >= 0) {
// 找到分割符
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
// 保存丢弃长度
discardedBytes += buffer.readableBytes();
// 改变读指针位置
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}

总结

其他的解码器也可以根据LineBasedFrameDecoder的思路去分析。当然我们最需要关注的还是整个解码器的设计思想。学会去抽象一类东西。

参考

  1. netty源码分析之拆包器的奥秘