RocketMQ Source Code Analysis: Broker Message Storage

Introduction: Message storage is essential for any message queue. In RocketMQ, the Broker abstracts message storage behind the MessageStore interface, so that is where we will start.

Data Structures

[Figure] MessageStore storage structure overview

The diagram above shows several core data structures:

  • CommitLog: the structure that stores the messages themselves, similar to one big message array. All messages are stored together in the order they arrive. After a message is stored, it gets a corresponding offset, which is its byte offset within the CommitLog. Note that the CommitLog is not a single file but a series of files (the MappedFiles in the diagram above). Each MappedFile has a fixed size (1 GB by default); once one fills up, a new file is created, and the new file's name is the offset of the first message it stores (see the sketch after this list).
  • ConsumeQueue: as noted above, all messages live in one CommitLog, but consumers pull messages by topic + queue, so they cannot read the CommitLog directly. For each queue of each topic, a ConsumeQueue is generated. It stores the offsets of the messages in the CommitLog and can be thought of as an index keyed by topic + queue; each entry occupies 20 bytes (one "cq" unit in the diagram). Like the CommitLog, each queue consists of a series of files, each holding 300,000 offset entries by default.
  • IndexFile: another kind of index over the CommitLog, keyed by the message key. Each message key is hashed to determine a slot, and the message's offset is stored in the IndexFile under that slot. When querying messages by key, we first look up the offset in the IndexFile (slot + index works much like a hash map) and then read the message from the CommitLog at that offset.
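To make the layout concrete, here is a minimal, hypothetical sketch (class and method names are mine, not RocketMQ's) of the file-name/offset arithmetic described above: given a global CommitLog offset and the default 1 GB MappedFile size, compute which file holds the message and where inside that file it sits.

import java.nio.ByteBuffer;

public class StoreLayoutSketch {
    // Default CommitLog MappedFile size: 1 GB
    private static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    // A global CommitLog offset belongs to the file whose name is the offset of its
    // first message, i.e. a multiple of MAPPED_FILE_SIZE
    static String mappedFileName(long globalOffset) {
        long fileFirstOffset = globalOffset - (globalOffset % MAPPED_FILE_SIZE);
        // CommitLog file names are typically the first offset left-padded to 20 digits
        return String.format("%020d", fileFirstOffset);
    }

    // Position of the message inside that file
    static long positionInFile(long globalOffset) {
        return globalOffset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = 2_500_000_000L; // an arbitrary global CommitLog offset
        System.out.println(mappedFileName(offset));  // 00000000002147483648
        System.out.println(positionInFile(offset));  // 352516352
    }
}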

We will walk through the processes shown in the diagram below.

MessageStore Startup

Earlier, we saw the MessageStore being started during BrokerController startup, so that is where we begin:

// org.apache.rocketmq.store.DefaultMessageStore
public void start() throws Exception {
    // Acquire a file lock so the on-disk files are read/written by only one MessageStore
    lock = lockFile.getChannel().tryLock(0, 1, false);
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }

    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    // Start FlushConsumeQueueService (extends ServiceThread, a single thread).
    // It periodically flushes ConsumeQueue data to disk; the period is set by
    // flushIntervalConsumeQueue (default 1 second)
    this.flushConsumeQueueService.start();
    // Start the CommitLog, which owns the flushCommitLogService.
    // flushCommitLogService flushes CommitLog data to disk, either synchronously or asynchronously
    this.commitLog.start();
    // Store statistics service (RT, TPS, etc.), available to admin tools
    this.storeStatsService.start();
    // On the master, start the delayed-message scheduling service
    // (also used for retrying messages whose consumption failed)
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
    // Start ReputMessageService, which records CommitLog message offsets into the ConsumeQueue files
    if (this.getMessageStoreConfig().isDuplicationEnable()) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
    } else {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    }
    this.reputMessageService.start();
    // Start HAService, which replicates data from master to slave
    this.haService.start();
    // For a new broker, initialize the storage directory
    this.createTempFile();
    // Register the scheduled tasks
    this.addScheduleTask();
    this.shutdown = false;
}

Next, let's see which scheduled tasks it registers:

private void addScheduleTask() {
    // Periodically clean up expired commitLog and consumeQueue data files
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    // Periodically self-check the commitLog and consumeQueue files to verify they are intact.
    // This is only for monitoring; it never attempts to repair files.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.checkSelf();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // Periodically check how long the commitLog has been holding its lock (taken during write/flush).
    // If the lock has been held too long, dump the JVM stack to a file for monitoring.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                try {
                    if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                        long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                        if (lockTime > 1000 && lockTime < 10000000) {

                            String stack = UtilAll.jstack();
                            final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                                + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                            MixAll.string2FileNotSafe(stack, fileName);
                        }
                    }
                } catch (Exception e) {
                }
            }
        }
    }, 1, 1, TimeUnit.SECONDS);
}

CommitLog

When the Broker receives a message it stores it, and the first thing written is the CommitLog, so we look at the org.apache.rocketmq.store.CommitLog#putMessage() method. Messages may also be stored in batches, but here we focus on storing a single message:

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the store timestamp
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Compute the CRC32 of the message body
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // The result to be returned
    AppendMessageResult result = null;
    // Store statistics service; some counters are updated below
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    // Topic and queue of the message
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // Non-transactional messages (and transaction commit messages)
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // For delayed messages, rewrite the topic to SCHEDULE_TOPIC_XXXX and the queue to the
        // delay-level queue, so the schedule service can pick them up and re-deliver them later
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Back up the original topic and queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Get the MappedFile currently being written
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // Acquire the put-message lock; it can be a spin lock or a ReentrantLock, depending on the store config
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);
        // If no file exists yet or the current file is full, create a new MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // If file creation failed, return an error
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // Write the message into the file; there is a callback here, explained below
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            // If the file is full, create a new file and retry
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        // Release the lock
        putMessageLock.unlock();
    }
    // Warn if writing the message under the lock took too long
    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }
    // Unlock the file that has just been filled up, releasing its locked (system) memory
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Update statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Flush the data to disk, either synchronously or asynchronously
    handleDiskFlush(result, putMessageResult, msg);
    // For a SYNC_MASTER, the master only returns after the message has been replicated to the slave;
    // for an ASYNC_MASTER, nothing happens here and the HAService background thread handles replication
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

Both the earlier data-structure diagram and the code above show that the CommitLog is stored in MappedFiles. Next, let's look at how a message is written into a MappedFile, which corresponds to the org.apache.rocketmq.store.MappedFile#appendMessage() method:

public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    return appendMessagesInner(msg, cb);
}

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    // Current write position, kept in an AtomicInteger
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        // Obtain an NIO ByteBuffer, either writeBuffer or mappedByteBuffer; both are zero-copy paths.
        // writeBuffer is the asynchronous path: when the CommitLog starts it initializes a pool of
        // off-heap memory (allocated via ByteBuffer); messages are first written into this pool and a
        // background thread periodically commits the pooled data to the FileChannel.
        // mappedByteBuffer is the synchronous path: the file is memory-mapped from the FileChannel,
        // incoming messages are written directly into that mapping, and the OS takes care of syncing
        // the mapped memory with the physical file.
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) {
            // Write a single message into the byteBuffer
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            // Write a batch of messages into the byteBuffer
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // Advance the write position
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
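A quick aside on why slice() is used above: slice() creates a new buffer that shares the same underlying memory but has its own independent position and limit, so each append can position its own view without disturbing other users of the mapped buffer. A minimal standalone illustration using plain java.nio (SliceDemo is a made-up name, not RocketMQ code):

import java.nio.ByteBuffer;

public class SliceDemo {
    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.allocateDirect(64); // stand-in for a mappedByteBuffer

        // A writer takes a slice: same memory, independent position/limit
        ByteBuffer view = mapped.slice();
        view.position(16);           // move only the view's position
        view.putLong(42L);           // write at byte 16 of the shared memory

        System.out.println(mapped.position());  // 0  -> the original buffer's position is untouched
        System.out.println(mapped.getLong(16)); // 42 -> but the written data is visible through it
    }
}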

In the method that writes to the MappedFile we saw a callback (well, not exactly a callback 🤣) whose #doAppend() method gets invoked. Let's see what it does:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET: the physical offset of the message = the file-name offset (fileFromOffset) + the current byteBuffer position
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    // Build the message ID: the first 8 bytes are the store host address, the last 8 bytes are wroteOffset,
    // so a message can later be located directly by its ID
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    // Look up the queue's offset, i.e. how many messages this queue already holds
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    // If this is the first message of the queue, initialize queueOffset
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transactional messages need special handling, skipped here
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    // Serialize the message properties
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    // Compute the total length of the serialized message
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // If there is not enough space left in the file, write the blank magic code (CommitLog.BLANK_MAGIC_CODE);
    // the remaining bytes can be anything, so that every file ends up exactly FileSize bytes long
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // ... (the remainder of the method, omitted here, serializes the message fields into msgStoreItemMemory,
    //      writes them into byteBuffer, and returns an AppendMessageResult with status PUT_OK)
}

At this point the message has been written into the CommitLog's ByteBuffer and is just waiting to be flushed to the file!
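The actual flush is handled by handleDiskFlush, which distinguishes synchronous flush (the producer thread waits until the data is on disk) from asynchronous flush (a background flush thread is simply woken up). As a rough, hypothetical illustration of the synchronous "group commit" pattern only (names and structure are simplified and invented, not the real CommitLog flush services):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch: producers enqueue a flush request and block on a latch;
// a dedicated flush thread persists the data and releases them.
public class SyncFlushSketch {
    static class FlushRequest {
        final long nextOffset;                        // flush must reach at least this offset
        final CountDownLatch latch = new CountDownLatch(1);
        volatile boolean ok;
        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }
    }

    private final ConcurrentLinkedQueue<FlushRequest> requests = new ConcurrentLinkedQueue<>();
    private volatile long flushedOffset = 0;

    // Called by the producer thread after the message is in the buffer
    public boolean putAndWait(long nextOffset, long timeoutMillis) throws InterruptedException {
        FlushRequest req = new FlushRequest(nextOffset);
        requests.offer(req);
        // Block until the flush thread has persisted up to nextOffset, or time out
        return req.latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && req.ok;
    }

    // Called in a loop by the dedicated flush thread
    public void doFlush() {
        FlushRequest req;
        while ((req = requests.poll()) != null) {
            if (flushedOffset < req.nextOffset) {
                flushedOffset = req.nextOffset;       // stand-in for the real flush of the mapped file
            }
            req.ok = true;
            req.latch.countDown();                    // release the waiting producer
        }
    }
}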

ConsumeQueue

Above we saw messages being written to the CommitLog, but consumers pull messages by topic + queue. To make reads convenient, the MessageStore takes the offsets of the messages in the CommitLog, partitions them by topic + queueId, and stores them in separate files. This is the ConsumeQueue.

During MessageStore startup above, we saw it launch a ReputMessageService that partitions the messages in the CommitLog by topic + queueId and records them into the corresponding ConsumeQueues, so we start from org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService. Since it is a thread, the first thing to look at is its run() method:

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
// As shown above, the main work happens in doReput
private void doReput() {
    // isCommitLogAvailable(): is the CommitLog's max offset greater than the offset read last time?
    // If so, there are new messages to dispatch.
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }
        // Read CommitLog data starting from where the previous round stopped
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset();

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    // Check the integrity of the message data and wrap it into a DispatchRequest
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getMsgSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Hand the message over to the CommitLogDispatchers
                            DefaultMessageStore.this.doDispatch(dispatchRequest);
                            // On the master, notify the MessageArrivingListener to wake up long-polling PullRequests
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }
                            // Advance the reput offset
                            this.reputFromOffset += size;
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // End of the current file reached, roll to the next file
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) {
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);

                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // Release the buffer to avoid memory leaks
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}

Above we saw the message being dispatched to a CommitLogDispatcher. What is that? Let's find the DefaultMessageStore#doDispatch() method:

public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}
private final LinkedList<CommitLogDispatcher> dispatcherList;

As shown above, dispatcherList is a collection field of DefaultMessageStore. So when is this collection populated?

Two CommitLogDispatchers are added when DefaultMessageStore is constructed. Locating the DefaultMessageStore constructor (irrelevant code omitted):

this.dispatcherList = new LinkedList<>();
// Dispatcher that builds the ConsumeQueue
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
// Dispatcher that builds the IndexFile
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

You may recall that this also came up when we discussed Broker initialization. Omitting the irrelevant code, let's look at it again:

public boolean initialize() throws CloneNotSupportedException {
    // ...
    if (result) {
        try {
            // Initialize the core message-store interface
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            // Add a dispatcher that computes the bloom-filter bitmap used for message filtering
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // ...
}

Here we only analyze org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue:

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // Non-transactional and transaction commit messages
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // Record the message's position info
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // Find the matching ConsumeQueue; if it does not exist yet, a new MappedFile is created for it
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}
// org.apache.rocketmq.store.ConsumeQueue
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    // Retry the write up to 30 times
    final int maxRetries = 30;
    // Check whether the ConsumeQueue is writable
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        // Tag hash code of the message
        long tagsCode = request.getTagsCode();
        if (isExtWriteEnable()) {
            // If the ext file is enabled, also write the tagsCode there; it is used for message
            // filtering, which we may cover another time
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // Write the entry into the ConsumeQueue file
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.warn("", e);
            }
        }
    }

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset <= this.maxPhysicOffset) {
        return true;
    }
    // The entry layout matches the data-structure diagram
    this.byteBufferIndex.flip();
    // One CQ unit has a fixed size of 20 bytes
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    // The logical queue offset maps directly to a position: entry index * 20 bytes
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    // Get the last MappedFile of the queue, creating one if needed
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            // ... (consistency checks between expectLogicOffset and currentLogicOffset are omitted)
        }
        this.maxPhysicOffset = offset;
        // Append the CQ unit to the file (written synchronously through the FileChannel)
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

At this point the ConsumeQueue entry has been written to its file!
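Given this fixed 20-byte entry layout, finding the CommitLog position for a consumer's logical offset is just arithmetic. Here is a hypothetical sketch (not RocketMQ's actual read path; the class and method names are mine) of decoding one entry from a single ConsumeQueue file:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

// Hypothetical reader: given a logical queue offset, read the corresponding 20-byte entry
// (commitLog offset, message size, tagsCode) from one ConsumeQueue file.
public class ConsumeQueueReadSketch {
    private static final int CQ_STORE_UNIT_SIZE = 20;

    public static long[] readEntry(String cqFilePath, long logicalOffset) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(cqFilePath, "r")) {
            // Each logical offset maps to a fixed position: offset * 20 bytes
            // (modulo the file size once a queue spans multiple files)
            file.seek(logicalOffset * CQ_STORE_UNIT_SIZE);
            byte[] unit = new byte[CQ_STORE_UNIT_SIZE];
            file.readFully(unit);
            ByteBuffer buf = ByteBuffer.wrap(unit);
            long commitLogOffset = buf.getLong(); // where the full message lives in the CommitLog
            int msgSize = buf.getInt();           // how many bytes to read there
            long tagsCode = buf.getLong();        // hash of the tag, used for server-side filtering
            return new long[] {commitLogOffset, msgSize, tagsCode};
        }
    }
}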

IndexFile

Besides serving messages to consumers through the ConsumeQueue, the MessageStore also supports looking up messages by MessageID or MessageKey. When querying by ID this is easy: the ID is built from the broker address plus the offset, so the corresponding CommitLog file can be located directly and the message read from it. For queries by MessageKey, the MessageStore builds an index to speed up lookups.
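To illustrate how such a key lookup works, here is a hypothetical, simplified sketch (not the actual IndexService/IndexFile query code; class name, method names, and the assumed 40-byte header size are mine) of reading an IndexFile: hash the key to pick a slot, read the slot to get the position of the most recent index entry, then walk the chain of entries via each entry's "previous index" field, collecting the CommitLog offsets whose key hash matches.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader over an IndexFile laid out as: header | hashSlotNum slots (4 bytes each)
// | index entries (20 bytes each: keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndex(4)).
public class IndexFileLookupSketch {
    private static final int INDEX_HEADER_SIZE = 40; // assumed header size
    private static final int HASH_SLOT_SIZE = 4;
    private static final int INDEX_SIZE = 20;

    private final ByteBuffer buffer;  // the mapped IndexFile content
    private final int hashSlotNum;

    public IndexFileLookupSketch(ByteBuffer buffer, int hashSlotNum) {
        this.buffer = buffer;
        this.hashSlotNum = hashSlotNum;
    }

    private static int keyHash(String key) {
        int h = key.hashCode();
        return h < 0 ? Math.abs(h) : h; // non-negative, as described above
    }

    // Collect CommitLog offsets of all index entries whose key hash matches
    public List<Long> phyOffsetsForKey(String key, int indexCount) {
        List<Long> offsets = new ArrayList<>();
        int hash = keyHash(key);
        int slotPos = hash % hashSlotNum;
        int absSlotPos = INDEX_HEADER_SIZE + slotPos * HASH_SLOT_SIZE;
        // The slot holds the sequence number of the most recently written entry for this slot
        int indexNo = buffer.getInt(absSlotPos);
        while (indexNo > 0 && indexNo <= indexCount) {
            int absIndexPos = INDEX_HEADER_SIZE + hashSlotNum * HASH_SLOT_SIZE + indexNo * INDEX_SIZE;
            int storedHash = buffer.getInt(absIndexPos);
            long phyOffset = buffer.getLong(absIndexPos + 4);
            int prevIndex = buffer.getInt(absIndexPos + 4 + 8 + 4);
            if (storedHash == hash) {
                offsets.add(phyOffset); // candidate only: the caller still verifies the real key
            }
            if (prevIndex == indexNo) {
                break;                  // defensive: avoid an infinite loop on a corrupted chain
            }
            indexNo = prevIndex;        // follow the chain to the previous entry of the same slot
        }
        return offsets;
    }
}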

In the CommitLogDispatcher list above we also saw another dispatcher: CommitLogDispatcherBuildIndex.

It is responsible for building the IndexFile, so that is where we dig in:

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
// org.apache.rocketmq.store.index.IndexService
// Write entries into the IndexFile
public void buildIndex(DispatchRequest req) {
    // Get or create the IndexFile currently being written, retrying 3 times by default
    IndexFile indexFile = retryGetAndCreateIndexFile();
    if (indexFile != null) {
        // Largest CommitLog offset already recorded in this IndexFile
        long endPhyOffset = indexFile.getEndPhyOffset();
        DispatchRequest msg = req;
        String topic = msg.getTopic();
        String keys = msg.getKeys();
        if (msg.getCommitLogOffset() < endPhyOffset) {
            return;
        }
        // Skip rollback messages
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                return;
        }
        // Index by the unique key of a single message
        if (req.getUniqKey() != null) {
            // Write the index entry
            indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
            if (indexFile == null) {
                log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                return;
            }
        }
        // A message may carry multiple keys; write one index entry per key
        if (keys != null && keys.length() > 0) {
            String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
            for (int i = 0; i < keyset.length; i++) {
                String key = keyset[i];
                if (key.length() > 0) {
                    indexFile = putKey(indexFile, msg, buildKey(topic, key));
                    if (indexFile == null) {
                        log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                        return;
                    }
                }
            }
        }
    } else {
        log.error("build index error, stop building index");
    }
}
// Next, how a MessageKey is written into the IndexFile
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    // Retry the write; if the current file is full, create a new one and try again
    for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
        log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

        indexFile = retryGetAndCreateIndexFile();
        if (null == indexFile) {
            return null;
        }

        ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
    }

    return indexFile;
}
// org.apache.rocketmq.store.index.IndexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // If the IndexFile is already full, return false
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // Compute the (non-negative) hash of the key, based on Java String's hashCode
        int keyHash = indexKeyHashMethod(key);
        // Which slot the key maps to
        int slotPos = keyHash % this.hashSlotNum;
        // Absolute position of that slot in the file
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {
            // As mentioned earlier, slot + index works like a hash map, with the slots acting as the bucket array.
            // On a hash collision, the slot holds the sequence number of the previous index entry; otherwise it is 0.
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            // Seconds between this message's store time and the first message in the file
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            // Absolute position where this index entry will be stored
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
            // Write one index unit (see the diagram): keyHash, phyOffset, timeDiff, previous index number
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // Point the slot at this entry (head insertion: RocketMQ assumes newer messages are queried more often)
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
            // For the first entry, record the starting offset and timestamp in the header
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }
            // Update the header counters
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}

At this point the index entry has been written into the IndexFile's ByteBuffer and is just waiting to be flushed to the file!

References

  1. RocketMQ消息存储流程图及数据结构图
  2. RocketMQ源码解析(九)-Broker#消息存储ConsumeQueue
  3. RocketMQ源码解析(十)-Broker#消息存储Index