RocketMQ源码分析——顺序消息

引:在前面的源码分析中,我们总是能在很多地方看到对于顺序消息特别的逻辑,这次我们终于可以讲一下啦!

场景分析

全局顺序

对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

全局顺序

适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。

例子:数据库 binlog 同步

局部顺序

对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

局部顺序

适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

例子:假设有个下单场景,每个阶段需要发邮件通知用户订单状态变化。用户付款完成时系统给用户发送订单已付款邮件,订单已发货时给用户发送订单已发货邮件,订单完成时给用户发送订单已完成邮件。假设订单A的消息为A1,A2,A3,发送顺序也如此。订单B的消息为B1,B2,B3,A订单消息先发送,B订单消息后发送。我们不要求消费顺序一定A1,A2,A3,B1,B2,B3这样的全局顺序消息,因为严重降低了系统的并发度。

使用方式

NameServer和Broker还是按照之前环境搭建篇启动,但是自己魔改了官方顺序消息的例子,因为我感觉不太好理解。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

for (int i = 0; i < 20; i++) {
int orderId = i % 2;
Message msg =
new Message("TopicTestjjj",
("Hello RocketMQ " + i + " in order " + orderId).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 同一个订单号,使用同一个队列(分区)
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);

System.out.printf("%s%n", sendResult);
}

producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Consumer {

public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjj", "*");

consumer.registerMessageListener(new MessageListenerOrderly() {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();
System.out.printf("Consumer Started.%n");
}

}

输出

顺序消息.png

从上面我们看到,虽然订单1和订单2之间是无序的,但是对于单个订单,他的消息是有序的。

实现原理

我们这里只分析局部顺序,全局顺序也是一样的。

要保证消息的顺序消费,有三个关键点

  1. 消息顺序发送
  2. 消息顺序存储
  3. 消息顺序消费

消息顺序发送

多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,再进行下一个消息的发送。对于同一个业务编号,生产者消息发送方法必须使用同步发送,异步发送无法保证顺序性。

消息顺序存储

mq的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。所以需要使用MessageQueueSelector来选择要发送的queue,即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中,这就是实现局部顺序的关键,如果是全局顺序,大家应该都能想到就是要将所有的消息只发送到一个队列即可。

消息顺序消费

要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即同一时刻,一个消费队列只能被一个消费者中的一个线程消费

源码分析

生产者

上面我们提到了保证消息的顺序发送和消息顺序存储都是在生产者端控制的,我们在使用方式中也有一定的体现,核心就是MessageQueueSelector

关于生产者发送消息的解析,可以看我之前的文章,这里我们直接跳到和MessageQueueSelector相关的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl  
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
// 选择队列,使用Hash选择
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}

long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
// 发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}

throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
// org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {

@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}

value = value % mqs.size();
return mqs.get(value);
}
}

消费者

消费者的目标就是保证同一时刻,一个消费队列只能被一个消费者中的一个线程消费

我们在使用方式中看到他有自己的顺序消费监听器MessageListenerOrderly,如果忘了消息者是如何消费消息的,也可以回头看看我之前的文章,这里也直入主题。

获取队列

Consumer启动后会初始化一个RebalanceImpl做rebalance操作,从而得到当前这个consumer负责处理哪些queue的消息。

对于顺序消息的消费者,还要求在获取队列的时候去给消息队列加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
  // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// ...
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
// 如果是新加入的Queue
if (!this.processQueueTable.containsKey(mq)) {
// 如果是顺序消费者,则去给消息队列加锁 **关键**
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
// 从offset store中移除过时的数据
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// 获取起始消费offset
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
// 为新的Queue初始化一个ProcessQueue,用来缓存收到的消息
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
// 对新加的queue初始化一个PullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 分发pull request到PullMessageService拉取消息
this.dispatchPullRequest(pullRequestList);

return changed;
}

//org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lock
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);

try {
// 请求Broker获得指定消息队列的分布式锁
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
// 设置消息处理队列锁定成功
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}

boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}

return false;
}

我们在之前分析Consumer的时候,我们说过在启动DefaultMQPushConsumer时会启动一个ConsumeMessageService,对应顺序消息它会启动对应的ConsumeMessageOrderlyService,看看它启动干了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void start() {
// 在集群模式下
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 定时给MQ加锁
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}

public synchronized void lockMQPeriodically() {
if (!this.stopped) {
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}

消息消费

RebalanceImpl在从Broker获取到消息后,会调用ConsumeMessageOrderlyServicesubmitConsumeRequest()方法:

1
2
3
4
5
6
7
8
9
10
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}

ComsumeRequest其实就是一个,我们就是把它丢进线程池进行处理,我们具体看看这个任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 获取消息MessageQueue锁对象,加互斥锁,保证同一个MessageQueue同时只会有一个线程在处理消息
// 就是将本地的MessageQueue对象与远程的分布式关联起来
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
// Cluster模式下同时判断processQueue也是被锁定的
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
// 如果没有获得锁,放入定时任务中待会重新加锁消费
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 如果锁过期,放入定时任务中待会重新加锁消费
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
// 当前周期消费时间超过连续时长
// 默认:60s,提交延迟消费请求。默认情况下,每消费1分钟休息10ms
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
// 从ProcessQueue获取一批消息
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
// ...
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 获得消费锁,粒度更小,防止在消费时,有其他线程介入
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 调用用户自定义Listener处理消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
// 消费锁解锁
this.processQueue.getLockConsume().unlock();
}

// ...
}

ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}

处理消费结果

跟着上面走,ConsumeMessageOrderlyService是如何处理消费结果的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
// ROLLBACK 、COMMIT 暂时只使用在 MySQL binlog 场景,官方将这两状态标记为 @Deprecated
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
// 如果成功,则调用ProcessQueue的commit方法
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// 检查重试次数,如果没超过则放到ProcessQueue中;如果超过则直接发到broker的Dead Queue中
// ProcessQueue维护了一个msgTreeMap,保证了顺序
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
// ...
}

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

return continueConsume;
}

到这里就结束啦~

参考

  1. 顺序消息

  2. RocketMQ-顺序消息Demo及实现原理分析

  3. RocketMQ 源码分析 —— Message 顺序发送与消费