引:前面我们说过了Producer和Consumer是如何发送消息的,当我们了解完Broker的消息存储之后,我们就可以看看Broker是如何接收发来的消息啦~
前言
在分析完Producer和Consumer之后,我们知道无论是消费者还是生产者来说,Broker都是接受消息的一方,同时我们知道RocketMQ是通过Netty实现通信的,对应接收消息,我们很容易就会想到一定是一个ChannelHandler在处理,而在RocketMQ的设计中对应是NettyRequestProcessor
,所以我们就可以分析一下接收消息的几种类型啦,有几种消息类型,就有几种Processor,如下图:
我们看到有很多种消息类型,我们这次只分析SendMessage
。
SendMessage
对于SendMessage有下面这几种可能:单条消息、批量消息、RETRY消息。Retry消息即consumer消费失败,要求broker重发的消息。失败的原因有两种,一种是业务端代码处理失败;还有一种是消息在consumer的缓存队列中待的时间超时,consumer会将消息从队列中移除,然后退回给Broker重发。
SendMessageProcessor
我们肯定会找到org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest()
这个方法,因为它实现自NettyRequestProcessor
接口的:
1 | public RemotingCommand processRequest(ChannelHandlerContext ctx, |
Consumer的RETRY消息
对于Consumer的RETRY消息,我们可以找到org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack()
方法:
1 | private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) |
Producer的单条消息
对于Producer的单条消息,我们可以找到org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage()
方法:
1 | private RemotingCommand sendMessage(final ChannelHandlerContext ctx, |
问题
其实我有个问题就是对于Producer的单条信息,为了要判断重试次数?? 问题先放在这,等以后知道了,再来回答!!