RocketMQ Source Code Analysis: Consumer

Intro: once a message reaches the Broker, it simply waits there for a Consumer to consume it.

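Consumption Methods

  • PullConsumer: the consumer actively calls the pull method to fetch messages; if there are none, the call returns right away.

  • PushConsumer: despite the name, the consumer itself keeps sending Pull requests to the broker in a loop. If there is no message, the broker parks the request in a waiting queue and returns the response once a new message arrives.

So in essence, both methods are implemented by the consumer actively pulling.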
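The difference between "pull and return" and "pull and hold" can be sketched with a toy in-memory broker. This is only an illustration under our own assumptions: the class LongPollingSketch and its methods are hypothetical and are not RocketMQ code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy broker, not RocketMQ code.
class LongPollingSketch {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    // A new message arrives at the toy broker.
    void publish(String msg) {
        messages.add(msg);
    }

    // PullConsumer style: return immediately, null when nothing is available.
    String pullNow() {
        return messages.poll();
    }

    // PushConsumer style: still a pull, but the request is held for up to holdMillis.
    String pullAndHold(long holdMillis) {
        try {
            return messages.poll(holdMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LongPollingSketch broker = new LongPollingSketch();
        System.out.println("pullNow: " + broker.pullNow());
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            broker.publish("hello");
        });
        producer.start();
        // The held pull returns as soon as the message is published.
        System.out.println("pullAndHold: " + broker.pullAndHold(5000));
        producer.join();
    }
}
```

In both calls the consumer initiates the request; holding the request on the broker side is what makes it feel like a push.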

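Consumption Modes

The consumption mode is set when the consumer is initialized. There are two main modes:

  • Broadcast mode: each message is delivered to every consumer in the group.
  • Cluster mode: each message is delivered to only one consumer in the group. Cluster mode also supports redelivery when consumption fails, so a failed message is retried instead of being silently lost.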
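To make the two modes concrete, here is a small simulation of who receives a message. It is a sketch under a loud assumption: in cluster mode it picks one group member per message round-robin, purely for illustration. RocketMQ itself assigns whole queues to consumers, as the rebalance section later shows. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not RocketMQ code.
class MessageModelSketch {
    enum Model { BROADCASTING, CLUSTERING }

    // Returns the group members that receive message number msgIndex.
    static List<String> receivers(Model model, List<String> group, int msgIndex) {
        List<String> out = new ArrayList<>();
        if (group.isEmpty()) {
            return out;
        }
        if (model == Model.BROADCASTING) {
            out.addAll(group); // every consumer in the group gets the message
        } else {
            // Assumption for illustration only: exactly one member, chosen round-robin.
            out.add(group.get(msgIndex % group.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("c1", "c2", "c3");
        System.out.println(receivers(Model.BROADCASTING, group, 0));
        System.out.println(receivers(Model.CLUSTERING, group, 4));
    }
}
```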

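Usage

This time we mainly look at how the source code implements a PushConsumer consuming in Cluster mode, because this combination is comparatively the most complex. The example is the same one used in the earlier article on setting up the environment: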

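import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // PushConsumer with the given consumer group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        // Name server address
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // Where to start consuming from
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Subscribe to the topic
        consumer.subscribe("TopicTest", "*");

        // Register the callback logic
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Start
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}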

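Consumer Startup

The Consumer was already touched on in the Producer analysis, so the logic is similar.

DefaultMQPushConsumer Initialization

Like the Producer, it wraps an implementation class, here DefaultMQPushConsumerImpl. The code: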

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public DefaultMQPushConsumer(final String consumerGroup) {
    this(consumerGroup, null, new AllocateMessageQueueAveragely());
}

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    // Average allocation by default
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // Build the actual implementation class
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
}

DefaultMQPushConsumer Startup

Starting it really means starting defaultMQPushConsumerImpl. The code:

 	// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
public void start() throws MQClientException {
// What actually starts is defaultMQPushConsumerImpl
this.defaultMQPushConsumerImpl.start();
// omitted...
}
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
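// Basic parameter checks
this.checkConfig();
// Copy DefaultMQPushConsumer's subscriptions into rebalanceImpl
// In cluster mode, a consumer that subscribes to a topic also auto-subscribes to %RETRY% + group name, which is added to rebalanceImpl as well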
this.copySubscription();

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
// If InstanceName is still DEFAULT, change it to the PID
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
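// Get or create the MQClientInstance, the client management class; instances are keyed by clientId, so there is normally one per process
// It manages all I/O operations, caches client and topic information, and owns the various services, so it is important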
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// PullAPIWrapper wraps the pull-request API used to talk to the broker
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// Hooks that are called back when the client filters messages
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

// Persistence of the consumer's consume offset
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
// Broadcasting: offsets are persisted in a local file
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// Clustering: offsets are persisted on the broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// Broadcasting loads offsets from the local file; clustering does nothing here
this.offsetStore.load();
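// Consume service: orderly and concurrent consumption use different logic; it receives messages, calls the listener, and handles the consume result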
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// Start the consume service (it runs periodic tasks)
this.consumeMessageService.start();
// Register the consumer
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// Start the MQClientInstance
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// Update the service state
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// Refresh the topic information
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// Send a heartbeat
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// Trigger one rebalance
this.mQClientFactory.rebalanceImmediately();
}

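Two points are worth noting here:

  • %RETRY% + group name: if the consumer runs in cluster mode and subscribes to TopicA, the client automatically subscribes to %RETRY% + group name as well. When a consumer fails to process a message, the broker redelivers it after a delay. The redelivered message does not arrive together with the other new messages; it comes through the separate %RETRY% topic.
  • Rebalance allocation strategy: rebalancing supports several allocation strategies, such as average allocation and consistent hashing. The default is average allocation (AVG).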
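The naming rule for the retry topic is simple enough to sketch in a few lines. The helper names below (retryTopicOf, effectiveTopics) are hypothetical and only mirror the rule described above; they are not the client's own methods.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helpers that mirror the "%RETRY% + group name" rule.
class RetryTopicSketch {
    static final String RETRY_PREFIX = "%RETRY%";

    static String retryTopicOf(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Topics the consumer ends up pulling from: its own subscriptions,
    // plus the group's retry topic when running in cluster mode.
    static Set<String> effectiveTopics(Set<String> subscribed, String consumerGroup, boolean clustering) {
        Set<String> topics = new LinkedHashSet<>(subscribed);
        if (clustering) {
            topics.add(retryTopicOf(consumerGroup));
        }
        return topics;
    }

    public static void main(String[] args) {
        Set<String> subscribed = Collections.singleton("TopicA");
        System.out.println(effectiveTopics(subscribed, "my_group", true));
        System.out.println(effectiveTopics(subscribed, "my_group", false));
    }
}
```

Because the retry topic is per group rather than per original topic, retried messages from every topic a group subscribes to flow through the same %RETRY% topic.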

MQClientInstance Startup

The startup of MQClientInstance was already covered in the Producer article, because a Producer and a Consumer share one MQClientInstance. Here we look again at the Consumer-related logic:

  // org.apache.rocketmq.client.impl.factory.MQClientInstance
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// unrelated code omitted...
// Start the scheduled tasks
this.startScheduledTask();
// Start the pull-message service (Consumer)
this.pullMessageService.start();
// Start the rebalance service: a thread that triggers a rebalance periodically (every 20 seconds)
this.rebalanceService.start();
// Start the client's internal producer, `CLIENT_INNER_PRODUCER`
// It is mainly used to send retry messages to the broker after a consumption failure or timeout
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.serviceState = ServiceState.RUNNING;
break;
// unrelated code omitted...
}
}
}
private void startScheduledTask() {
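// unrelated code omitted...
// Persist the consumers' offsets
// Saves the consumption progress: broadcasting stores it locally, clustering reports it to the broker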
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// Adjust the corePool size of the local consume thread pool according to load
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}

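At this point the Consumer's initialization is complete; next it starts consuming messages.

Message Consumption

This logic is somewhat involved. To keep the details from becoming confusing, here is a sequence diagram:
[Figure: RocketMQ Consumer message consumption sequence diagram]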

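RebalanceImpl Triggers the Message Pull

Recall that the last line of the defaultMQPushConsumerImpl startup code calls this.mQClientFactory.rebalanceImmediately() (scroll back if you have forgotten). This is where pulling messages is triggered for the first time. The code: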

  // org.apache.rocketmq.client.impl.factory.MQClientInstance
public void rebalanceImmediately() {
// As mentioned earlier, rebalanceService is just a thread; it extends ServiceThread
this.rebalanceService.wakeup();
}
// org.apache.rocketmq.common.ServiceThread
public void wakeup() {
// Wakes up rebalanceService; an atomic flag and a latch keep this thread-safe
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
// org.apache.rocketmq.client.impl.consumer.RebalanceService
public void run() {
while (!this.isStopped()) {
// 20s by default
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
}
// org.apache.rocketmq.client.impl.factory.MQClientInstance
public void doRebalance() {
// Every registered consumer performs doRebalance
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
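// org.apache.rocketmq.client.impl.consumer.RebalanceImpl
// doRebalance(isOrder) calls rebalanceByTopic for each subscribed topic (that loop is not shown here)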
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
// Get all queues of the topic from the route information
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// Get from the broker the IDs of all consumers in the same group (e.g. 192.168.1.28@83721)
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}

if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}

if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// Sort both the queues and the consumer IDs
Collections.sort(mqAll);
Collections.sort(cidAll);
// Use the allocation strategy chosen at initialization (average by default) to get the queues assigned to this consumer
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}

Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// Update the ProcessQueues in rebalanceImpl, which cache the received messages
// For every newly added queue, submit one PullRequest
// For a freshly started consumer every queue is new, so every queue triggers a PullRequest
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
// Propagate the change, which sends a heartbeat
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
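The wakeup() and waitForRunning() pair is what lets rebalanceImmediately() cut the 20-second wait short. Below is a minimal sketch of that idea. Note the swap: RocketMQ uses an atomic flag plus a latch, while this sketch uses a plain Java monitor (synchronized, wait, notifyAll) because it is shorter to show correctly. The class name is hypothetical.

```java
// Hypothetical sketch of a "sleep for an interval unless woken early" loop helper.
// It uses a plain monitor instead of the atomic flag and latch used by ServiceThread.
class WakeableLoopSketch {
    private boolean notified = false;

    // Called by another thread to cut the current (or next) wait short.
    synchronized void wakeup() {
        notified = true;
        notifyAll();
    }

    // Waits up to intervalMillis; returns true if it was woken early by wakeup().
    synchronized boolean waitForRunning(long intervalMillis) {
        long deadline = System.nanoTime() + intervalMillis * 1_000_000L;
        while (!notified) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                break; // interval elapsed without a wakeup
            }
            try {
                wait(remainingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        boolean wokenEarly = notified;
        notified = false; // reset so the next wait runs the full interval again
        return wokenEarly;
    }

    public static void main(String[] args) {
        WakeableLoopSketch service = new WakeableLoopSketch();
        service.wakeup(); // like rebalanceImmediately()
        System.out.println("woken early: " + service.waitForRunning(20_000));
        System.out.println("woken early: " + service.waitForRunning(50));
    }
}
```

A wakeup that arrives before the wait starts is remembered by the flag, so the first wait in main returns at once instead of sleeping for 20 seconds.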

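Two points here deserve a closer look:

Queue Allocation Strategy

Look at the method AllocateMessageQueueStrategy#allocate(). It has the following implementations:

  • AllocateMessageQueueAveragely: the default. Queues are divided evenly, and each consumer receives a contiguous range of queues. We will look at its code shortly.

  • AllocateMessageQueueAveragelyByCircle: similar to the above, but the queues a consumer receives are not contiguous. For example, with 12 queues and 3 consumers, the first consumer receives queues 1, 4, 7 and 10.

  • AllocateMachineRoomNearby: first groups the queues into computer rooms according to their broker, so that a given consumer only consumes messages from certain brokers.

  • AllocateMessageQueueByMachineRoom: allocates queues by hashing on the computer room.

  • AllocateMessageQueueByConfig: the user specifies at startup which queues to consume.

  • AllocateMessageQueueConsistentHash: allocates queues with a consistent hashing algorithm; the user must configure the number of virtual nodes.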
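The by-circle idea from the list above can be sketched as dealing queues out like cards. This is a simplified stand-in that uses integers as queue ids (numbered from 1 to match the example in the text); the class and method names are our own, not the strategy's real signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of round-robin ("by circle") queue allocation.
class CircleAllocationSketch {
    static List<Integer> allocateByCircle(List<Integer> queues, List<String> consumerIds, String currentId) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentId);
        if (index < 0) {
            return result; // unknown consumer gets nothing
        }
        // Take every consumerIds.size()-th queue, starting at this consumer's position.
        for (int i = index; i < queues.size(); i += consumerIds.size()) {
            result.add(queues.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        List<String> consumers = Arrays.asList("c1", "c2", "c3");
        for (String c : consumers) {
            System.out.println(c + " -> " + allocateByCircle(queues, consumers, c));
        }
        // c1 -> [1, 4, 7, 10], c2 -> [2, 5, 8, 11], c3 -> [3, 6, 9, 12]
    }
}
```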

Next, let's look at the default, AllocateMessageQueueAveragely:

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    // argument checks omitted...
    List<MessageQueue> result = new ArrayList<MessageQueue>();
    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    // 1. If the number of queues <= the number of consumers, size = 1
    // 2. Otherwise size = queues / consumers; if the remainder is n, the first n consumers get size + 1, so every queue has a consumer
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    // Starting at startIndex, take averageSize consecutive queues for this consumer
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}
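To see what the arithmetic produces, the same formula can be run on plain integers. The sketch below copies the index computation from the method above but replaces MessageQueue with queue indexes; the class name and the integer-based signature are our own simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Same arithmetic as the average allocation above, applied to queue indexes 0..queueCount-1.
class AverageAllocationSketch {
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        if (queueCount <= 0 || consumerCount <= 0 || index < 0 || index >= consumerCount) {
            return result;
        }
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range goes negative for consumers beyond the last queue, so the loop is skipped
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues, 3 consumers: [0, 1, 2], [3, 4, 5], [6, 7]
        for (int i = 0; i < 3; i++) {
            System.out.println("consumer " + i + " -> " + allocate(8, 3, i));
        }
        // 2 queues, 3 consumers: the third consumer gets an empty list
        System.out.println("consumer 2 of 3 with 2 queues -> " + allocate(2, 3, 2));
    }
}
```

The second case shows a practical consequence: with more consumers than queues, the extra consumers receive no queue and stay idle.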

Submitting the Pull Request

We said above that one PullRequest is submitted for every newly added queue, so let's look at the method org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

  private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// omitted
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// A newly assigned queue
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// Remove stale data from the offset store
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// Compute the offset to start consuming from
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// Register a ProcessQueue for the new queue, used to cache received messages
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// Build a PullRequest for the newly added queue
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// Dispatch the pull requests to PullMessageService to pull messages
this.dispatchPullRequest(pullRequestList);

return changed;
}
// org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// Pull messages right away
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
// org.apache.rocketmq.client.impl.consumer.PullMessageService
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// Put the pull request into pullRequestQueue
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
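The call chain above ends with a put into a blocking queue, so rebalancing only hands the request over and a separate thread does the pulling. The sketch below shows that hand-off together with a delayed variant, which the pull service relies on later for flow control. It is an illustration under our own assumptions: requests are plain strings and every name is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a queue hand-off with "now" and "later" submission.
class PullQueueSketch {
    private final BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Enqueue at once; the pulling thread picks it up when it is free.
    void executeImmediately(String request) {
        pullRequestQueue.offer(request);
    }

    // Enqueue after a delay, e.g. when the consumer is under flow control.
    void executeLater(String request, long delayMillis) {
        scheduler.schedule(() -> executeImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    // What the pulling thread does: wait for the next request (null on timeout).
    String next(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) {
        PullQueueSketch service = new PullQueueSketch();
        service.executeImmediately("queue-0");
        service.executeLater("queue-1", 200);
        System.out.println(service.next(1000));
        System.out.println(service.next(1000));
        service.shutdown();
    }
}
```

Decoupling through a queue means a slow pull never blocks the rebalance thread, and a request can be re-queued as often as needed.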

Message Pull Service

When MQClientInstance started, we saw it start a message-pull service. PullMessageService is also a thread, so let's begin with its run method:

  // org.apache.rocketmq.client.impl.consumer.PullMessageService
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// Take a pull request and pull with it
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// The real work happens here
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
public void pullMessage(final PullRequest pullRequest) {
// Get the queue that caches messages
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// ...

// Record the pull timestamp
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// ...

// Check the number and total size of the messages cached in the queue
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// Too many unprocessed messages piled up (more than 1000 by default): put the pull request back and run it after a delay (50ms by default)
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
// Unprocessed messages take too much memory (more than 100MB): same handling as above
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
// Non-ordered consumption: the offset span is too large, same handling as above
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
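} else {
// ...
}
// Check whether the subscription has changed: the topic or consumer configuration may have changed during the delay, in which case the request is handled again later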
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}

final long beginTimestamp = System.currentTimeMillis();
// Callback invoked after the Pull command has been sent and answered
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// Pre-process the result: filter again on the client side and set minOffset and maxOffset
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
// If no messages were returned, issue the next pull immediately
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// Put the messages into the ProcessQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Consume service (thread pool): calls the messageListener and notifies the ProcessQueue when processing completes
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// Submit the pull request again
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
// ...
break;
case OFFSET_ILLEGAL:
// ...
break;
default:
break;
}
}
}
// ...
};
try {
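// Send the Pull command through Netty. From here on it works like a producer sending a message, except that this call is asynchronous
// pullCallback runs once the asynchronous call completes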
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
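The three flow-control checks at the top of pullMessage can be condensed into one decision function. The sketch below is ours, not RocketMQ code. The count and size thresholds (1000 messages, 100 MiB) come from the comments in the code above, while the span limit of 2000 is an assumption about the default of consumeConcurrentlyMaxSpan.

```java
// Hypothetical condensation of the flow-control checks in pullMessage.
class FlowControlSketch {
    enum Decision { PULL_NOW, DELAY_BY_COUNT, DELAY_BY_SIZE, DELAY_BY_SPAN }

    static final long COUNT_THRESHOLD = 1000;   // cached messages per queue
    static final long SIZE_THRESHOLD_MIB = 100; // cached message size per queue
    static final long MAX_SPAN = 2000;          // assumed default offset span limit

    static Decision decide(long cachedCount, long cachedBytes, long offsetSpan, boolean orderly) {
        long cachedMiB = cachedBytes / (1024 * 1024);
        if (cachedCount > COUNT_THRESHOLD) {
            return Decision.DELAY_BY_COUNT;
        }
        if (cachedMiB > SIZE_THRESHOLD_MIB) {
            return Decision.DELAY_BY_SIZE;
        }
        // The span check only applies to concurrent (non-ordered) consumption.
        if (!orderly && offsetSpan > MAX_SPAN) {
            return Decision.DELAY_BY_SPAN;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        System.out.println(decide(1001, 0, 0, false));
        System.out.println(decide(10, 101L * 1024 * 1024, 0, false));
        System.out.println(decide(10, 0, 2001, false));
        System.out.println(decide(10, 0, 2001, true));
    }
}
```

Any of the DELAY results corresponds to re-queueing the same pull request after a short pause, so a slow listener throttles pulling instead of exhausting memory.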

Consuming the Messages

As described above, PullCallback runs once the pull completes. More specifically, execution reaches org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.submitConsumeRequest(). Let's take a look:

  // org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// Check whether the messages must be split into batches; the default batch size is 1
if (msgs.size() <= consumeBatchSize) {
// Create a single consume task
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
// Create one consume task per batch
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}

ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}

this.submitConsumeRequestLater(consumeRequest);
}
}
}
}

// ConsumeRequest is a task, so let's look at its run method
public void run() {
// ...
// Get the listener for these messages
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// The listener runs the callback logic
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
// Set the consume result type
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
// ...
if (!processQueue.isDropped()) {
// Handle the consume result
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;
// Update statistics according to the consume status
switch (status) {
// Consumed successfully
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
// Consume again later
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
// Broadcast mode: failed messages are only logged, nothing else is done
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
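// Cluster mode: send the message back to the broker so the broker redelivers it
// There are two ways to make the broker redeliver (read the code if you are interested):
// 1. First try sending the send_msg_back command to the broker.
// 2. If that fails, use the producer reserved inside the consumer to send the message to %RETRY% + group name, which amounts to sending a message to yourself.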
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// Sending back to the broker failed, so try consuming locally again later
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// Remove the messages that were cached before consumption
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
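The role of ackIndex in processConsumeResult is easy to miss: every message after ackIndex in the batch is treated as failed. The sketch below isolates that rule. It is a simplified illustration with hypothetical names, and it assumes (as far as we know) that the context's ackIndex defaults to Integer.MAX_VALUE, which means "the whole batch succeeded".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how ackIndex selects the messages to retry.
class AckIndexSketch {
    // Returns the positions in the batch that must be sent back (cluster mode) or dropped (broadcast mode).
    static List<Integer> indexesToRetry(boolean consumeSuccess, int ackIndex, int batchSize) {
        List<Integer> retry = new ArrayList<>();
        if (batchSize <= 0) {
            return retry;
        }
        if (consumeSuccess) {
            // CONSUME_SUCCESS: clamp ackIndex to the last message of the batch
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;
            }
        } else {
            // RECONSUME_LATER: nothing is acknowledged
            ackIndex = -1;
        }
        // Guard added for this sketch only: never start below position 0
        for (int i = Math.max(ackIndex + 1, 0); i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }

    public static void main(String[] args) {
        System.out.println(indexesToRetry(true, Integer.MAX_VALUE, 3)); // []
        System.out.println(indexesToRetry(true, 0, 3));                 // [1, 2]
        System.out.println(indexesToRetry(false, Integer.MAX_VALUE, 3)); // [0, 1, 2]
    }
}
```

So a listener that processes a batch can report partial success by setting ackIndex on the context before returning CONSUME_SUCCESS.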

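With that, we have walked through most of the message consumption flow. Now look back at the sequence diagram; if it matches what you read in the code, you are all set!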

References

  1. RocketMQ源码解析(四)-Consumer (RocketMQ Source Code Analysis, Part 4: Consumer)