RocketMQ源码分析——环境搭建

引:终于要开始了吗,一直想好好弄清楚一款MQ,作为一名Javaer,肯定选择RocketMQ,希望今天是一个好开始!

前置

在看源码前大家都至少应该使用过RocketMQ,并且了解它的相关原理,所以这里丢出两个链接,虽然有点老~

  1. RocketMQ 用户指南
  2. RocketMQ 原理简介

依赖工具

  1. Git
  2. Maven
  3. JDK1.8
  4. IntelliJ IDEA

源码拉取

从官方仓库 https://github.com/apache/rocketmq 通过Git拉取代码到本地,在2019.6.8看到最新的release版本是4.5.1,但是为了和芋艿大佬保持一致,我也选择了4.4.0版本,所以将代码拉取到本地之后,我们切换到4.4.0分支。

1
2
git clone https://github.com/apache/rocketmq.git
git checkout -b release-4.4.0 origin/release-4.4.0

然后我们就可以看是搭建调试环境啦~

环境搭建

启动 RocketMQ Namesrv

打开 org.apache.rocketmq.namesrv.NameServerInstanceTest 单元测试类,参考 #startup() 方法,我们编写 #main(String[] args) 静态方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception {
NamesrvConfig namesrvConfig1 = new NamesrvConfig();
NettyServerConfig nettyServerConfig1 = new NettyServerConfig();
nettyServerConfig1.setListenPort(9876);
NamesrvController nameSrvController1 = new NamesrvController(namesrvConfig1, nettyServerConfig1);
nameSrvController1.initialize();
nameSrvController1.start();

// 不让主方法结束
Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

运行之后,输出如下日志就代表OK啦~

1
2
14:41:41.202 [NettyEventExecutor] INFO  RocketmqRemoting - NettyEventExecutor service started
14:41:41.203 [FileWatchService] INFO RocketmqCommon - FileWatchService service started

启动 RocketMQ Broker

打开 org.apache.rocketmq.broker.BrokerControllerTest 单元测试类,参考 #testBrokerRestart() 方法,我们编写 #main(String[] args) 方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception {
// 设置版本号,很关键,不然topic创建不成功
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerName("broker-a");
brokerConfig.setNamesrvAddr("127.0.0.1:9876");

BrokerController brokerController = new BrokerController(
brokerConfig,
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
// 不让主方法结束
Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

运行之后,虽然在broker下面什么日志都没有,但是在Namesrv已经看到了broker的注册日志,如下:

1
2
3
4
5
15:19:21.437 [NettyServerCodecThread_4] INFO  RocketmqRemoting - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:58887
15:19:21.437 [NettyServerCodecThread_4] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:58887]
15:19:21.460 [RemotingExecutorThread_7] DEBUG RocketmqNamesrv - receive request, 103 127.0.0.1:58887 RemotingCommand [code=103, language=JAVA, version=293, opaque=0, flag(B)=0, remark=null, extFields={brokerId=0, bodyCrc32=2135245619, clusterName=DefaultCluster, brokerAddr=192.168.1.28:8888, haServerAddr=192.168.1.28:10912, compressed=false, brokerName=broker-a}, serializeTypeCurrentRPC=JSON]
15:19:21.461 [RemotingExecutorThread_7] INFO RocketmqNamesrv - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
15:19:21.461 [RemotingExecutorThread_7] INFO RocketmqNamesrv - new broker registered, 192.168.1.28:8888 HAServer: 192.168.1.28:10912

启动 RocketMQ Producer

打开 org.apache.rocketmq.example.quickstart.Producer 示例类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {

/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/

/*
* Launch the instance.
*/

// 指明Namesrv
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

for (int i = 0; i < 1000; i++) {
try {

/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}

根据注释,我们需要指明Namesrv地址:producer.setNamesrvAddr("127.0.0.1:9876");

运行之后就会发送1000条消息啦,然后就会断开与Namesrv,broker的连接,日志如下:

1
2
3
4
SendResult [sendStatus=SEND_OK, msgId=C0A8011C69E618B4AAC22757459803E7, offsetMsgId=C0A8011C000022B80000000000057CB0, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=499]
15:20:30.889 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.1.28:8888] result: true
15:20:30.890 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.1.28:8886] result: true
15:20:30.891 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true

启动 RocketMQ Producer

打开 org.apache.rocketmq.example.quickstart.Consumer 示例类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
// 指明Namesrv
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");

/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/*
* Launch the consumer instance.
*/
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

根据注释,我们这里也需要指明Namesrv地址:consumer.setNamesrvAddr("127.0.0.1:9876");

疯狂消费的日志如下:

1
2
3
Consumer Started.
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=250, sysFlag=0, bornTimestamp=1559978428786, bornHost=/192.168.1.28:58898, storeTimestamp=1559978428830, storeHost=/192.168.1.28:8888, msgId=C0A8011C000022B8000000000002BEB2, commitLogOffset=179890, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=500, CONSUME_START_TIME=1559978752764, UNIQ_KEY=C0A8011C69E618B4AAC227573D700000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
...

It`s over !!

总结

很喜欢下面的话:

源码,是原理的具象化
原理,是代码的抽象化