引:在前面的源码分析中,我们总是能在很多地方看到对于事务消息特别的逻辑,这次我们终于可以讲一下啦!同时对利用消息队列来实现分布式事务感兴趣的同学也是不可错过的!
场景分析
例子:通过购物车进行下单的流程中,用户入口在购物车系统,交易下单入口在交易系统,两个系统之间的数据需要保持最终一致,这时可以通过事务消息进行处理。交易系统下单之后,发送一条交易下单的消息到消息队列 RocketMQ,购物车系统订阅消息队列 RocketMQ 的交易下单消息,做相应的业务处理,更新购物车数据。
实现原理
消息队列 RocketMQ 事务消息交互流程如下所示:
- 发送方向消息队列 RocketMQ 服务端发送消息。
- 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作。
使用方式
NameServer还是按照之前环境搭建
篇启动,但是Broker不行,因为我们运行的test包下的broker,为了单元测试,官方利用SPI
注入了org.apache.rocketmq.broker.util.TransactionalMessageServiceImpl
,这个不是我们想要的TransactionalMessageService
,所以我们找到rocketmq/broker/src/test/resources/META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService
这个文件把里面的东西删了就好了。对于生成者,自己魔改了官方事务消息的例子,因为我感觉不太好理解。对于消费者,我们可以直接使用quickstart
包的例子。
生产者
核心在于实现一个TransactionListener
1 | public class TransactionProducer { |
消费者
1 | public class Consumer { |
输入输出
生产者
消费者
根据我们代码逻辑,我们第一次发送本地事务都是执行失败的,所以所有消息都要进行回查,但是标识为0的消息回查会一直返回失败,所以会一直回查,这里因为时间问题只看到两次,最多默认回查15次,标识为1的消息回查之后会返回Commit,所以我们在消费者端看到了这条消息,标识为2的消息在回查之后返回Rollback,所以消费者端看不到了。
源码分析
我们的分析就是就是按照上面实现原理的交互流程走一遍,现在就来看看吧!
TransactionMQProducer启动
1 | // org.apache.rocketmq.client.producer.TransactionMQProducer |
TransactionMQProducer发送事务消息
1 | // org.apache.rocketmq.client.producer.TransactionMQProducer |
Broker处理Prepared消息
Broker处理Prepared消息是和普通消息用的同一个SendMessageProcessor
,所以我们在之前的分析中看到很多穿插事务消息的代码,下面我们就看下针对事务消息的特殊处理逻辑(没有看过SendMessageProcessor
,可以看看我之前的文章):
1 | // org.apache.rocketmq.broker.processor.SendMessageProcessor |
上面我们看到存储prepare消息和存储正常的消息走的不一样的路径,prepare消息调用了TransactionalMessageService#prepareMessage()
,我们可以看一下,但是相信最终也只是存入CommitLog中而已:
1 | // org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl |
Broker处理Commit/Rollback消息
Broker处理Commit/Rollback消息不再使用SendMessageProcessor
,而是使用了EndTransactionProcessor
来处理Commit/Rollback
消息,但是分析思路是不变的,先找到org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest()
方法:
1 | public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws |
消息Commit/Rollback后,理论上需要将原来的Prepared消息删除,这样Broker就能知道哪些消息一直没收到Commit/Rollback,需要去Producer
回查状态。但是如果直接修改CommitLog
文件,这个代价是很大的,所以RocketMQ
是通过生成一个新的delete消息来标记的。这样,Broker
在检查的时候只需要看下Prepared
消息有没有对应的delete
消息就可以了,我们看看这个这个操作:
1 | // org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl |
Broker回查事务状态
在我们最开始展示的图中,当Broker未收到Commit/Rollback的消息时,会进行事务状态的回查。我们接下就看看这个逻辑,不知道大家还记得我们在Broker启动
分析一文中提到了transactionalMessageCheckService
这个类的启动,回查逻辑就在这里,这个类也继承ServiceThread
,所以它也是一个线程,我们可以直接去看他的run方法:
1 | public void run() { |
OK~到这里我们我们就介绍啦!!