引:当消息达到Broker之后,就等着Consumer去consume了呀~
消费方式
PullConsumer:消费者主动调用pull方法来获取消息,没有则返回
PushConsumer:虽然名为Push,但是是消费者主动循环发送Pull请求到broker,如果没有消息,broker会把请求放入等待队列,新消息到达后返回response
所以本质上,两种方式都是通过消费者主动Pull来实现的。
消费模式
Consumer的消费模式在初始化consumer时设置的,主要有下面两种:
- Broadcast模式:消息会发送给group内所有consumer
- Cluster模式:每条消息只会发送给group内的一个consumer,但是Cluster模式的支持消费失败重发,从而保证消息一定被消费
使用方式
这次我们主要看看PushConsumer,以Cluster模式消费的源码是如何实现的,因为这种方式相对来说是最复杂的一种。例子其实也是在之前环境搭建那边文章中例子:
1 | public class Consumer { |
Consumer启动
之前其实在Producer分析里面带过Consumer,所以逻辑也是类似的。
DefaultMQPushConsumer初始化
和Producer一样包装了一个DefaultMQPushConsumerImpl,下面代码:
1 | // org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
DefaultMQPushConsumer启动
实质就是defaultMQPushConsumerImpl启动,下面代码:
1 | // org.apache.rocketmq.client.consumer.DefaultMQPushConsumer |
这里有两个点说一下:
- %RETRY% +groupname:如果consumer是cluster模式,并且订阅了TopicA的消息,那客户端会自动订阅%RETRY% + +groupname。我们知道consumer消费消息处理失败的话,broker是会延时一定的时间重新推送的,重新推送不是跟其它新消息一起过来,而是通过单独的%RETRY%过来。
- RebalanceService分配策略:Rebalance支持多种分配策略,比如平均分配、一致性Hash等,默认采用平均分配策略(AVG)。
MQClientInstance启动
在讲Producer的时候已经讲过MQClientInstance
的启动过程,因为Producer和Consumer共用一个MQClientInstance
,下面我们再来看一下Consumer相关逻辑:
1 | // org.apache.rocketmq.client.impl.factory.MQClientInstance |
到这里Consumer的相关初始化工作就做完了,下面就会去消费消息了。
消息消费
这块逻辑有点复杂,为了大家不被细节绕晕,这里画了一下时序图,如下:
RebalanceImpl触发Pull消息
还记得defaultMQPushConsumerImpl
启动代码中最后一行执行了this.mQClientFactory.rebalanceImmediately()
,忘记了,可以回头看看,这里会第一次触发Pull消息。我们看看代码:
1 | // org.apache.rocketmq.client.impl.factory.MQClientInstance |
这里有两个点重点看看:
Queue分配策略
我们看这个方法AllocateMessageQueueStrategy#allocate()
,他有下面几种实现:
AllocateMessageQueueAveragely:这是默认的分配方式,一个consumer分到在平均的情况下分到连续的queue,待会我们会看看代码
AllocateMessageQueueAveragelyByCircle: 和上面类似,但是分到的queue不是连续的。比如一共12个Queue,3个consumer,则第一个consumer接收queue1,4,7,9的消息
AllocateMachineRoomNearby:将queue先按照broker划分几个computer room,不同的consumer只消费某几个broker上的消息
AllocateMessageQueueByMachineRoom:根据computer room进行hash分配队列
AllocateMessageQueueByConfig:在用户启动时指定消费哪些Queue的消息
AllocateMessageQueueConsistentHash:使用一致性hash算法来分配Queue,用户需自定义虚拟节点的数量
然后下面我们看看默认的AllocateMessageQueueAveragely
:
1 | public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, |
提交Pull请求
上面我们说过对于新加入的Queue,提交一次PullRequest,那么我们就可以看看org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
方法:
1 | private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, |
消息拉取服务
我们在MQClientInstance启动的时候,我们看到我们启动了一个消息拉取的定时服务。这里我们也就知道其实PullMessageService
也是一个线程,我们先看run方法:
1 | // org.apache.rocketmq.client.impl.consumer.PullMessageService |
消息消费
上面我们已经说过了当消息拉取完之后会执行PullCallback,具体一点就是会执行到org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.submitConsumeRequest()
,我们看看:
1 | // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService |
到这里,我们也基本上吧消息消费的流程走完了, 现在可以再回头看看流程图,如果能对的上就OK啦!