跳到主要内容

04、RocketMQ 源码解析 - NameServer

版本声明

1、 基于rocketmq-all-4.3.1版本;
2、 如有发现分析不正确的地方欢迎指正,谢谢!;

NameServer介绍

1、 NameServer本身的高可用是通过部署多台NameServer服务NameServer互相独立,彼此之间不会通信(即多台NameServer的数据并不是强一致的),任意一台宕机并不会影响其他的NameServer

2、 作用;

  • 维护活跃的Broker地址列表,包括Master和Slave
  • 维护所有TopicTopic对应队列的地址列表
  • 维护所有BrokerFilter列表

3、 BrokerNameServer关系;

  • 单个Broker与所有NameServer保持长连接
  • Broker每隔30秒向所有NameServer发送心跳,心跳包含了自身的topic信息
  • NameServer每隔10秒钟扫描所有存活的Broker连接,若2min内没有发送心跳信息,则断开连接
  • Broker在启动后向所有NameServer注册,Producer在发送消息之前先从NameServer获取Broker服务器的地址列表,然后根据负载均衡算法从列表中选择一台Broker进行消息发送

4、 稳定性;

  • nameserver互相独立,无状态
  • nameserver不会有频繁的读写,稳定性相对高

NameServer源码分析

KVConfigManager

1、 内存级的KV存储,提供增删改查以及持久化数据的能力本质就是一个HashMap;

   
public class KVConfigManager {
 
   
 
    private final NamesrvController namesrvController;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // 
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
 
   
        this.namesrvController = namesrvController;
    }

    public void load() {
 
   
        String content = null;
        try {
 
   
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
 
   
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
 
   
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
 
   
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }  
}  

RouteInfoManager

1、 路由信息即Broker向NameServer注册后保存的信息,RouteInfoManager保存所有的TopicBroker信息;

public class RouteInfoManager {
 
   
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //topic列表对应的队列信息
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    //Broker地址信息
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //broker集群信息
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //Broker当前存活的Broker(非实时)
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //Broker过滤信息
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
 
   
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
...省略... 
} 

 

2、 成员变量解析;

  • topicQueueTable:Topic消息队列路由信息,包括topic所在的broker名称,读队列数量,写队列数量,同步标记等信息,rocketmq根据topicQueueTable的信息进行负载均衡消息发送。
  • brokerAddrTable:Broker节点信息,包括brokerName,所在集群名称,还有主备节点信息。
  • clusterAddrTable:Broker集群信息,存储了集群中所有的BrokerName。
  • brokerLiveTable:Broker状态信息,Nameserver每次收到Broker的心跳包就会更新该信息。 3、 通过远程调试查看具体内容(双主双从,两个nameserver);

ip地址列表

  • rocketmq-slave2 172.19.0.7
  • rocketmq-slave1 172.19.0.6
  • rocketmq-master2 172.19.0.5
  • rocketmq-master1 172.19.0.4
  • rocketmq-nameserver2 172.19.0.3
  • rocketmq-nameserver1 172.19.0.2

topicQueueTable内容如下

 

topicQueueTable信息

{
       
         
   "RMQ_SYS_TRANS_HALF_TOPIC": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       }
   ],
   "rocketmq-master1": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 7,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       }
   ],
   "rocketmq-master2": [
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 7,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       }
   ],
   "SELF_TEST_TOPIC": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       }
   ],
   "TBW102": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 7,
           "readQueueNums": 4,
           "topicSynFlag": 0,
           "writeQueueNums": 4
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 7,
           "readQueueNums": 4,
           "topicSynFlag": 0,
           "writeQueueNums": 4
       }
   ],
   "testTopic": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 6,
           "readQueueNums": 16,
           "topicSynFlag": 0,
           "writeQueueNums": 16
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 6,
           "readQueueNums": 16,
           "topicSynFlag": 0,
           "writeQueueNums": 16
       }
   ],
   "BenchmarkTest": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 6,
           "readQueueNums": 1024,
           "topicSynFlag": 0,
           "writeQueueNums": 1024
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 6,
           "readQueueNums": 1024,
           "topicSynFlag": 0,
           "writeQueueNums": 1024
       }
   ],
   "DefaultCluster": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 7,
           "readQueueNums": 16,
           "topicSynFlag": 0,
           "writeQueueNums": 16
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 7,
           "readQueueNums": 16,
           "topicSynFlag": 0,
           "writeQueueNums": 16
       }
   ],
   "OFFSET_MOVED_EVENT": [
       {
       
         
           "brokerName": "rocketmq-master1",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       },
       {
       
         
           "brokerName": "rocketmq-master2",
           "perm": 6,
           "readQueueNums": 1,
           "topicSynFlag": 0,
           "writeQueueNums": 1
       }
   ]
}

BrokerAddrTable内容如下

 

brokerAddrTable信息

