引:消息存储对于一个消息队列来说是肯定要有的,在RocketMQ中,Broker将消息存储抽象成MessageStore
接口,我们也将从这里入手~
数据结构
美图:
从上面我们可以看到几个核心的数据结构:
- CommitLog:存储消息的数据结构,类似一个消息数组,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会生成一个对应的offset,代表在commitLog中的字节偏移量。注意:CommitLog并不是一个文件,而是一系列文件(上图中的MappedFile)。每个MappedFile文件的大小都是固定的(默认1G),写满一个会生成一个新的文件,新文件的文件名就是它存储的第一条消息的offset。
- ConsumeQueue:之前说了所有消息都是存储在一个commitLog中的,但是consumer是按照topic+queue的维度来消费消息的,没有办法直接从commitLog中读取,所以针对每个topic的每个queue都会生成consumeQueue,ConsumeQueue中存储的是消息在commitLog中的offset,可以理解成一个按topic+queue建的索引,每条消息占用20字节(上图中的一个cq)。跟commitLog一样,每个Queue文件也是一系列连续的文件组成,每个文件默认放30w个offset索引。
- IndexFile:CommitLog的另外一种形式的索引文件,只是索引的是messageKey,每个MsgKey经过hash后计算存储的slot,然后将offset存到IndexFile的相应slot上。根据msgKey来查询消息时,可以先到IndexFile(slot+index类似一个hashmap)中查询offset,然后根据offset去commitLog中查询对应的消息。
关于图中的过程我们后面会展开讲解~
MessageStore启动
之前,我们在BrokerController的启动中看到了MessageStore启动,所以我们也从那里开始看:
1 | // org.apache.rocketmq.store.DefaultMessageStore |
然后看看它起了哪些定时任务:
1 | private void addScheduleTask() { |
CommitLog
我们知道当Broker接收到消息会进行存储,首先存储的就是CommitLog,所以我们可以找到org.apache.rocketmq.store.CommitLog#putMessage()
方法,这里可能有多条消息一起存储,我们这里主要说一下单条消息存储:
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
我们在之前的数据结构图中以及上面的代码中都看到了CommitLog是存储在MappedFile中,下面我们就看看写入消息到MappedFile的实现,对应了org.apache.rocketmq.store.MappedFile#appendMessage()
方法:
1 | public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { |
在写入到MappedZFile的函数中,我们看到有个回调函数,好像也不算回调🤣,我们调用了这个函数的#doAppend()
方法,我们看看它做了什么:
1 | public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, |
看到这里,CommitLog就已经被存入到ByteBuf里啦,等待被flush到文件里!!!!
ConsumeQueue
上面我们看到消息被放到了CommitLog中,但是consumer在消费消息的时候是按照topic+queue的维度来拉取消息的。为了方便读取,MessageStore
将CommitLog
中消息的offset按照topic+queueId划分后,存储到不同的文件中,这就是ConsumeQueue
。
在上面MessageStore启动的时候,我们看到他会启动一个服务ReputMessageService
将CommitLog中的消息按照topic+queueId划分后,存储到不同的ConsumerQueue中,所以我们也从org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService
开始看,既然是一个线程,所以我们首先会找到他的run方法:
1 | public void run() { |
上面我们看到了消息被分发到了CommitLogDispatcher,这个是啥?找到DefaultMessageStore#doDispatch()
方法:
1 | public void doDispatch(DispatchRequest req) { |
从上面看到是DefaultMessageStore的集合成员变量,那么这个集合是什么时候生成的呢?
我们在构造DefaultMessageStore会往里面添加两个CommitLogDispatcher,我们定位到DefaultMessageStore的构造函数,省略无关代码:
1 | this.dispatcherList = new LinkedList<>(); |
不知道大家还有没有印象,其实在说Broker初始化)的时候也提到过。我们省略下无关代码,再看一遍:
1 | public boolean initialize() throws CloneNotSupportedException { |
这里我们只分析org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue
:
1 | class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { |
看到这里,ConsumeQueue就被写入到文件中了!!!!
IndexFile
MessageStore
中存储的消息除了通过ConsumeQueue
提供给consumer消费之外,还支持通过MessageID或者MessageKey来查询消息。使用ID查询时,因为ID就是用broker+offset生成的,所以很容易就找到对应的commitLog
文件来读取消息。对于用MessageKey来查询消息,MessageStore
通过构建一个index来提高读取速度。
在上面的CommitLogDispatcher链表中,我们还看到一个CommitLogDispatcher——CommitLogDispatcherBuildIndex
他就是用来创建IndexFile的,我们也将从那里入手:
1 | class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { |
看到这里,IndexFile就已经被存入到ByteBuf里啦,等待被flush到文件里!!!!