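RocketMQ Source Code Analysis: Producer

Intro: the first step in using the MQ is to construct a Producer; after that you can start sending messages!

Send modes

The producer supports three ways of sending messages: synchronous, asynchronous, and Oneway.

  • Synchronous send: after submitting a message to the broker, the client waits for the result to come back. This gives a reliability guarantee.
  • Asynchronous send: a callback is registered when the send API is called, and the sending thread carries on with other business logic; the producer invokes the callback once it receives the broker's result. Use it when the normal business flow does not have to wait for the send result.
  • Oneway: once the producer has submitted the message, it does not care whether the broker received it. Suited to scenarios that favor high throughput and can tolerate message loss, such as log collection.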
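
To make the difference between the three modes concrete, here is a minimal sketch that does not use the RocketMQ client at all. The class `SendModesSketch`, its `deliver` stand-in for the broker round trip, and the callback shape are all hypothetical; only the calling patterns (block, callback, fire-and-forget) mirror the description above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical model of the three send modes; this is not the RocketMQ API.
final class SendModesSketch {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    // Stand-in for the network round trip to a broker.
    private String deliver(String body) {
        return "SEND_OK:" + body;
    }

    // Synchronous: the caller blocks until the result is available.
    String sendSync(String body) {
        return CompletableFuture.supplyAsync(() -> deliver(body), ioThread).join();
    }

    // Asynchronous: the caller returns immediately; the callback fires once a result arrives.
    CompletableFuture<String> sendAsync(String body, BiConsumer<String, Throwable> callback) {
        return CompletableFuture.supplyAsync(() -> deliver(body), ioThread).whenComplete(callback);
    }

    // Oneway: hand the message off and ignore the outcome entirely.
    void sendOneway(String body) {
        ioThread.execute(() -> deliver(body));
    }

    void shutdown() {
        ioThread.shutdown();
    }

    public static void main(String[] args) {
        SendModesSketch producer = new SendModesSketch();
        System.out.println(producer.sendSync("hello"));
        producer.sendAsync("world", (result, error) -> System.out.println("callback: " + result)).join();
        producer.sendOneway("log line");
        producer.shutdown();
    }
}
```

Only the synchronous call hands a result back to the caller directly, which is why it is the mode that can be retried on the caller's behalf.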

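Usage

We mainly cover synchronous send; asynchronous send, Oneway, and transactional messages are only touched on in passing. The synchronous example is the Producer from the environment setup post: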

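import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Instantiate a producer with a group name
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // Point it at the name server (Namesrv)
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Synchronous send: blocks until the broker returns a result
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}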

Above we instantiate a DefaultMQProducer and set its group name and the name server (Namesrv) address. Once the producer has started, it can send messages to the given topic!

Producer startup

DefaultMQProducer initialization

The code:

// org.apache.rocketmq.client.producer.DefaultMQProducer
public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    // DefaultMQProducer wraps a DefaultMQProducerImpl
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    // Build the thread pool used for asynchronous sends (bounded queue of 50000 tasks)
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
}
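
The pool built in the constructor is an ordinary JDK `ThreadPoolExecutor`, so its behavior can be tried out in isolation. The sketch below uses only the standard library; `AsyncSenderPoolSketch` and `newPool` are hypothetical names, and the sizes passed in `main` simply copy the values seen above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that builds a pool shaped like the one in the constructor above.
final class AsyncSenderPoolSketch {
    static ThreadPoolExecutor newPool(int threads, int queueCapacity) {
        // Bounded queue: once it is full, further submissions are rejected.
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                // Named threads make the pool easy to spot in thread dumps.
                return new Thread(r, "AsyncSenderExecutor_" + threadIndex.incrementAndGet());
            }
        };
        // core size == max size, so the pool never grows beyond `threads`.
        return new ThreadPoolExecutor(threads, threads, 60_000, TimeUnit.MILLISECONDS, queue, factory);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(Runtime.getRuntime().availableProcessors(), 50_000);
        String name = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool).join();
        System.out.println("task ran on " + name);
        pool.shutdown();
    }
}
```

With the JDK's default rejection policy, a full queue makes `execute` throw `RejectedExecutionException`, so the bounded queue acts as back-pressure on asynchronous sends.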

DefaultMQProducer startup

The code:

// org.apache.rocketmq.client.producer.DefaultMQProducer
public void start() throws MQClientException {
    // Delegate to the actual implementation class
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            // If anything below fails, serviceState stays START_FAILED
            this.serviceState = ServiceState.START_FAILED;
            // Check that the group name is valid
            this.checkConfig();
            // Change the producer's instance name to the process ID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // MQClientManager is a singleton; it creates mQClientFactory.
            // Producers with the same clientId (IP@PID) share one MQClientInstance,
            // so normally there is only one per process
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with mQClientFactory
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Add the default topicPublishInfo
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // The core step: start the MQClientInstance
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // Startup finished
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Send one heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

As we can see, DefaultMQProducer's start process mainly initializes and starts an MQClientInstance and registers the producer with that instance. Let's look at how MQClientInstance starts.
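
The `switch` on `serviceState` in `start()` is a small state machine: the state is set to `START_FAILED` up front and only becomes `RUNNING` if every step succeeds, and any later call to `start()` is rejected. A stripped-down sketch of that guard, with hypothetical names (`ServiceStateSketch`, `failDuringStart`) and an unchecked exception standing in for `MQClientException`, might look like this:

```java
// Hypothetical, stripped-down model of the start() guard; not RocketMQ code.
final class ServiceStateSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;
    private final boolean failDuringStart;

    ServiceStateSketch(boolean failDuringStart) {
        this.failDuringStart = failDuringStart;
    }

    ServiceState state() {
        return state;
    }

    void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed; only a clean run reaches RUNNING.
                state = ServiceState.START_FAILED;
                if (failDuringStart) {
                    throw new IllegalStateException("simulated startup error");
                }
                state = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException("service state not OK: " + state);
            default:
                break;
        }
    }

    public static void main(String[] args) {
        ServiceStateSketch service = new ServiceStateSketch(false);
        service.start();
        System.out.println(service.state());
        try {
            service.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Setting the failure state first means an exception thrown halfway through startup needs no cleanup code to leave the object in a state that refuses further use.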

MQClientInstance startup

The startup code:

// org.apache.rocketmq.client.impl.factory.MQClientInstance
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                // If NamesrvAddr is empty, try to fetch the name server address from an HTTP server;
                // this suits systems that have a central configuration service
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                // Initialize the Netty client
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                // Producers and consumers share the same MQClientInstance implementation;
                // this starts the pull-message service (Consumer)
                this.pullMessageService.start();
                // Start rebalance service
                // Start the load-balancing service (Consumer)
                this.rebalanceService.start();
                // Start push service
                // Start the instance's internal producer; start(false) does not start the factory again
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                // Update the client state
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

Let's look at which scheduled tasks it starts:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // Periodically fetch the name server address
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Refresh topicRouteInfo from the name server
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Remove brokers that have gone offline, then send heartbeats
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Persist consumer offsets (Consumer)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Dynamically adjust the consumer thread pool (Consumer)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
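
Every task above wraps its body in `try`/`catch`. That matters because `scheduleAtFixedRate` suppresses all later executions once a run throws. The following standard-library sketch shows the guarded pattern surviving a failing first run; `ScheduledTaskSketch` and `runGuarded` are hypothetical names, and the periods are shortened so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the "catch everything inside the scheduled task" pattern.
final class ScheduledTaskSketch {
    // Schedules `task` at a fixed rate and waits until it has been invoked `runs` times.
    // Returns true if that happened within the timeout.
    static boolean runGuarded(Runnable task, int runs, long periodMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    // Swallow and log, so the next execution still happens.
                    System.err.println("scheduled task failed: " + e);
                } finally {
                    latch.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        boolean done = runGuarded(() -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("name server unreachable (simulated)");
            }
        }, 3, 10, 2000);
        System.out.println("completed=" + done + ", calls=" + calls.get());
    }
}
```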

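At this point the Producer has finished starting; next, let's see how it sends messages!

Message sending

The Producer sends messages synchronously by default, as in our example. Next, let's look at the method DefaultMQProducer.send(Message msg):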

// org.apache.rocketmq.client.producer.DefaultMQProducer
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
// The default timeout is 3 seconds
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

// As we can see, synchronous mode is the default
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

// The actual send logic
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Make sure the producer's service state is RUNNING
    this.makeSureStateOK();
    // Validate arguments: the topic must not be a system-reserved one, and the body
    // must be neither empty nor larger than the maximum size (4 MB by default)
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Look up the route info for the message's topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total number of attempts: in sync mode 1 + retryTimesWhenSendFailed (3 by default), otherwise 1
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // Remember the broker used by the previous attempt
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // Pick one queue from all queues available for the topic; several strategies exist
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                // Record the broker used by this attempt
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    // Stop retrying once the overall timeout budget is used up
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Send the message
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // Report the latency of this call to MQFaultStrategy
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        // ASYNC and ONEWAY return right after the call
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // The send did not succeed
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // The broker failed to store the message: decide whether to retry on another broker
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // exception handling omitted
                }
            }
            // (else branch omitted in this excerpt)
        }
        // Return the result if the send succeeded
        if (sendResult != null) {
            return sendResult;
        }
        // less important logic omitted
    }
    // (rest of the method omitted in this excerpt)
}

The full send logic above carries quite a lot of information, so let's look at a few important parts in detail.
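
Before going into the parts, the overall shape of the synchronous retry loop can be modeled in a few lines: at most 1 + retries attempts, each preferring a broker different from the previous one, and no further attempts once the time budget is spent. Everything in this sketch is hypothetical (`RetryLoopSketch`, the `trySend` predicate, the injected `clock`); brokers are plain strings, one entry per queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

// Hypothetical model of the sync retry loop; not RocketMQ code.
final class RetryLoopSketch {
    // `queueBrokers` holds the broker name of each queue, in round-robin order.
    // Returns the brokers that were tried, in order.
    static List<String> send(List<String> queueBrokers, int retries, long timeoutMillis,
                             LongSupplier clock, Predicate<String> trySend) {
        int timesTotal = 1 + retries;
        List<String> brokersSent = new ArrayList<>();
        long begin = clock.getAsLong();
        String lastBroker = null;
        int index = 0;
        for (int times = 0; times < timesTotal; times++) {
            // Give up once the overall timeout budget has been used.
            if (timeoutMillis < clock.getAsLong() - begin) {
                break;
            }
            // Round-robin, skipping queues on the broker used by the previous attempt.
            String selected = null;
            for (int i = 0; i < queueBrokers.size(); i++) {
                String candidate = queueBrokers.get(index++ % queueBrokers.size());
                if (!candidate.equals(lastBroker)) {
                    selected = candidate;
                    break;
                }
            }
            if (selected == null) {
                // Every queue is on the same broker: reuse it.
                selected = queueBrokers.get(index++ % queueBrokers.size());
            }
            lastBroker = selected;
            brokersSent.add(selected);
            if (trySend.test(selected)) {
                break; // success: stop retrying
            }
        }
        return brokersSent;
    }

    public static void main(String[] args) {
        List<String> tried = send(java.util.Arrays.asList("broker-a", "broker-a", "broker-b"),
                2, 3000, System::currentTimeMillis, broker -> broker.equals("broker-b"));
        System.out.println(tried);
    }
}
```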

Getting the topic route info

Look at the DefaultMQProducerImpl#tryToFindTopicPublishInfo() method:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Read from the local cache; as seen earlier, a scheduled task refreshes this cache periodically
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // Not cached yet: update the TopicRouteInfo from Namesrv under a lock (over Netty)
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        // The topic publish info is usable, so return it
        return topicPublishInfo;
    } else {
        // Namesrv has no route for this topic: if the broker allows automatic topic creation,
        // build the route info from the default topic and update the cache
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
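
The lookup order in `tryToFindTopicPublishInfo()` is: local cache, then the name server for the topic itself, then the name server for the default topic. A minimal model of that order is sketched below. `RouteLookupSketch`, the string-valued "routes", and the name-server function are hypothetical; the default topic name `TBW102` used in `main` is an assumption about the client's default `createTopicKey`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of "cache first, then name server, then default topic".
final class RouteLookupSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, Optional<String>> nameServer; // stand-in for the remote query
    private final String defaultTopic;
    private int remoteCalls = 0;

    RouteLookupSketch(Function<String, Optional<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    int remoteCalls() {
        return remoteCalls;
    }

    // Returns the route for `topic`, or null if even the default topic is unknown.
    String find(String topic) {
        String cached = cache.get(topic);
        if (cached != null) {
            return cached; // cache hit: no remote call
        }
        Optional<String> route = query(topic);
        if (!route.isPresent()) {
            route = query(defaultTopic); // fall back to the default topic's route
        }
        route.ifPresent(r -> cache.put(topic, r));
        return route.orElse(null);
    }

    private Optional<String> query(String topic) {
        remoteCalls++;
        return nameServer.apply(topic);
    }

    public static void main(String[] args) {
        Map<String, String> known = new HashMap<>();
        known.put("TopicTest", "broker-a");
        known.put("TBW102", "broker-default");
        RouteLookupSketch lookup = new RouteLookupSketch(t -> Optional.ofNullable(known.get(t)), "TBW102");
        System.out.println(lookup.find("TopicTest"));
        System.out.println(lookup.find("BrandNewTopic"));
        System.out.println("remote calls: " + lookup.remoteCalls());
    }
}
```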

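Queue selection strategy

Queues are selected round-robin; if the client enables latency fault tolerance, broker availability is also checked during the round-robin.
Look at the DefaultMQProducerImpl#selectOneMessageQueue() method: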

private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

// org.apache.rocketmq.client.latency.MQFaultStrategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Latency fault tolerance is enabled
    if (this.sendLatencyFaultEnable) {
        try {
            // Take the last used queue index and advance it; the index is kept in a ThreadLocal.
            // This gives round-robin behavior, but on a retry the check below only accepts
            // a queue on the broker used by the previous attempt
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                // Find the queue at this index
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Use it only if the queue's broker is available (judged from faultItemTable)
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // First send (lastBrokerName is null), or a retry hitting the same broker: pick it
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // No suitable broker found: pick a relatively suitable one from all brokers,
            // provided it has writable queues.
            // "Relatively suitable" means: available / low latency / became unavailable earlier
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    // Latency fault tolerance is off: simply take the next queue in order
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
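
The plain round-robin path (fault tolerance off) is easy to reproduce. The sketch below is not the `TopicPublishInfo` source; it is a hypothetical `RoundRobinSketch` that assumes the non-tolerant path skips queues on the broker used by the previous attempt. It also shows why the `pos < 0` guard in the code above is needed: `Math.abs(Integer.MIN_VALUE)` is still negative. A shared `AtomicInteger` replaces the ThreadLocal index for simplicity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector; simplified stand-in for queue selection.
final class RoundRobinSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter;

    RoundRobinSketch(List<Queue> queues, int startIndex) {
        this.queues = queues;
        this.counter = new AtomicInteger(startIndex);
    }

    private Queue next() {
        int pos = Math.abs(counter.getAndIncrement()) % queues.size();
        if (pos < 0) {
            pos = 0; // Math.abs(Integer.MIN_VALUE) is negative, so the remainder can be too
        }
        return queues.get(pos);
    }

    // Picks the next queue, skipping queues on lastBrokerName when one is given.
    Queue selectOne(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = next();
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return next(); // every queue is on lastBrokerName
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
                new Queue("broker-a", 0), new Queue("broker-a", 1),
                new Queue("broker-b", 0), new Queue("broker-b", 1));
        RoundRobinSketch selector = new RoundRobinSketch(queues, 0);
        Queue first = selector.selectOne(null);
        Queue retry = selector.selectOne(first.brokerName);
        System.out.println(first.brokerName + ":" + first.queueId + " then " + retry.brokerName + ":" + retry.queueId);
    }
}
```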

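There are many details to queue selection that may get a post of their own later; for now, here is the conclusion about latency. When a send takes less than 100 ms, the broker's unavailable duration is set to 0; the penalty then grows step by step with the latency, and once the latency exceeds 15 seconds the broker is treated as unavailable for 10 minutes. A failed send is penalized the same way, with 10 minutes of unavailability, so with latency fault tolerance enabled a retry will normally not pick the broker that just failed, i.e. it avoids brokers marked unavailable.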
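
The step-wise penalty can be expressed as a table lookup. The sketch below is a hypothetical `LatencyPenaltySketch`; the two tables and the fixed 30-second latency substituted for failed sends reflect my understanding of the 4.x defaults in `MQFaultStrategy` and should be treated as assumptions to check against the version you are reading.

```java
// Hypothetical model of mapping send latency to a broker's unavailable duration.
final class LatencyPenaltySketch {
    // Assumed default thresholds (ms) and the matching unavailable durations (ms).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    // Assumed latency substituted when a send fails outright ("isolation").
    static final long ISOLATION_LATENCY = 30000L;

    static long notAvailableDuration(long latencyMillis, boolean isolation) {
        long effective = isolation ? ISOLATION_LATENCY : latencyMillis;
        // Walk from the largest threshold down and take the first one that is reached.
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (effective >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(notAvailableDuration(80, false));
        System.out.println(notAvailableDuration(600, false));
        System.out.println(notAvailableDuration(15000, false));
        System.out.println(notAvailableDuration(10, true));
    }
}
```

Under these assumed tables a fast send carries no penalty, a send of 15 seconds or more costs 10 minutes, and a failed send is treated like a very slow one and therefore also costs 10 minutes.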

Sending the message

Look at the DefaultMQProducerImpl#sendKernelImpl() method:

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Look up the broker address in the cache by brokerName
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    // If the address is missing, fetch the route from Namesrv once more
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        // Look it up again
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // Switch to the VIP channel.
        // On startup the broker opens two ports for client traffic; one of them only accepts
        // producer messages and no consumer pull requests, and is called the VIP channel
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            // For a non-batch message, set a client-generated unique ID
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // If the message body is too long, compress it and set the flag bit
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }
            // Transactional message flag
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // less important logic omitted
            // Build the request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            // For messages sent to a retry topic, carry over the reconsume count and the maximum reconsume count
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            // Send the message to the broker through the Netty client
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        msg.setBody(prevBody);
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    // This is where the message is actually sent
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            // ... exception handling omitted
        } finally {
            // Always restore the original (uncompressed) body on the caller's message
            msg.setBody(prevBody);
        }
    }
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
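
Two small pieces of `sendKernelImpl()` lend themselves to a standalone illustration: packing several boolean facts into the single `sysFlag` integer, and compressing a large body before sending. The sketch below uses only the JDK; `SysFlagSketch` is hypothetical, and the bit values and the 4096-byte threshold are assumptions about the client defaults, not values taken from the code above.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

// Hypothetical illustration of flag bits plus body compression; not RocketMQ code.
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                // assumed bit value
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // assumed bit value
    static final int COMPRESS_THRESHOLD = 4096;            // assumed threshold in bytes

    // Deflate-compresses the input with the JDK's java.util.zip.Deflater.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream(Math.max(32, input.length));
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Combines independent facts about the message into one int using bitwise OR.
    static int buildSysFlag(byte[] body, boolean transactionPrepared) {
        int sysFlag = 0;
        if (body.length >= COMPRESS_THRESHOLD) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    public static void main(String[] args) {
        byte[] body = new byte[5000]; // all zeros, so it compresses very well
        int sysFlag = buildSysFlag(body, true);
        byte[] wire = (sysFlag & COMPRESSED_FLAG) != 0 ? compress(body) : body;
        System.out.println("sysFlag=" + sysFlag + ", bytes on the wire=" + wire.length);
    }
}
```

The receiver can test each bit independently with `&`, which is why one integer is enough to carry both the compression and the transaction markers.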

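That's the end! Apart from the queue selection logic being a bit complicated, the rest is manageable, but it is still worth working through carefully.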
