引:前面讲到了Broker对于不同的消息有自己不同的Processor,对于PushConsumer
的Pull请求的时候,它对应的就是PullMessageProcessor
~
PullMessageProcessor
通过前面的分析,我们自然而然地就会想到去看org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest()
方法:
1 | private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) |
从上面我们可以看出:我们会根据topic和queueId和offset和过滤器从MessageStore
中读取消息,如果成功,我们可以从指向内存文件的ByteBuffer
得到数据,根据配置有两种返回数据的方式,第一种是从ByteBuffer
中将数据读取到response中(经过堆),然后返回。第二种是让netty直接读取ByteBuffer
,将消息写给客户端,相对前一种,不需要将ByteBuffer
中的数据copy到java Heap中,少一次内存copy。但是第二种方式无法记录监控信息,比如consumer消费延时(从代码看出)等指标;如果没有读取到消息,会判断是否是PushConsumer
(参数hasSuspendFlag
)以及是否允许挂起(参数brokerAllowSuspend
),如果是的话,则将请求挂起,方式就是封装成PullRequest
提交给PullRequestHoldService
。
#MessageStore#getMessage
我们看看这个方法的代码:
1 | public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, |
如果理解了Broker的信息存储,上面的这些代码粗略的看还是比较好懂的。
主要是先找到topic+queue对应的ConsumeQueue
,根据PullRequest
传入的offset找到
对应的MappedFile
;从MappedFile
中里面读取指定数量的CQUnit
,根据TagsCode做下过滤,然后得到过滤后的Commit log
的offset;然后根据offset从CommitLog中获取具体的Message;最后再根据消息内容再做一次过滤,然后返回结果。
PullRequestHoldService#suspendPullRequest
从Processor的处理来看,如果没有读取到消息,会判断是否是PushConsumer
(参数hasSuspendFlag
)以及是否允许挂起(参数brokerAllowSuspend
),如果是的话,则将请求挂起,方式就是封装成PullRequest
提交给PullRequestHoldService
。所以我们这里看看它是如何挂起请求处理的:
1 | // org.apache.rocketmq.broker.longpolling.PullRequestHoldService |
然后我们回想一下,我们是不是在BrokerController启动中看到了PullRequestHoldService(继承ServiceThread)被启动了,所以我们应该去看看PullRequestHoldService做了什么,所以首先去看他的run方法:
1 | public void run() { |
然后我们在回想一下PullRequestHoldService#notifyMessageArriving()
方法,我们是否似曾相识,对了,就是在org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput()
方法时:
1 | if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() |
OK!到这里,Broker就已经完成了PullMessage的接收啦!!