RocketMQ Source Code Analysis: Transactional Messages

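Introduction: throughout the earlier source code analyses we kept running into special-case logic for transactional messages, and this time we finally get to cover it! If you are interested in using a message queue to implement distributed transactions, this one is not to be missed either!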

Scenario Analysis

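Example: when an order is placed from the shopping cart, the user enters through the shopping cart system while the order itself is placed through the trading system, and the data in the two systems must stay eventually consistent. Transactional messages can handle this: after the trading system places an order, it sends an order-placed message to RocketMQ; the shopping cart system subscribes to these order-placed messages, performs the corresponding business processing, and updates the cart data.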

How It Works

The interaction flow of RocketMQ transactional messages is as follows:

[Figure: transactional message interaction flow]

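  1. The producer sends a message to the RocketMQ broker.
  2. Once the broker has persisted the message, it ACKs the producer to confirm the send succeeded. At this point the message is a half message.
  3. The producer starts executing its local transaction logic.
  4. Based on the local transaction result, the producer submits a second confirmation (Commit or Rollback) to the broker. On Commit, the broker marks the half message as deliverable and subscribers eventually receive it; on Rollback, the broker deletes the half message and subscribers never receive it.
  5. In special cases such as a network outage or an application restart, the second confirmation from step 4 never reaches the broker. After a fixed period, the broker initiates a status check (check-back) for that message.
  6. On receiving the check, the producer must look up the final outcome of the local transaction behind that message.
  7. The producer submits the second confirmation again based on the final state it found, and the broker handles the half message exactly as in step 4.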
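The seven steps can be modeled as a small state machine for a single half message. This is a minimal sketch under my own assumptions, not RocketMQ code: `HalfMessageLifecycle`, its enums, and the `onEndTransaction`/`onCheckTimeout` methods are hypothetical names, and `maxChecks` stands in for the Broker's `transactionCheckMax` (15 by default, as the source analysis later shows).

```java
/** Hypothetical model of one half message on the Broker side (illustration only, not RocketMQ code). */
class HalfMessageLifecycle {
    /** The producer's answer; mirrors COMMIT_MESSAGE / ROLLBACK_MESSAGE / UNKNOW. */
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    /** Where the half message currently stands. */
    enum Status { HALF, DELIVERABLE, DELETED, DISCARDED }

    private final int maxChecks;
    private Status status = Status.HALF; // step 2: persisted and ACKed, invisible to consumers
    private int checks = 0;

    HalfMessageLifecycle(int maxChecks) {
        this.maxChecks = maxChecks;
    }

    /** Steps 4 and 7: apply a second confirmation (Commit, Rollback, or still unknown). */
    void onEndTransaction(LocalState state) {
        if (status != Status.HALF) {
            return; // already resolved; this sketch ignores later confirmations
        }
        if (state == LocalState.COMMIT) {
            status = Status.DELIVERABLE; // subscribers will eventually receive it
        } else if (state == LocalState.ROLLBACK) {
            status = Status.DELETED; // subscribers never see it
        }
        // UNKNOWN: remains a half message and will be checked later
    }

    /** Steps 5 and 6: no confirmation arrived, so ask the producer again; give up after maxChecks. */
    void onCheckTimeout(LocalState producerAnswer) {
        if (status != Status.HALF) {
            return;
        }
        if (checks >= maxChecks) {
            status = Status.DISCARDED;
            return;
        }
        checks++;
        onEndTransaction(producerAnswer);
    }

    Status status() {
        return status;
    }

    int checks() {
        return checks;
    }
}
```

Ignoring confirmations for an already resolved message is a simplifying choice of this sketch, not a claim about how the Broker deduplicates late requests.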

Usage

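Start the NameServer as described in the earlier environment setup article. The Broker needs one tweak, though: we run the broker from the test package, and for unit testing the project uses SPI to inject org.apache.rocketmq.broker.util.TransactionalMessageServiceImpl, which is not the TransactionalMessageService we want. So open rocketmq/broker/src/test/resources/META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService and delete its contents. For the producer, I modified the official transactional message example, because I found the original hard to follow. For the consumer, the example in the quickstart package works as-is.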
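Why does emptying that file work? Roughly speaking, the Broker looks for an implementation class named in that META-INF/service/ file and, when none is found, falls back to its default TransactionalMessageServiceImpl. The sketch below models that load-or-default pattern with plain JDK classes. `ServiceLoading` and `loadOrDefault` are hypothetical names, not RocketMQ's actual ServiceProvider API, and treating the first non-blank line as the class name is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

/** Hypothetical sketch: load the implementation named in an SPI file, else use a default. */
class ServiceLoading {
    /**
     * Reads the first non-blank line of a classpath resource as a class name and instantiates it.
     * Returns fallback.get() if the resource is missing, empty, or names an unusable class.
     */
    static <T> T loadOrDefault(String resource, Class<T> type, Supplier<T> fallback) {
        ClassLoader loader = ServiceLoading.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream(resource)) {
            if (in != null) {
                try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String className = line.trim();
                        if (!className.isEmpty()) {
                            Class<?> clazz = Class.forName(className, true, loader);
                            return type.cast(clazz.getDeclaredConstructor().newInstance());
                        }
                    }
                }
            }
        } catch (IOException | ReflectiveOperationException | ClassCastException e) {
            // Any problem loading the configured class: fall through to the default.
        }
        // An empty or missing file ends up here, which is the effect of deleting its contents.
        return fallback.get();
    }
}
```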

Producer

The key is implementing a TransactionListener:

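```java
// TransactionProducer.java
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        // Thread pool that answers the Broker's transaction status checks
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TopicTest", "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.println("Sent: " + new String(msg.getBody()));

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the process alive so the Broker's check requests can be answered
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

// TransactionListenerImpl.java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private AtomicInteger checktimes = new AtomicInteger(0);

    // transactionId -> status (0, 1 or 2)
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // The i-th message sent gets status i % 3
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        // Always answer UNKNOW so that every message is checked by the Broker
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        System.out.println(new String(msg.getBody()) + " check");
        if (null != status) {
            switch (status) {
                case 0:
                    // Stay UNKNOW, so the Broker keeps checking
                    System.out.println(new String(msg.getBody()) + " check count: " + checktimes.incrementAndGet());
                    return LocalTransactionState.UNKNOW;
                case 1:
                    // This should be the only message the consumer receives
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
```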

Consumer

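```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // Specify the NameServer address
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}
```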

Input and Output

Producer

[Figure: producer console output]

Consumer

[Figure: consumer console output]

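Following our code logic, the first local transaction execution of every message returns UNKNOW (neither commit nor rollback), so every message gets checked. The message with status 0 keeps answering UNKNOW, so it is checked again and again; only two checks show up here because the run was short, and by default a message is checked at most 15 times. The message with status 1 answers Commit on check, so it shows up on the consumer side. The message with status 2 answers Rollback on check, so the consumer never sees it.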
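To double-check this reasoning without running a Broker, here is a tiny offline replay of the same listener logic: message i gets status i % 3, executeLocalTransaction always answers UNKNOW, and each check maps 0 to UNKNOW, 1 to COMMIT_MESSAGE and 2 to ROLLBACK_MESSAGE, with at most 15 checks. `CheckSimulation` and its methods are hypothetical; check timing and the real Broker are ignored, and the enum spelling mirrors LocalTransactionState.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical offline replay of TransactionListenerImpl's answers (no Broker involved). */
class CheckSimulation {
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Same mapping as checkLocalTransaction: 0 stays UNKNOW, 1 commits, 2 rolls back. */
    static State check(int status) {
        if (status == 0) {
            return State.UNKNOW;
        }
        return status == 2 ? State.ROLLBACK_MESSAGE : State.COMMIT_MESSAGE;
    }

    /** Final state of message i after at most maxChecks checks (UNKNOW means it gets discarded). */
    static State finalState(int i, int maxChecks) {
        State state = State.UNKNOW; // what executeLocalTransaction returned
        for (int n = 0; n < maxChecks && state == State.UNKNOW; n++) {
            state = check(i % 3);
        }
        return state;
    }

    /** How many checks message i goes through before it resolves or hits maxChecks. */
    static int checksFor(int i, int maxChecks) {
        State state = State.UNKNOW;
        int checks = 0;
        while (state == State.UNKNOW && checks < maxChecks) {
            state = check(i % 3);
            checks++;
        }
        return checks;
    }

    /** Bodies the consumer would eventually receive for messages 0..count-1. */
    static List<String> deliveredBodies(int count, int maxChecks) {
        List<String> delivered = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (finalState(i, maxChecks) == State.COMMIT_MESSAGE) {
                delivered.add("Hello RocketMQ " + i);
            }
        }
        return delivered;
    }
}
```

With three messages, only "Hello RocketMQ 1" is delivered, message 0 uses up all 15 checks, and messages 1 and 2 resolve on their first check.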

Source Code Analysis

Our analysis simply walks through the interaction flow from the How It Works section, step by step. Let's dive in!

TransactionMQProducer Startup

```java
// org.apache.rocketmq.client.producer.TransactionMQProducer
public void start() throws MQClientException {
    this.defaultMQProducerImpl.initTransactionEnv();
    // Everything else is the same as for a regular producer
    super.start();
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
// Create the thread pool that handles the Broker's check requests
public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    if (producer.getExecutorService() != null) {
        // Use the pool supplied through setExecutorService
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
                producer.getCheckThreadPoolMinSize(),
                producer.getCheckThreadPoolMaxSize(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.checkRequestQueue);
    }
}
```

TransactionMQProducer Sending a Transactional Message

```java
// org.apache.rocketmq.client.producer.TransactionMQProducer
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final Object arg) throws MQClientException {
    // A transaction listener is mandatory: the Broker relies on it for status checks
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // Make sure there is a TransactionListener (or a legacy LocalTransactionExecuter)
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    // Validate the message: topic and body length
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as PREPARED and record the ProducerGroup that sent it
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send it exactly like a regular message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        // The half message was sent successfully
        case SEND_OK: {
            try {
                // The current Broker never returns this value
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // Use the client-generated unique ID as the transaction ID
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                // Null by default; this legacy API is marked @deprecated
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // Run the local transaction through the TransactionListener
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        // Persisting the message failed, so roll the transaction back
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // Send the end-transaction request (Commit/Rollback)
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    // Build and return the transactional send result
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

// Send the end-transaction request (Commit/Rollback)
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // Decode the message's CommitLog offset from its message ID
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // Address of the Broker that received the prepared message
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // Commit or roll back according to the local transaction result
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            // Undecided: the Broker will check back later
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    // The message's queueOffset on the Broker
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Send RequestCode.END_TRANSACTION one-way:
    // whether the Broker succeeds or fails, the Producer does nothing further
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
```
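Stripped of logging and result bookkeeping, the producer-side decisions in sendMessageInTransaction and endTransaction boil down to two mappings. The sketch below restates them with local stand-in enums so it compiles without the client jar; `ProducerDecision`, `SendStatus`, `LocalState` and `EndType` are hypothetical copies, not the RocketMQ types. One deliberate difference: the source catches Throwable, while this sketch catches RuntimeException because a Supplier cannot throw checked exceptions.

```java
import java.util.function.Supplier;

/** Hypothetical condensed restatement of the producer's transactional send decisions. */
class ProducerDecision {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }
    enum EndType { COMMIT, ROLLBACK, NOT_DECIDED }

    /** The local transaction only runs if the half message was sent OK. */
    static LocalState localState(SendStatus status, Supplier<LocalState> localTransaction) {
        switch (status) {
            case SEND_OK:
                try {
                    LocalState s = localTransaction.get();
                    return s == null ? LocalState.UNKNOW : s; // null is treated as UNKNOW
                } catch (RuntimeException e) {
                    return LocalState.UNKNOW; // failure: leave it to the Broker's check
                }
            default:
                // FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE: roll back
                return LocalState.ROLLBACK_MESSAGE;
        }
    }

    /** What endTransaction puts into commitOrRollback. */
    static EndType endType(LocalState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return EndType.COMMIT; // TRANSACTION_COMMIT_TYPE
            case ROLLBACK_MESSAGE:
                return EndType.ROLLBACK; // TRANSACTION_ROLLBACK_TYPE
            default:
                return EndType.NOT_DECIDED; // TRANSACTION_NOT_TYPE: wait for a check
        }
    }
}
```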

Broker Handling of Prepared Messages

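The Broker handles Prepared messages with the same SendMessageProcessor it uses for regular messages, which is why our earlier analysis kept running into interleaved transactional message code. Below we look at the special handling for transactional messages (if you haven't read the SendMessageProcessor analysis, see my earlier article):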

```java
// org.apache.rocketmq.broker.processor.SendMessageProcessor
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    // ...
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        // Reject the transactional message if this Broker is configured to reject them
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        // Store the prepare (half) message
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    // Set the response status from the store result, update broker statistics,
    // reply to the producer on success, and update the context
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}
```

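As shown above, prepare messages and regular messages are stored along different paths: a prepare message goes through TransactionalMessageService#prepareMessage(). Let's take a look, although as you might guess, it ultimately just ends up in the CommitLog: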

```java
// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}

// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    // Convert the message, then store it like any other
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Stash the real topic and queueId in the message properties
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // Clear the transaction state bits in sysFlag
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // All prepare messages go into a single topic: RMQ_SYS_TRANS_HALF_TOPIC.
    // This is a built-in system topic that consumers do not subscribe to
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // queueId is always set to 0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}
```
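The effect of parseHalfMessageInner is one half of a round trip: the real topic and queueId are parked in properties while the message sits in RMQ_SYS_TRANS_HALF_TOPIC queue 0, and on Commit the Broker reads them back in endMessageTransaction, which EndTransactionProcessor calls. The sketch below models only that round trip with a plain Map. `HalfTopicRoundTrip`, `SimpleMessage`, `toHalf` and `restoreReal` are hypothetical, and the property keys "REAL_TOPIC" and "REAL_QID" are my assumption of what MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID hold.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the half-topic round trip (not RocketMQ's MessageExtBrokerInner). */
class HalfTopicRoundTrip {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    // Assumed to mirror MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    static final class SimpleMessage {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        SimpleMessage(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** Like parseHalfMessageInner: remember the real destination, then park the message. */
    static SimpleMessage toHalf(SimpleMessage msg) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    /** Like endMessageTransaction on Commit: a new message aimed at the real topic and queue. */
    static SimpleMessage restoreReal(SimpleMessage half) {
        SimpleMessage real = new SimpleMessage(
                half.properties.get(REAL_TOPIC),
                Integer.parseInt(half.properties.get(REAL_QID)));
        real.properties.putAll(half.properties);
        real.properties.remove(REAL_TOPIC); // the bookkeeping keys are not delivered
        real.properties.remove(REAL_QID);
        return real;
    }
}
```

Parking everything in one system topic is what keeps half messages invisible: consumers subscribe to TopicTest, never to RMQ_SYS_TRANS_HALF_TOPIC.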

Broker Handling of Commit/Rollback Messages

Commit/Rollback messages are no longer handled by SendMessageProcessor but by EndTransactionProcessor. The way we analyze it stays the same, though: start from org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest():

```java
// org.apache.rocketmq.broker.processor.EndTransactionProcessor
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.info("Transaction request:{}", requestHeader);
    // Only a master Broker may end a transaction
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    // Logging omitted...

    OperationResult result = new OperationResult();
    // A Commit request
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // Look up the original prepared message in the CommitLog;
        // this requires the Producer to send the final Commit to the same Broker
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Check that the retrieved message matches this request
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Build the message for consumers from the original prepared message's properties
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                // Store it through MessageStore under the real topic and queueId
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // Mark the prepared message as deleted
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // A Rollback request
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Mark the prepared message as deleted; it is never delivered
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
```

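After a Commit/Rollback, the original Prepared message should in principle be deleted, so that the Broker knows which messages never received a Commit/Rollback and need a status check with the Producer. Modifying the CommitLog file in place would be very expensive, however, so RocketMQ marks the message by writing a new delete (op) message instead. During a check, the Broker then only has to see whether a Prepared message has a matching delete message. Let's look at that operation: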

```java
// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
public boolean deletePrepareMessage(MessageExt msgExt) {
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge
public boolean putOpMessage(MessageExt messageExt, String opType) {
    // The queue of the Prepared message (same topic and queueId)
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    // The op message's topic is RMQ_SYS_TRANS_OP_HALF_TOPIC;
    // its tag is "d" and its body stores the prepared message's queueOffset
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
        String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    // Write it to the CommitLog
    writeOp(message, messageQueue);
    return true;
}
```
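In other words, the op queue works as a log of finished half offsets: each op message tagged "d" carries a half-queue offset in its body, and the check only has to ask whether offset i appears among them. The sketch below models that lookup. `OpLog`, `OpMessage`, `removeMarker` and `removeMap` are hypothetical, UTF-8 is assumed as the body charset, and the real fillOpRemoveMap also tracks consumed op offsets (doneOpOffset) and pulls op messages in batches, which is omitted here.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of matching half messages against "d" op messages (not RocketMQ code). */
class OpLog {
    static final String REMOVE_TAG = "d";

    /** Minimal stand-in for an op message: its own offset in the op queue, its tag, its body. */
    static final class OpMessage {
        final long opOffset;
        final String tag;
        final byte[] body;

        OpMessage(long opOffset, String tag, byte[] body) {
            this.opOffset = opOffset;
            this.tag = tag;
            this.body = body;
        }
    }

    /** Like addRemoveTagInTransactionOp: the body is the half message's queueOffset as text. */
    static OpMessage removeMarker(long opOffset, long halfQueueOffset) {
        return new OpMessage(opOffset, REMOVE_TAG,
                String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8));
    }

    /** halfOffset -> opOffset for every "d" marker, roughly what removeMap holds in check(). */
    static Map<Long, Long> removeMap(List<OpMessage> ops) {
        Map<Long, Long> map = new HashMap<>();
        for (OpMessage op : ops) {
            if (REMOVE_TAG.equals(op.tag)) {
                long halfOffset = Long.parseLong(new String(op.body, StandardCharsets.UTF_8));
                map.put(halfOffset, op.opOffset);
            }
        }
        return map;
    }
}
```

With markers for half offsets 5 and 7, offset 6 has no entry, so in the check loop it would be a candidate for a status check.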

Broker Transaction Status Check

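In the diagram at the beginning, when the Broker has not received a Commit/Rollback, it checks back on the transaction status. Let's look at that logic next. You may remember that the Broker startup article mentioned transactionalMessageCheckService being started: that is where the check logic lives. This class also extends ServiceThread, so it is a thread as well, and we can go straight to its run method: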

```java
// org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
public void run() {
    log.info("Start transaction check service thread!");
    // 1 minute by default; configurable
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

// org.apache.rocketmq.common.ServiceThread
protected void waitForRunning(long interval) {
    // If we have already been notified, skip the wait
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();
    // Otherwise wait for up to interval milliseconds
    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        // The wait is over
        this.onWaitEnd();
    }
}

// org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
protected void onWaitEnd() {
    // Transaction timeout, 6s by default
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of checks, 15 by default
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    // Run the check
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
// The core of the check: the prepare (half) queue and the op queue
public void check(long transactionTimeout, int transactionCheckMax,
                  AbstractTransactionalMessageCheckListener listener) {
    try {
        String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
        // Fetch all prepare message queues; as noted earlier, prepare messages use the topic RMQ_SYS_TRANS_HALF_TOPIC
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.info("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // The op queue that pairs with this half queue
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Consume offset of the prepare (half) queue
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // Consume offset of the op queue
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            // Collect the half offsets already marked as deleted into a map for fast lookup
            HashMap<Long, Long> removeMap = new HashMap<>();
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            long i = halfOffset;
            while (true) {
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // This transaction is already finished (committed or rolled back)
                if (removeMap.containsKey(i)) {
                    log.info("Half offset {} has been committed/rolled back", i);
                    removeMap.remove(i);
                } else {
                    // Transaction not finished yet: fetch the prepare message
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    // ...
                    // Give up if it has been checked the maximum number of times,
                    // or if it is older than the retention time (3 days by default)
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        // Log an error about the discarded message
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    // ... various checks (for example whether the prepare message is still
                    // within its immunity time) decide whether a status check is needed
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);
                    // A status check is needed
                    if (isNeedCheck) {
                        // Write the half message back to the half queue; if that fails, retry this offset
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // Hand the message to the listener, which sends the check request
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.error("Check error", e);
    }

}

// org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener
public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // Send the check request to the producer
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}
```
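The per-message decision inside check() can be summarized as a few questions: discard (checked too many times), skip (older than the retention window), or check now (no op messages pulled and older than the immunity time, or a negative age). The sketch below is a simplification under stated assumptions: `CheckDecision` and `decide` are hypothetical, the immunity time is a plain parameter (in the source it defaults to transactionTimeout and can be overridden per message), and the second clause of isNeedCheck as well as the early break for very young messages are omitted.

```java
/** Hypothetical, simplified per-message decision from check() (not RocketMQ code). */
class CheckDecision {
    enum Action { DISCARD, SKIP, CHECK, WAIT }

    /**
     * @param checkTimes   checks already performed for this half message
     * @param checkMax     transactionCheckMax (15 by default)
     * @param ageMs        current time minus the message's born timestamp
     * @param retentionMs  retention window for stored messages (3 days by default)
     * @param immunityMs   how long a fresh half message is left alone
     * @param opBatchEmpty true when no op messages were pulled (opMsg == null in the source)
     */
    static Action decide(int checkTimes, int checkMax, long ageMs, long retentionMs,
                         long immunityMs, boolean opBatchEmpty) {
        if (checkTimes >= checkMax) {
            return Action.DISCARD; // needDiscard(): handed to resolveDiscardMsg
        }
        if (ageMs > retentionMs) {
            return Action.SKIP; // needSkip(): also handed to resolveDiscardMsg in the source
        }
        if ((opBatchEmpty && ageMs > immunityMs) || ageMs <= -1) {
            return Action.CHECK; // putBackHalfMsgQueue(), then resolveHalfMsg()
        }
        return Action.WAIT; // pull more op messages or look again in a later round
    }
}
```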

OK, that wraps up our walkthrough of transactional messages!

References

  1. Alibaba Cloud: Transactional Messages
  2. Usage and Principle Analysis of RocketMQ Transactional Messages