引:在前面的源码分析中,我们总是能在很多地方看到对于顺序消息特别的逻辑,这次我们终于可以讲一下啦!
场景分析
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景。
例子:数据库 binlog
同步
局部顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
例子:假设有个下单场景,每个阶段需要发邮件通知用户订单状态变化。用户付款完成时系统给用户发送订单已付款邮件,订单已发货时给用户发送订单已发货邮件,订单完成时给用户发送订单已完成邮件。假设订单A的消息为A1,A2,A3,发送顺序也如此。订单B的消息为B1,B2,B3,A订单消息先发送,B订单消息后发送。我们不要求消费顺序一定A1,A2,A3,B1,B2,B3这样的全局顺序消息,因为严重降低了系统的并发度。
使用方式
NameServer和Broker还是按照之前环境搭建
篇启动,但是自己魔改了官方顺序消息的例子,因为我感觉不太好理解。
生产者
1 | public class Producer { |
消费者
1 | public class Consumer { |
输出
从上面我们看到,虽然订单1和订单2之间是无序的,但是对于单个订单,他的消息是有序的。
实现原理
我们这里只分析局部顺序,全局顺序也是一样的。
要保证消息的顺序消费,有三个关键点
- 消息顺序发送
- 消息顺序存储
- 消息顺序消费
消息顺序发送
多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,再进行下一个消息的发送。对于同一个业务编号,生产者消息发送方法必须使用同步发送,异步发送无法保证顺序性。
消息顺序存储
mq的topic下会存在多个queue,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue中。所以需要使用MessageQueueSelector来选择要发送的queue,即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中,这就是实现局部顺序的关键,如果是全局顺序,大家应该都能想到就是要将所有的消息只发送到一个队列即可。
消息顺序消费
要保证消息顺序消费,同一个queue就只能被一个消费者所消费,因此对broker中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即同一时刻,一个消费队列只能被一个消费者中的一个线程消费。
源码分析
生产者
上面我们提到了保证消息的顺序发送和消息顺序存储都是在生产者端控制的,我们在使用方式中也有一定的体现,核心就是MessageQueueSelector
。
关于生产者发送消息的解析,可以看我之前的文章,这里我们直接跳到和MessageQueueSelector
相关的逻辑:
1 | // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl |
消费者
消费者的目标就是保证同一时刻,一个消费队列只能被一个消费者中的一个线程消费。
我们在使用方式中看到他有自己的顺序消费监听器MessageListenerOrderly
,如果忘了消息者是如何消费消息的,也可以回头看看我之前的文章,这里也直入主题。
获取队列
Consumer启动后会初始化一个RebalanceImpl
做rebalance操作,从而得到当前这个consumer负责处理哪些queue的消息。
对于顺序消息的消费者,还要求在获取队列的时候去给消息队列加锁。
1 | // org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance |
我们在之前分析Consumer
的时候,我们说过在启动DefaultMQPushConsumer
时会启动一个ConsumeMessageService
,对应顺序消息它会启动对应的ConsumeMessageOrderlyService
,看看它启动干了什么?
1 | public void start() { |
消息消费
RebalanceImpl
在从Broker获取到消息后,会调用ConsumeMessageOrderlyService
的submitConsumeRequest()
方法:
1 | public void submitConsumeRequest( |
ComsumeRequest
其实就是一个,我们就是把它丢进线程池进行处理,我们具体看看这个任务:
1 | class ConsumeRequest implements Runnable { |
处理消费结果
跟着上面走,ConsumeMessageOrderlyService
是如何处理消费结果的:
1 | public boolean processConsumeResult( |
到这里就结束啦~