RocketMQ源码分析——Broker启动

引:前面讲了NameServer,Producer,Consumer,现在终于轮到Broker了,对于一个消息队列来说,Broker是当仁不让的核心~

五大作用

  • 存储消息
  • 接收生产者提交的消息
  • 回复consumer的消息拉取请求
  • 主从节点间同步数据保证高可用
  • 提供简单的api来查询磁盘上的临时数据

之后我们也将从上面的作用来进行源码的探索!

使用方式

真实使用当然是用命令行去部署,这里为了看源码,我们把环境搭建那篇文章的例子搬过来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception {
// 设置版本号,很关键,不然topic创建不成功
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
final BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerName("broker-a");
brokerConfig.setNamesrvAddr("127.0.0.1:9876");

BrokerController brokerController = new BrokerController(
brokerConfig,
new NettyServerConfig(),
new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
// 不让主方法结束
Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

Broker启动

从上面的例子可以看出Broker启动实质就是org.apache.rocketmq.broker.BrokerController完成初始化和启动的过程。

BrokerController初始化

我们看到org.apache.rocketmq.broker.BrokerController#initialize()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
public boolean initialize() throws CloneNotSupportedException {
// 1. 从持久化文件中加载数据到内存中
boolean result = this.topicConfigManager.load();

result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();

if (result) {
try {
// 2. 消息存取的核心接口初始化
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// messageStore的指标统计类,提供最近一天的消息吞吐量的统计数据
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
// 添加消息分发器,分发到布隆过滤器
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// messageStore加载内存映射文件,commit log文件,consumer queue文件,index文件
result = result && this.messageStore.load();

if (result) {
// 3. 初始化Netty Server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
// 初始化VIP通道 Netty Server
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// 初始化一系列客户端指令执行的线程池,Netty处理的优化
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));

this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));

this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));

this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));

this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));

this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// 将这些线程池添加到Netty Pipeline中
this.registerProcessor();
// 4. 打印broker的消息吞吐信息到日志文件定时任务,每天0点记录一次
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// 记录consumerOffet到文件定时任务,默认5秒一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 记录consumer filter到文件中定时任务,10秒一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// 定时检查consumer的消费记录,如果延时太大,则disable consumer,不再往这个consumer投递消息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
// 打印当前Queue size日志
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
// 打印dispatch落后情况
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
// 定时更新nameserv address信息
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 如果是broker是slave,启动定时任务,每分钟从master同步配置和offset
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
//如果broker是master,定时打印slave延时情况
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

// 省略...

// 初始化事务消息service
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}

代码逻辑还是非常简单的,一看就知道他在干什么~

BrokerController启动

当BrokerController初始化完成之后,我们看org.apache.rocketmq.broker.BrokerController#start()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public void start() throws Exception {
// 启动消息存储服务
if (this.messageStore != null) {
this.messageStore.start();
}
// 启动Netty Server接收请求
if (this.remotingServer != null) {
this.remotingServer.start();
}
// 启动VIP Channel Netty Server
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// 启动TLS签名文件检测服务
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// 启动Broker的Netty Client
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// 启动PushConsumer的请求 Hold 服务 (后面会展开)
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
// 监控客户端连接,定时检查Producer,Consumer和Filter是否长时间未收到心跳
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
// 启动Filter Server
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// 注册Broker到Namesrv
this.registerBrokerAll(true, false, true);

// 定时向Namesrv发心跳,如果有变化则同步Broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
// 启动BrokerFastFailure服务,定时清理长时间未执行的客户端请求
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
// 如果是Master,开启事务消息检查
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
this.transactionalMessageCheckService.start();
}
}
}

从整个过程来看还是比较简单的,水文一篇,但是后面会继续分析Broker的各个作用的实现。