{
       
         
    "rocketmq-master1": {
       
         
        "brokerAddrs": {
       
         
            0: "172.19.0.4:10911",
            1: "172.19.0.6:10921"
        },
        "brokerName": "rocketmq-master1",
        "cluster": "DefaultCluster"
    },
    "rocketmq-master2": {
       
         
        "brokerAddrs": {
       
         
            0: "172.19.0.5:10912",
            1: "172.19.0.7:10922"
        },
        "brokerName": "rocketmq-master2",
        "cluster": "DefaultCluster"
    }
}

clusterAddrTable内容如下

 

clusterAddrTable 信息

{
       
         
    "DefaultCluster": [
        "rocketmq-master1",
        "rocketmq-master2"
    ]
}

  • brokerLiveTable内容如下

  •  

brokerLiveTable信息
{
       
         
    "172.19.0.7:10922": {
       
         
        "channel": {
       
         
            "active": true,
            "inputShutdown": false,
            "open": true,
            "outputShutdown": false,
            "registered": true,
            "writable": true
        },
        "dataVersion": {
       
         
            "counter": 3,
            "timestamp": 1562476312530
        },
        "haServerAddr": "172.19.0.7:10923",
        "lastUpdateTimestamp": 1562476361146
    },
    "172.19.0.5:10912": {
       
         
        "channel": {
       
         
            "active": true,
            "inputShutdown": false,
            "open": true,
            "outputShutdown": false,
            "registered": true,
            "writable": true
        },
        "dataVersion": {
       
         
            "counter": 3,
            "timestamp": 1562476312530
        },
        "haServerAddr": "172.19.0.5:10913",
        "lastUpdateTimestamp": 1562476360402
    },
    "172.19.0.4:10911": {
       
         
        "channel": {
       
         
            "active": true,
            "inputShutdown": false,
            "open": true,
            "outputShutdown": false,
            "registered": true,
            "writable": true
        },
        "dataVersion": {
       
         
            "counter": 3,
            "timestamp": 1562476312525
        },
        "haServerAddr": "172.19.0.4:10912",
        "lastUpdateTimestamp": 1562476359516
    },
    "172.19.0.6:10921": {
       
         
        "channel": {
       
         
            "active": true,
            "inputShutdown": false,
            "open": true,
            "outputShutdown": false,
            "registered": true,
            "writable": true
        },
        "dataVersion": {
       
         
            "counter": 3,
            "timestamp": 1562476312525
        },
        "haServerAddr": "172.19.0.6:10922",
        "lastUpdateTimestamp": 1562476360541
    }
}

BrokerHouseKeepingService

1、 BrokerHouseKeepingService:实现了ChannelEventListener接口,用于处理Broker状态事件,当Broker失效、异常或者关闭,则将BrokerRouteInfoManager中移除;

public class BrokerHousekeepingService implements ChannelEventListener {
 
   
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
 
   
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
 
   
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
 
   
        //通道关闭从RouteInfoManager中移除Broker
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
 
   
        //通道发生异常从RouteInfoManager中移除Broker
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
 
   
        //通道失效从RouteInfoManager中移除Broker
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

DefaultRequestProcessor

1、 DefaultRequestProcessor默认请求处理器,根据RequestCode执行相应的处理,核心处理方法processRequest()

  
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
 
   

if (ctx != null) {
 
   
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }
    switch (request.getCode()) {
 
   
        //向NameServer追加KV配置
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        //从NameServer获取KV配置
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        //从NameServer获取KV配置
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        //获取版本信息
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        //注册一个Broker,数据都是持久化的,如果存在则覆盖配置
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
 
   
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
 
   
                return this.registerBroker(ctx, request);
            }
        //卸载一个Broker,数据都是持久化的
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        //根据Topic获取Broker Name、topic配置信息
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        //获取注册到Name Server的所有Broker集群信息
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        //去掉BrokerName的写权限
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        //从Name Server获取完整Topic列表
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        //从Namesrv删除Topic配置
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        //通过Namespace获取所有的KV List
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        //获取指定集群下的所有 topic
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        //获取所有系统内置 Topic 列表
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        //单元化相关 topic
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        //获取含有单元化订阅组的 Topic 列表
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        //获取含有单元化订阅组的非单元化
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        //更新Name Server配置
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

NamesrvStartup

1、 NamesrvStartupNameServer的启动入口,启动的核心是调用NamesrvControllerinitialize()方法;

public boolean initialize() {



    //从文件中加载数据到内存中,默认从${user.home}/namesrv/kvConfig.json文件加载
    this.kvConfigManager.load();
    //创建服务Server,传入处理连接的ChannelEventListener
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    //默认任务处理器的线程池,每一个RequestCode可以单独设置一个线程池,如果不设置就使用默认的线程池
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    //注册默认处理器,根据requestCode执行相应的处理
    this.registerProcessor();

    //启动后延迟5秒开始执行,每隔10秒执行一次,对于两分钟没有活跃的broker,关闭连接
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {



@Override
        public void run() {


            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    //启动后延迟1min,每隔10分钟执行打印configTable
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {



        @Override
        public void run() {


            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {


        // Register a listener to reload SslContext
        try {


            fileWatchService = new FileWatchService(
                new String[] {


                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {


                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {


                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {


                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {


                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {


                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {


                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {


                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {


            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}