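RocketMQ Source Code Analysis: Namesrv

Intro: NameSrv is RocketMQ's registry. It holds the metadata for every Broker and Topic. After a Broker starts, it sends heartbeats to namesrv; namesrv also periodically checks each broker's availability and removes brokers that are no longer available. For producers and consumers, it provides the Broker lookup service.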

Starting NameSrv

Following the environment set up in the previous post, we start from org.apache.rocketmq.namesrv.NameServerInstanceTest. Its main function looks like this:

public static void main(String[] args) throws Exception {
    NamesrvConfig namesrvConfig1 = new NamesrvConfig();
    NettyServerConfig nettyServerConfig1 = new NettyServerConfig();
    nettyServerConfig1.setListenPort(9876);
    NamesrvController nameSrvController1 = new NamesrvController(namesrvConfig1, nettyServerConfig1);
    nameSrvController1.initialize();
    nameSrvController1.start();

    // Keep the main method from returning
    Thread.sleep(DateUtils.MILLIS_PER_DAY);
}

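There are three main steps:

  1. Construct the NamesrvController
  2. Initialize the NamesrvController
  3. Start the NamesrvController

So NamesrvController is the key piece: it handles initialization and starts the background tasks.

Constructing NamesrvController

The constructor of org.apache.rocketmq.namesrv.NamesrvController is as follows: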

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    // namesrv configuration
    this.namesrvConfig = namesrvConfig;
    // Netty configuration
    this.nettyServerConfig = nettyServerConfig;
    // Bind the KVConfigManager to this NamesrvController
    this.kvConfigManager = new KVConfigManager(this);
    // Create the RouteInfoManager (important)
    this.routeInfoManager = new RouteInfoManager();
    // Listens for changes to client connections (Channels) and tells RouteInfoManager to check whether a broker has changed
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    // The namesrv configuration is persisted to a file on disk
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

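The most important piece is org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager, which caches the broker information for the whole cluster, along with the topic and queue configuration. Let's look at its internal fields: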

// Read-write lock that controls concurrent reads and writes
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Map from topic to the queue data on each broker; holds the number of read and write queues the topic has on each broker
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// All brokers registered with namesrv, grouped by brokerName; brokers use brokerName to identify a master/slave group
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Cluster-to-broker mapping; clusterName tells whether several brokers belong to the same cluster.
// Among the brokers of one cluster, a producer picks only one when sending a message
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Latest heartbeat time and config version of each broker. namesrv records the last active time of each brokerAddr;
// if there is no heartbeat or other exchange for longer than a threshold, the broker is considered offline
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Mapping from a broker to its FilterServers
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

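RouteInfoManager keeps all of its data in in-memory HashMaps and uses a read-write lock to control concurrent updates. Lookups are served from memory and readers can share the read lock, which keeps client queries as fast as possible.

Initializing NamesrvController

Let's look at its initialize method: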
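To make the "plain HashMap plus read-write lock" pattern concrete, here is a minimal sketch. RouteTableSketch and its methods are hypothetical names made up for illustration, not RocketMQ classes; the sketch only assumes the locking discipline described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-in for RouteInfoManager:
// one non-thread-safe HashMap guarded by a ReadWriteLock.
class RouteTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, List<String>> topicToBrokers = new HashMap<>();

    // Writers take the exclusive write lock.
    void addBroker(String topic, String brokerName) {
        lock.writeLock().lock();
        try {
            topicToBrokers.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers share the read lock and receive a copy, so callers
    // never touch the internal list outside the lock.
    List<String> brokersOf(String topic) {
        lock.readLock().lock();
        try {
            List<String> brokers = topicToBrokers.get(topic);
            return brokers == null ? new ArrayList<>() : new ArrayList<>(brokers);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.addBroker("TopicA", "broker-a");
        table.addBroker("TopicA", "broker-b");
        System.out.println(table.brokersOf("TopicA"));
        System.out.println(table.brokersOf("Missing"));
    }
}
```

Many readers can hold the read lock at once, while a writer waits for exclusive access. That fits a registry where queries far outnumber registrations.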

public boolean initialize() {
    // Load the KVConfigManager
    this.kvConfigManager.load();
    // Create the Netty server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Create the thread pool that handles client requests
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register DefaultRequestProcessor; every client request is handed to this processor.
    // Its logic is invoked later from NettyServerHandler
    this.registerProcessor();
    // Start a scheduled task that scans all brokers every 10 seconds (after a 5-second initial delay) and checks liveness
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Logging task: prints the contents of kvConfigManager every 10 minutes (after a 1-minute initial delay)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // Watch the SSL certificate files for changes
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        // irrelevant logic omitted
    }

    return true;
}
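The two background tasks above both rely on ScheduledExecutorService.scheduleAtFixedRate with an initial delay and a period. The following is a small standalone sketch of that call. PeriodicScanSketch and runPeriodically are hypothetical names, and the millisecond intervals are chosen only so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the fixed-rate scheduling used in initialize().
class PeriodicScanSketch {
    // Runs `task` at a fixed rate and returns true if it completed
    // at least `times` runs before `timeoutMs` elapsed.
    static boolean runPeriodically(Runnable task, int times,
                                   long initialDelayMs, long periodMs, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                task.run();
                done.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            return done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the scheduler so the JVM can exit.
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger scans = new AtomicInteger();
        boolean ok = runPeriodically(() -> scans.incrementAndGet(), 3, 50, 100, 5_000);
        System.out.println("completed=" + ok + ", at least 3 scans: " + (scans.get() >= 3));
    }
}
```

One point worth keeping in mind with scheduleAtFixedRate: if a run throws an exception, later runs are suppressed, so a long-lived scan task generally should not let exceptions escape.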

Let's take a quick look at how it evicts dead connections. The code is in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager:

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // 120 seconds by default
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
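The expiry check can be tried in isolation. Below is a simplified sketch that assumes the same rule as above (a broker is evicted once its last heartbeat is more than 120 seconds old). LivenessScanSketch is a hypothetical class, and the current time is passed in as a parameter so the behaviour is deterministic; the real method reads System.currentTimeMillis() and also closes the channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the liveness scan.
class LivenessScanSketch {
    // Mirrors the 120-second default mentioned in the article.
    static final long EXPIRE_MS = 120_000L;

    // brokerAddr -> timestamp of the last heartbeat (ms)
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    void heartbeat(String brokerAddr, long nowMs) {
        lastHeartbeat.put(brokerAddr, nowMs);
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    // Removes every broker whose last heartbeat is older than EXPIRE_MS
    // and returns the evicted addresses.
    List<String> scanNotActive(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRE_MS < nowMs) {
                String addr = entry.getKey(); // read the key before removing the entry
                it.remove();                  // safe removal while iterating
                evicted.add(addr);
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        LivenessScanSketch scan = new LivenessScanSketch();
        scan.heartbeat("10.0.0.1:10911", 0L);
        scan.heartbeat("10.0.0.2:10911", 100_000L);
        System.out.println(scan.scanNotActive(130_000L));
        System.out.println(scan.isAlive("10.0.0.2:10911"));
    }
}
```

Removing through the iterator, rather than through the map itself, is what lets the scan delete entries while it is still walking the table.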

Starting NamesrvController

Starting up simply means starting the Netty server so that it begins accepting client requests. The code is as follows:

public void start() throws Exception {
    // Start the Netty server
    this.remotingServer.start();
    // Watch the SSL files for changes so certificates can be reloaded at runtime
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

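At this point Namesrv has finished starting. Next we cover its two most important functions: Broker registration (management) and Broker lookup.

DefaultRequestProcessor request handling

Before getting to Broker registration (management) and Broker lookup, we need to talk about DefaultRequestProcessor, because every request is handled by it. During the initialization above we saw it being registered via registerProcessor(), so that requests arriving through Netty's NettyServerHandler are handed to it.
Now let's see how org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest handles a request: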

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    // irrelevant logic omitted
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        // Broker registration
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        // Broker unregistration
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        // Get the broker route info for a topic
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        // Get the broker cluster info
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

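Based on the request code carried in the RemotingCommand, the processor runs the corresponding handler. Below we focus on Broker registration and lookup.

Broker registration

Following the dispatch logic in DefaultRequestProcessor above, we arrive at the #registerBroker method (as the switch shows, brokers at version V3_0_11 or later are routed to #registerBrokerWithFilterServer instead):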
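The dispatch-by-request-code idea can be sketched without any RocketMQ dependency. In the sketch below, RequestDispatchSketch, its three request codes and the string results are all hypothetical; the real codes live in RequestCode and the real handlers return RemotingCommand objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a processor that dispatches on a request code.
class RequestDispatchSketch {
    // Made-up codes for illustration only.
    static final int REGISTER_BROKER = 1;
    static final int UNREGISTER_BROKER = 2;
    static final int QUERY_BROKER = 3;

    // brokerName -> brokerAddr
    private final Map<String, String> brokers = new HashMap<>();

    // Returns a result string, or null for an unknown code
    // (the original processRequest also returns null in that case).
    String process(int code, String brokerName, String brokerAddr) {
        switch (code) {
            case REGISTER_BROKER:
                brokers.put(brokerName, brokerAddr);
                return "REGISTERED";
            case UNREGISTER_BROKER:
                return brokers.remove(brokerName) != null ? "UNREGISTERED" : "NOT_FOUND";
            case QUERY_BROKER:
                String addr = brokers.get(brokerName);
                return addr != null ? addr : "NOT_FOUND";
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        RequestDispatchSketch processor = new RequestDispatchSketch();
        System.out.println(processor.process(REGISTER_BROKER, "broker-a", "10.0.0.1:10911"));
        System.out.println(processor.process(QUERY_BROKER, "broker-a", null));
        System.out.println(processor.process(UNREGISTER_BROKER, "broker-a", null));
        System.out.println(processor.process(QUERY_BROKER, "broker-a", null));
    }
}
```

A single entry point with one switch keeps the network layer unaware of individual operations; supporting a new request type only means adding a case and a handler.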

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    // ... validation omitted
    // Topic configuration
    TopicConfigSerializeWrapper topicConfigWrapper;
    if (request.getBody() != null) {
        topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
    } else {
        topicConfigWrapper = new TopicConfigSerializeWrapper();
        topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
        topicConfigWrapper.getDataVersion().setTimestamp(0);
    }
    // Update namesrv's RouteInfo with the information reported by the broker
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );
    // Return the master address and the HA server address
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());
    // Return the topic KV configuration in the response body
    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

Next, let's see how namesrv's RouteInfo is updated, in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Take the write lock for the update
            this.lock.writeLock().lockInterruptibly();
            // Update the cluster-to-broker mapping
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
            // Update the brokerName-to-BrokerData grouping
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            // For a master broker: on first registration, or when the topic info has changed, update topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            // Update the broker's heartbeat time
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            // Update the filter server table
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            // For a slave broker: if its master exists, return the master broker's info
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
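Stripped of topic configuration, heartbeats and filter servers, the registration above boils down to updating two tables and telling a slave where its master is. The following sketch shows only that core. BrokerRegistrySketch is a hypothetical class, and it uses synchronized methods instead of the write lock to stay short; the master id of 0 matches MixAll.MASTER_ID.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reduction of registerBroker to its two core tables.
class BrokerRegistrySketch {
    static final long MASTER_ID = 0L; // same value as MixAll.MASTER_ID

    // clusterName -> brokerNames
    private final Map<String, Set<String>> clusterToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr); one entry per master/slave group
    private final Map<String, Map<Long, String>> brokerNameToAddrs = new HashMap<>();

    // Registers a broker. For a slave, returns the master address of its
    // group if a master is already known; otherwise returns null.
    synchronized String register(String clusterName, String brokerName,
                                 long brokerId, String brokerAddr) {
        clusterToBrokerNames.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
        Map<Long, String> addrs = brokerNameToAddrs.computeIfAbsent(brokerName, k -> new HashMap<>());
        addrs.put(brokerId, brokerAddr);
        return brokerId != MASTER_ID ? addrs.get(MASTER_ID) : null;
    }

    // Returns a copy of the broker names registered under a cluster.
    synchronized Set<String> brokerNamesIn(String clusterName) {
        Set<String> names = clusterToBrokerNames.get(clusterName);
        return names == null ? new HashSet<>() : new HashSet<>(names);
    }

    public static void main(String[] args) {
        BrokerRegistrySketch registry = new BrokerRegistrySketch();
        System.out.println(registry.register("c1", "broker-a", 1L, "10.0.0.2:10911"));
        System.out.println(registry.register("c1", "broker-a", 0L, "10.0.0.1:10911"));
        System.out.println(registry.register("c1", "broker-a", 1L, "10.0.0.2:10911"));
        System.out.println(registry.brokerNamesIn("c1"));
    }
}
```

In this sketch a slave that registers before its master gets no master address back, and picks it up on a later registration once the master has appeared, which lines up with the null checks in the original method.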

Broker lookup

Following the dispatch logic in DefaultRequestProcessor above, we arrive at the #getRouteInfoByTopic method:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // Fetch the topic's broker info from RouteInfoManager
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    // If ordered messages are enabled, fill in the KVConfig info
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

Next, let's see how namesrv's RouteInfo is looked up, in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData:

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            // Take the read lock
            this.lock.readLock().lockInterruptibly();
            // Get the queue configuration of every broker that serves this topic
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
                // Collect the set of broker names
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
                // Look up the master/slave addresses and filter server info by brokerName
                for (String brokerName : brokerNameSet) {
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                            .getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
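The lookup follows two hops: topic to broker names, then broker name to addresses, and it reports "not found" unless both hops succeed. Here is a reduced sketch of that flow. RouteLookupSketch and its methods are hypothetical names, the sketch is single-threaded (no lock), and it leaves out queue counts and filter servers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of pickupTopicRouteData to its two lookups.
class RouteLookupSketch {
    // topic -> names of the brokers that hold queues for it
    private final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr)
    private final Map<String, Map<Long, String>> brokerNameToAddrs = new HashMap<>();

    void addQueue(String topic, String brokerName) {
        topicToBrokerNames.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
    }

    void addBroker(String brokerName, long brokerId, String brokerAddr) {
        brokerNameToAddrs.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
    }

    // Returns brokerName -> addresses for the topic, or null when either
    // the queue data or the broker data is missing.
    Map<String, Map<Long, String>> pickup(String topic) {
        List<String> names = topicToBrokerNames.get(topic);
        if (names == null) {
            return null; // no queue data for this topic
        }
        Map<String, Map<Long, String>> result = new HashMap<>();
        for (String name : new HashSet<>(names)) { // de-duplicate broker names
            Map<Long, String> addrs = brokerNameToAddrs.get(name);
            if (addrs != null) {
                result.put(name, new HashMap<>(addrs)); // copy, like the clone in the original
            }
        }
        return result.isEmpty() ? null : result; // no broker data found
    }

    public static void main(String[] args) {
        RouteLookupSketch routes = new RouteLookupSketch();
        routes.addQueue("TopicA", "broker-a");
        System.out.println(routes.pickup("TopicA"));
        routes.addBroker("broker-a", 0L, "10.0.0.1:10911");
        System.out.println(routes.pickup("TopicA"));
    }
}
```

Handing out copies of the address maps means a caller can modify or serialize the result without affecting the registry's own tables.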

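Summary

That wraps up the walkthrough. Put simply, Namesrv is a NettyRemotingServer built on Netty. It defines a default processor, DefaultRequestProcessor, which reads or updates RouteInfoManager according to the request code, and RouteInfoManager keeps the various mappings in HashMaps. That is all there is to it.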
