

【RocketMq】RocketMq-NameServ Source Code Analysis (Ver 4.9.4)

Introduction

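Early versions of RocketMq (its predecessor MetaQ, before 3.X), like Kafka, relied on Zookeeper for route management, which meant operations teams had to deploy and maintain an extra Zookeeper cluster. RocketMq later dropped ZK, and NameServ was the result. NameServ is an excellent entry point for reading the RocketMq source code; this article analyzes the NameServ source code of version 4.9.4.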

NameServer has two main functions: Broker management and routing information management.

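The whole NameServ is only a few hundred lines of code. Since the fundamental reason it exists is to replace ZK, its role is similar to ZK's. In the class structure, NamesrvStartup is the bootstrap class, NamesrvController is the core controller, and RouteInfoManager holds the routing tables; NameServ is essentially built around these three classes.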

[Figure: NameServ class structure diagram]

Source Code Analysis

NameServ Startup

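NameServ startup consists of the following main steps:

  1. Create the NameServ controller: parse and build the important configuration, then create the core controller and inject the configuration into it.
  2. Initialize the core controller: create and initialize second-tier components such as the Netty server.
  3. Start the scheduled tasks: periodically scan for and remove inactive Brokers, and periodically print all KV configuration.
  4. Register a JVM shutdown hook that gracefully releases resources (Netty and thread pools).
  5. Start the Netty server.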

Before looking at the code details, here is a sequence diagram of the NameServ startup process:

[Figure: NameServ startup sequence diagram]

Clearly, most of the NameServ startup is about setting up supporting services around Netty, the core framework used for network communication.

Entry Point

The NameServ entry point is org.apache.rocketmq.namesrv.NamesrvStartup#main0; let's go straight to the code.

public static NamesrvController main0(String[] args) {  
  
    try {  
        // 1. Build the core controller
        NamesrvController controller = createNamesrvController(args);  
        // 2. Start the controller
        start(controller);  
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {  
        e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

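Building the Core Controller

The first job of NameServer is to build the core controller. At a high level it does the following:

  1. Use the Apache Commons CLI command-line parsing library to parse the command.
  2. Build a commandLine object from the runtime arguments.
  3. Create the NamesrvConfig and NettyServerConfig objects, then read and parse the configuration file whose path is given by -c.
  4. Initialize the namesrvConfig and nettyServerConfig objects.

Apache Commons CLI helps developers quickly define a server's startup command-line options and can print them as a help listing. Let's step into org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController to see what it does. There is quite a lot of code inside, so we will analyze it in several parts.

The full code is: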

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {  
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
    //PackageConflictDetect.detectFastjson();  
  
    Options options = ServerUtil.buildCommandlineOptions(new Options());  
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());  
    if (null == commandLine) {  
        System.exit(-1);  
        return null;    
    }  
  
    final NamesrvConfig namesrvConfig = new NamesrvConfig();  
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
    nettyServerConfig.setListenPort(9876);  
    if (commandLine.hasOption('c')) {  
        String file = commandLine.getOptionValue('c');  
        if (file != null) {  
            InputStream in = new BufferedInputStream(new FileInputStream(file));  
            properties = new Properties();  
            properties.load(in);  
            MixAll.properties2Object(properties, namesrvConfig);  
            MixAll.properties2Object(properties, nettyServerConfig);  
  
            namesrvConfig.setConfigStorePath(file);  
  
            System.out.printf("load config properties file OK, %s%n", file);  
            in.close();  
        }  
    }  
  
    if (commandLine.hasOption('p')) {  
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);  
        MixAll.printObjectProperties(console, namesrvConfig);  
        MixAll.printObjectProperties(console, nettyServerConfig);  
        System.exit(0);  
    }  
  
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);  
  
    if (null == namesrvConfig.getRocketmqHome()) {  
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
        System.exit(-2);  
    }  
  
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
    JoranConfigurator configurator = new JoranConfigurator();  
    configurator.setContext(lc);  
    lc.reset();  
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");  
  
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);  
  
    MixAll.printObjectProperties(log, namesrvConfig);  
    MixAll.printObjectProperties(log, nettyServerConfig);  
  
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);  
  
    // remember all configs to prevent discard  
    controller.getConfiguration().registerConfig(properties);  
  
    return controller;  
}

Because there is a lot of content, we go through it in segments. First comes registering the startup command-line options:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// Create the command-line options object; -h and -n are defined here
Options options = ServerUtil.buildCommandlineOptions(new Options());

// Build the commandLine object from Options and the runtime args; buildCommandlineOptions adds -c (Name server config properties file) and -p (Print all config item)
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
    System.exit(-1);
    return null;
}

Here is the logic inside ServerUtil.buildCommandlineOptions(new Options()) and org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions:

// org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {  
    Option opt = new Option("h", "help", false, "Print help");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt =  
        new Option("n", "namesrvAddr", true,  
            "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

// org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {  
    Option opt = new Option("c", "configFile", true, "Name server config properties file");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt = new Option("p", "printConfigItem", false, "Print all config items");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

Personally I find this approach unintuitive and not very reusable, so I would rather use a helper like the following:

public static Option buildCommandlineOption(String opt, String longOpt, boolean hasArg, String description, boolean required){
    Option option = new Option(opt, longOpt, hasArg, description);
    option.setRequired(required);
    return option;
}

In my local copy I rewrote the code as follows; the parameter list could still be improved, but it already reads much more clearly:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();  
Options options = new Options();  
// Modified to a more intuitive way of adding commands  
options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
        "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));  
  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {  
    System.exit(-1);  
    return null;
}

If this block looks cluttered, you can move it into a dedicated method. After some more tinkering, my final version is:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();   
Options options = buildCommandlineOptions();  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {  
    System.exit(-1);  
    return null;
}

//......
public static Options buildCommandlineOptions() {  
    Options options = new Options(); 
    // Modified to a more intuitive way of adding commands  
    options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
    options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
    options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
    options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
            "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));  
    return options;  
}

    

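By now you should have a rough idea of how Apache Commons CLI parses the command line. The library builds a command-line object from the runtime arguments; the -c option then decides whether a configuration file is read, and the parsed file is used to populate the namesrvConfig and nettyServerConfig objects.

After the command line is parsed, the configuration is filled into the corresponding objects. The code that loads the configuration file is: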

final NamesrvConfig namesrvConfig = new NamesrvConfig();  
final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
nettyServerConfig.setListenPort(9876);  
if (commandLine.hasOption('c')) {  
    String file = commandLine.getOptionValue('c');  
    if (file != null) {  
        InputStream in = new BufferedInputStream(new FileInputStream(file));  
        properties = new Properties();  
        properties.load(in);  
        MixAll.properties2Object(properties, namesrvConfig);  
        MixAll.properties2Object(properties, nettyServerConfig);  
  
        namesrvConfig.setConfigStorePath(file);  
  
        System.out.printf("load config properties file OK, %s%n", file);  
        in.close();  
    }  
}

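This is one of the core parts of createNamesrvController(String[] args). It first creates the NettyServerConfig and NamesrvConfig objects (setting the default listen port 9876), then uses commandLine to read the configuration file path given by -c. The file is read with a classic buffered file stream and loaded into a Properties object; this is basic Java I/O, so we won't go through the details.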
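MixAll.properties2Object, called above, copies matching keys from the Properties object onto the config object's setters. The following is a simplified, hypothetical sketch of that idea using reflection. PropertyBinder and DemoServerConfig are made-up names, only int, long, boolean and String setters are handled, and this is not RocketMQ's actual implementation.

```java
import java.lang.reflect.Method;
import java.util.Properties;

/** Hypothetical sketch of reflective property binding, in the spirit of MixAll.properties2Object. */
class PropertyBinder {

    /** For every public one-argument setXxx method, look up property "xxx" and convert the value. */
    static void bind(Properties props, Object target) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            // setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // key absent: keep the field's current (default) value
            }
            Class<?> type = method.getParameterTypes()[0];
            Object value;
            if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class || type == Long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else if (type == String.class) {
                value = raw;
            } else {
                continue; // unsupported parameter type in this sketch
            }
            try {
                method.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set " + key, e);
            }
        }
    }
}

/** Hypothetical config class standing in for NettyServerConfig. */
class DemoServerConfig {
    private int listenPort = 9876;
    private String kvConfigPath = "kvConfig.json";

    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public String getKvConfigPath() { return kvConfigPath; }
    public void setKvConfigPath(String kvConfigPath) { this.kvConfigPath = kvConfigPath; }
}
```

In this sketch, keys missing from the file leave the defaults untouched, which is how a default such as port 9876 would survive a config file that omits listenPort (assuming the real method also skips missing keys).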

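Once the Properties object has been built, it is used to populate namesrvConfig and nettyServerConfig. What follows is some less important code; for example, if rocketmqHome is not configured, a hint is printed and the process exits: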

if (null == namesrvConfig.getRocketmqHome()) {  
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
    System.exit(-2);  
}

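Another example: the logback_namesrv.xml logging configuration is loaded from a fixed path under the RocketMQ home directory (rocketmqHome + "/conf/logback_namesrv.xml"). If you relocate it to another disk path of your own, remember to copy the conf directory level together with the log configuration file.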

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
JoranConfigurator configurator = new JoranConfigurator();  
configurator.setContext(lc);  
lc.reset();  
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

Then comes the key step: creating the NamesrvController core controller. namesrvConfig and nettyServerConfig are passed into the core controller for later initialization:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard 
controller.getConfiguration().registerConfig(properties);

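The code above naturally creates the NamesrvController from namesrvConfig and nettyServerConfig, and then registers properties once more so that no configuration is lost.

Note that the configuration registration here is guarded by JUC's java.util.concurrent.locks.ReadWriteLock.

For an introduction to ReadWriteLock, see Liao Xuefeng's article "Using ReadWriteLock" (使用ReadWriteLock - 廖雪峯的官方網站, liaoxuefeng.com).
ReadWriteLock improves read efficiency:

  • ReadWriteLock allows only one thread to write at a time;
  • ReadWriteLock allows multiple threads to read concurrently while no write is in progress;
  • ReadWriteLock suits read-heavy, write-light workloads.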
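The pattern can be illustrated with a small, hypothetical registry in the spirit of Configuration.registerConfig: writes take the exclusive write lock, reads share the read lock. ConfigRegistry and its methods are illustrative names, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical config registry guarded by a read-write lock. */
class ConfigRegistry {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Properties merged = new Properties();
    private final List<Properties> sources = new ArrayList<>();

    /** Writers take the exclusive write lock; this sketch ignores null (e.g. no -c file given). */
    void registerConfig(Properties extra) {
        if (extra == null) {
            return;
        }
        lock.writeLock().lock();
        try {
            sources.add(extra);
            merged.putAll(extra);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers share the read lock, so concurrent lookups do not block each other. */
    String get(String key) {
        lock.readLock().lock();
        try {
            return merged.getProperty(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    int sourceCount() {
        lock.readLock().lock();
        try {
            return sources.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```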

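Looking back, createNamesrvController(String[] args) is a very important method; its key operations are:

  • Build the namesrvConfig and nettyServerConfig configuration objects
  • Create the NamesrvController core controller

Once the core controller is created, it is started right away; this involves second-tier initialization: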

// 2. Start the controller
start(controller);  

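Initialization

After the core controller is created, it is initialized. The main tasks are:

  • Initialize the core controller; internally this starts the second-tier components.
  • Register a JVM shutdown hook to close Netty and release resources gracefully.
  • Actually start the core controller, which in practice means starting the Netty server.

The initialization code in org.apache.rocketmq.namesrv.NamesrvStartup#start is as follows: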

public static NamesrvController start(final NamesrvController controller) throws Exception {  
  
    if (null == controller) {  
        throw new IllegalArgumentException("NamesrvController is null");  
    }  
    // Initialize the core controller
    boolean initResult = controller.initialize();  
    if (!initResult) {  
        controller.shutdown();  
        System.exit(-3);  
    }  
    // Register a shutdown hook that gracefully releases the Netty server, thread pools and other resources when the JVM exits
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {
        controller.shutdown();
        return null;
    }));
    // Start the core controller
    controller.start();  
  
    return controller;  
}

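start() resembles the controller-creation step in that it is also a second-tier initialization. If initialize() fails, the controller is shut down and the process exits with -3; otherwise a shutdown hook is registered to gracefully release the Netty server and thread pool resources, and finally the core controller is started.

Let's dig into the core controller's initialization, org.apache.rocketmq.namesrv.NamesrvController#initialize: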
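The ordering in start() (initialize, bail out on failure, register the hook, then start) can be sketched with a hypothetical DemoController. To keep it testable, the sketch returns the exit code instead of calling System.exit, and it uses a plain Thread rather than RocketMQ's ShutdownHookThread.

```java
/** Hypothetical controller illustrating the initialize -> shutdown hook -> start order. */
class DemoController {
    private final boolean initOk;
    private volatile boolean started;
    private volatile boolean shutdown;

    DemoController(boolean initOk) {
        this.initOk = initOk;
    }

    // The real initialize() loads KV config, builds the Netty server and schedules tasks
    boolean initialize() { return initOk; }
    void start() { started = true; }
    void shutdown() { shutdown = true; started = false; }
    boolean isStarted() { return started; }
    boolean isShutdown() { return shutdown; }

    /** Returns an exit code instead of calling System.exit, so the sketch can be tested. */
    static int boot(DemoController controller) {
        if (!controller.initialize()) {
            controller.shutdown(); // release whatever was partially created
            return -3;             // NamesrvStartup calls System.exit(-3) at this point
        }
        // Register the hook before start() so resources are released on any later JVM exit
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown, "demo-shutdown-hook"));
        controller.start();
        return 0;
    }
}
```

Registering the hook before start() means that if start() throws and main0 then calls System.exit(-1), the hook still runs controller.shutdown().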

public boolean initialize() {  
    // Load the KV configuration
    this.kvConfigManager.load();  
    // Create the Netty remoting server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);  
  
    this.remotingExecutor =  
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));  
  
    this.registerProcessor();  
    // Scheduled task: scan Brokers every 10 s and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);  
    // Scheduled task: print all KV configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);  
  
    // SSL-related code omitted
  
    return true;  
}

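This code performs some initialization before the core controller starts, including:

  • Load the KV configuration from the kvConfigPath (the path where KV configuration is stored) of the NamesrvConfig initialized earlier
  • Create the NettyRemotingServer and the remoting worker thread pool, and register the request processor
  • Create two scheduled tasks:

    • scan Brokers every 10 s and remove inactive ones
    • print all KV configuration every 10 minutes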
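Both tasks rely on ScheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit). The sketch below reproduces the same pair of schedules with the delays shrunk to milliseconds so it finishes quickly; the latches only count executions and stand in for scanNotActiveBroker and printAllPeriodically. PeriodicTasksDemo is a hypothetical name, and the single-threaded scheduler is an assumption made for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the two fixed-rate tasks registered in initialize(). */
class PeriodicTasksDemo {
    static boolean runBoth(long scanPeriodMs, long printPeriodMs, int scansToWaitFor) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch scans = new CountDownLatch(scansToWaitFor);
        CountDownLatch prints = new CountDownLatch(1);
        try {
            // Real code: scanNotActiveBroker, initial delay 5 s, period 10 s
            ses.scheduleAtFixedRate(scans::countDown, 5, scanPeriodMs, TimeUnit.MILLISECONDS);
            // Real code: printAllPeriodically, initial delay 1 min, period 10 min
            ses.scheduleAtFixedRate(prints::countDown, 1, printPeriodMs, TimeUnit.MILLISECONDS);
            return scans.await(2, TimeUnit.SECONDS) && prints.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ses.shutdownNow(); // stop both periodic tasks
        }
    }
}
```

One consequence of a single scheduler thread is that a slow run of one task delays the next run of the other.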

The removal logic behind the Broker-scanning task is covered later, so we skip it for now.

Next, let's see how the core controller starts; the entry is org.apache.rocketmq.namesrv.NamesrvController#start:

public void start() throws Exception {  
    this.remotingServer.start();  
  
    if (this.fileWatchService != null) {  
        this.fileWatchService.start();  
    }  
}

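Very simple: the code just starts the Netty server (plus the file-watch service, if one exists), since RocketMq's underlying communication relies on Netty. Netty's details are beyond the scope of this article, so we won't dig into them here.

That completes the NameServ startup code.

NameServ: Registering Brokers

The sequence diagram for route registration is shown below:

[Figure: route registration sequence diagram]

Simply put, route registration is the process by which a Broker registers itself with NameServ, and it is implemented with heartbeat packets. So how are Brokers stored in code? The last step of the sequence diagram answers this: they live in RouteInfoManager, which maintains the following information: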

private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;  
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;  
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;  
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

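These declarations appear to come from a newer (5.x) code line: BrokerAddrInfo and topicQueueMappingInfoTable do not show up in the 4.9.4 code quoted later, where brokerLiveTable is keyed directly by the String broker address. Much like Spring managing Beans, the trick is the all-purpose Map. The most important fields are (matching the two functions listed at the beginning of the article):

  • topicQueueTable: message-queue routing info per Topic, including the name of the Broker hosting the topic, the number of read queues, the number of write queues, synchronization flags, and so on. RocketMq load-balances message sending based on topicQueueTable.
  • brokerAddrTable: Broker node info, including the brokerName, the cluster it belongs to, and the master/slave node addresses.
  • clusterAddrTable: Broker cluster info, storing all brokerNames in each cluster.
  • brokerLiveTable: Broker liveness info; NameServer updates it every time it receives a heartbeat from a Broker.

For message consumption RocketMq uses a publish-subscribe design, as mentioned in the earlier article 【RocketMq】RocketMq 掃盲 (RocketMq primer), so we won't repeat it here.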
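To make the relationships between these Maps concrete, here is a heavily simplified, hypothetical model. Plain strings replace QueueData and BrokerData, "4/4" stands for the default 4 read / 4 write queues, and brokerId 0 is treated as the master (MixAll.MASTER_ID). It shows how a topic lookup hops from topicQueueTable to brokerAddrTable to find master addresses.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of RouteInfoManager's core tables. */
class MiniRouteTables {
    // topic -> (brokerName -> "readQueues/writeQueues")
    final Map<String, Map<String, String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr); id 0 is the master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> brokerNames
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerAddr -> last heartbeat timestamp
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    void register(String cluster, String brokerName, long brokerId, String addr, String topic, long now) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        if (brokerId == 0) { // only the master's topic config updates queue routes
            topicQueueTable.computeIfAbsent(topic, k -> new HashMap<>()).put(brokerName, "4/4");
        }
        brokerLiveTable.put(addr, now);
    }

    /** Master addresses of every broker serving the topic, i.e. where a producer could send. */
    Set<String> masterAddrsFor(String topic) {
        Set<String> result = new HashSet<>();
        for (String brokerName : topicQueueTable.getOrDefault(topic, Collections.emptyMap()).keySet()) {
            String master = brokerAddrTable.getOrDefault(brokerName, Collections.emptyMap()).get(0L);
            if (master != null) {
                result.add(master);
            }
        }
        return result;
    }
}
```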

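A single Topic on a Broker usually has many Queues; with the default configuration, RocketMq allocates 4 read queues and 4 write queues to each newly created Topic. Multiple Brokers can form a cluster, each Broker periodically sends heartbeat packets to NameServ with its registration info, and NameServ tracks Broker node state through brokerLiveTable.

Following the sequence diagram, let's walk through how NameServ registers a Broker step by step:

Sending Heartbeat Packets

We analyzed the NameServ startup code above; the Broker startup code turns out to be similar: first build a controller, then call start(). Building the controller is not the focus here, so we skip it and look at the start() method.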

public static void main(String[] args) {  
    start(createBrokerController(args));  
}  
  
public static BrokerController start(BrokerController controller) {  
    try {  
  
        controller.start();  
  
        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "  
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
  
        if (null != controller.getBrokerConfig().getNamesrvAddr()) {  
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();  
        }  
  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {  
        e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

controller.start() is where the sequence diagram begins. Below is the body of org.apache.rocketmq.broker.BrokerController#start:

public void start() throws Exception {  

    // start() calls of dependent components omitted here
    //.....
    
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {  
        startProcessorByHa(messageStoreConfig.getBrokerRole());  
        // Handle master/slave synchronization settings
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());  
        // Force a heartbeat on first startup
        this.registerBrokerAll(true, false, true);  
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  
  
        @Override  
        public void run() {  
            try {  
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());  
            } catch (Throwable e) {  
                log.error("registerBrokerAll Exception", e);  
            }  
        }  
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);  
  
    if (this.brokerStatsManager != null) {  
        this.brokerStatsManager.start();  
    }  
  
    if (this.brokerFastFailure != null) {  
        this.brokerFastFailure.start();  
    }  
  
  
}
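The period passed to scheduleAtFixedRate above is clamped: whatever registerNameServerPeriod is configured to, the periodic heartbeat interval ends up between 10 s and 60 s, after an initial 10 s delay. A tiny helper (the class name HeartbeatPeriod is hypothetical) makes the arithmetic explicit:

```java
/** Reproduces the period expression used when scheduling registerBrokerAll in BrokerController#start. */
class HeartbeatPeriod {
    static long periodMillis(int registerNameServerPeriod) {
        // Clamp the configured period into the range [10 000 ms, 60 000 ms]
        return Math.max(10000, Math.min(registerNameServerPeriod, 60000));
    }
}
```

For example, 5000 becomes 10000, 30000 stays 30000, and 120000 becomes 60000.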

The parameters of registerBrokerAll are not very readable, so here are the three parameters in order, with the values passed in the call above:

  • boolean checkOrderConfig(true)
  • boolean oneway(false)
  • boolean forceRegister(true)

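With the parameters spelled out, the call is easy to understand: it checks the order configuration, forces one registration, and sends the heartbeat in non-oneway mode (waiting for a response).

Now we step into registerBrokerAll(). It first builds the topic wrapper (TopicConfigSerializeWrapper). Then comes an interesting piece of code: if the Broker lacks read or write permission, a temporary topicConfigTable is rebuilt, in which every topic carries the Broker's own permission, and set on the wrapper. Finally it decides whether the Broker needs to send a heartbeat at this point.

Going back to the call this.registerBrokerAll(true, false, true), note that forceRegister is hard-coded to true there, so every Broker sends a heartbeat on its first initialization regardless of needRegister (the periodic task passes brokerConfig.isForceRegister() instead):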

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {  
    // Create the TopicConfigSerializeWrapper (the topic wrapper)
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();  
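    // If the broker lacks read or write permission, build a temporary topicConfigTable and use it as the wrapper's table
    // In my view this guards later methods against empty parameters; once the broker has read/write permission, the table is used directly without being rebuilt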
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())  
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {  
        // The initial capacity is sized from the existing topic config count, i.e. topicConfigWrapper.getTopicConfigTable().values().size()
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(topicConfigWrapper.getTopicConfigTable().values().size());  
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {  
            TopicConfig tmp =  
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),  
                    this.brokerConfig.getBrokerPermission());  
            topicConfigTable.put(topicConfig.getTopicName(), tmp);  
        }  
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);  
    }  
    // Decide whether the Broker needs to send a heartbeat
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),  
        this.getBrokerAddr(),  
        this.brokerConfig.getBrokerName(),  
        this.brokerConfig.getBrokerId(),  
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {  
        // Send the heartbeat
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);  
    }  
}
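The permission check at the top of registerBrokerAll works on bit flags. The sketch below mirrors PermName's constants as I understand them (read = 4, write = 2, inherit = 1; verify against your version), and the Perm class name is hypothetical. Because each rebuilt TopicConfig is created with brokerConfig.getBrokerPermission(), a read-only Broker ends up reporting all of its topics to NameServ as read-only.

```java
/** Hypothetical bit-flag helper mirroring the PermName checks used in registerBrokerAll. */
final class Perm {
    static final int PERM_INHERIT = 1;     // 0x1
    static final int PERM_WRITE = 1 << 1;  // 0x2
    static final int PERM_READ = 1 << 2;   // 0x4

    private Perm() {}

    static boolean isReadable(int perm) { return (perm & PERM_READ) == PERM_READ; }

    static boolean isWriteable(int perm) { return (perm & PERM_WRITE) == PERM_WRITE; }

    /** Same condition as registerBrokerAll: rebuild the topic table unless the broker is both readable and writeable. */
    static boolean needsTopicPermOverride(int brokerPerm) {
        return !isWriteable(brokerPerm) || !isReadable(brokerPerm);
    }
}
```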

Next we look at needRegister, specifically org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister. The key code is:

brokerOuterExecutor.execute(() -> {  
    try {  
        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();  
        requestHeader.setBrokerAddr(brokerAddr);  
        requestHeader.setBrokerId(brokerId);  
        requestHeader.setBrokerName(brokerName);  
        requestHeader.setClusterName(clusterName);  
        RemotingCommand request = 
        RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);  
        request.setBody(topicConfigWrapper.getDataVersion().encode());  
        // Synchronous remote call to the routing center (NameServ)
        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);  
        DataVersion nameServerDataVersion = null;  
        Boolean changed = false;  
        // Omitted: handle the response according to its code
        //..
        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);  
    } catch (Exception e) {  
        changedList.add(Boolean.TRUE);  
        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);  
    } finally {  
        countDownLatch.countDown();  
    }  
});

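This code is easy to follow; it is a variant of the HTTP calls we write every day. Using RequestCode.QUERY_DATA_VERSION, we can find the code on NameServer that handles the request.

With IDEA we quickly locate the entry point org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest, which leads to org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig. The corresponding code is: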
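needRegister sends QUERY_DATA_VERSION to every configured name server in parallel and waits on a CountDownLatch; an exception counts as "changed" (changedList.add(Boolean.TRUE) above). The following hypothetical sketch keeps that fan-out shape but replaces the remoting call with a Predicate. NeedRegisterSketch is a made-up name, and the final "register if any name server reports a change" rule is an assumption about the omitted code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical fan-out: ask every name server in parallel, register if any reports a change. */
class NeedRegisterSketch {
    static boolean needRegister(List<String> namesrvAddrs, Predicate<String> queryChanged, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, namesrvAddrs.size()));
        List<Boolean> changedList = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(namesrvAddrs.size());
        try {
            for (String addr : namesrvAddrs) {
                pool.execute(() -> {
                    try {
                        // Stands in for invokeSync(namesrvAddr, QUERY_DATA_VERSION request, timeout)
                        changedList.add(queryChanged.test(addr));
                    } catch (RuntimeException e) {
                        changedList.add(Boolean.TRUE); // on failure, err on the side of re-registering
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return changedList.contains(Boolean.TRUE);
    }
}
```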

public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,  
    RemotingCommand request) throws RemotingCommandException {  
    final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);  
    final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();  
    final QueryDataVersionRequestHeader requestHeader =  
        (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);  
    DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);  

    // Internally: if brokerLiveTable holds no dataVersion for this broker, or the stored one differs from the reported dataVersion, the Broker needs to send a heartbeat
    Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);  
    
    if (!changed) {  
        // Nothing changed: only refresh the Broker's last-update timestamp
        this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr(), System.currentTimeMillis());  
    }  
  
    DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());  
    response.setCode(ResponseCode.SUCCESS);  
    response.setRemark(null);  
  
    if (nameSeverDataVersion != null) {  
        response.setBody(nameSeverDataVersion.encode());  
    }  
    responseHeader.setChanged(changed);  
    return response;  
}

Now the key check, org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged:

public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {  
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);  
    // If brokerLiveTable holds no dataVersion for this broker, or it differs from the reported one, the Broker needs to send a heartbeat
    return null == prev || !prev.equals(dataVersion);  
}  
  
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {  
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);  
    if (prev != null) {  
        return prev.getDataVersion();  
    }  
    return null;  
}

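Whether a Broker needs to send a heartbeat is determined by the dataVersion the routing center holds for it (org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion): if none is stored, or the stored one differs from the dataVersion the Broker reports, the Broker needs to send a heartbeat.

Nameserver Handling Heartbeat Packets

When NameServer's Netty server receives a heartbeat, it calls the following routing-center method to process it; the entry point is org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker: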
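The decision boils down to a null check plus an equality test. Below is a hypothetical, simplified DataVersion stand-in (a timestamp plus a counter, which is an assumption about the real class's fields) together with the same rule as isBrokerTopicConfigChanged; SimpleDataVersion and VersionCheck are made-up names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified stand-in for DataVersion. */
final class SimpleDataVersion {
    final long timestamp;
    final long counter;

    SimpleDataVersion(long timestamp, long counter) {
        this.timestamp = timestamp;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleDataVersion)) {
            return false;
        }
        SimpleDataVersion v = (SimpleDataVersion) o;
        return timestamp == v.timestamp && counter == v.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, counter);
    }
}

class VersionCheck {
    // brokerAddr -> version last stored by NameServ (plays the role of BrokerLiveInfo#dataVersion)
    final Map<String, SimpleDataVersion> liveVersions = new HashMap<>();

    /** Unknown broker or different version means "changed", so the broker should re-register. */
    boolean isChanged(String brokerAddr, SimpleDataVersion reported) {
        SimpleDataVersion prev = liveVersions.get(brokerAddr);
        return prev == null || !prev.equals(reported);
    }
}
```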

public RegisterBrokerResult registerBroker(  
        final String clusterName,  
        final String brokerAddr,  
        final String brokerName,  
        final long brokerId,  
        final String haServerAddr,  
        final TopicConfigSerializeWrapper topicConfigWrapper,  
        final List<String> filterServerList,  
        final Channel channel) {  
    RegisterBrokerResult result = new RegisterBrokerResult();  
    try {  
        try {  
            this.lock.writeLock().lockInterruptibly();  
  
            // Get all Brokers in the cluster and add the current one to clusterAddrTable; brokerNames is a Set, so there are no duplicates
            Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());  
            brokerNames.add(brokerName);  
  
            boolean registerFirst = false;  
  
            // Get the Broker info; on first registration, create a BrokerData and put it into brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);  
            if (null == brokerData) {  
                registerFirst = true;  
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());  
                this.brokerAddrTable.put(brokerName, brokerData);  
            }  
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();  
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {  
                Entry<Long, String> item = it.next();  
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {  
                    log.debug("remove entry {} from brokerData", item);  
                    it.remove();  
                }  
            }  
            // Check whether this Broker was already registered
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);  
  
            registerFirst = registerFirst || (null == oldAddr);  
            // If the Broker is the Master and its Topic config changed or this is the first registration, create/update the topic queue data
            if (null != topicConfigWrapper  
                    && MixAll.MASTER_ID == brokerId) {  
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())  
                        || registerFirst) {  
                    ConcurrentMap<String, TopicConfig> tcTable =  
                            topicConfigWrapper.getTopicConfigTable();  
                    if (tcTable != null) {  
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {  
                            this.createAndUpdateQueueData(brokerName, entry.getValue());  
                        }  
                    }  
                }  
            }  
            // Update the BrokerLiveInfo state
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,  
                    new BrokerLiveInfo(  
                            System.currentTimeMillis(),  
                            topicConfigWrapper.getDataVersion(),  
                            channel,  
                            haServerAddr));  
  
            if (filterServerList != null) {  
                if (filterServerList.isEmpty()) {  
                    this.filterServerTable.remove(brokerAddr);  
                } else {  
                    this.filterServerTable.put(brokerAddr, filterServerList);  
                }  
            }  
            // If this Broker is not the master (brokerId != MASTER_ID), return the master's address and HA server address in the result
            if (MixAll.MASTER_ID != brokerId) {  
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);  
                if (masterAddr != null) {  
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);  
                    if (brokerLiveInfo != null) {  
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());  
                        result.setMasterAddr(masterAddr);  
                    }  
                }  
            }  
        } finally {  
            this.lock.writeLock().unlock();  
        }  
    } catch (Exception e) {  
        log.error("registerBroker Exception", e);  
    }  
  
    return result;  
}

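The code above is the core of Broker heartbeat handling. Under the write lock, it mainly does the following:

  • Update the routing information in RouteInfoManager

    • clusterAddrTable update;
    • brokerAddrTable update;
    • topicQueueTable update (master only, when the topic config changed or on first registration);
    • brokerLiveTable update;
    • filterServerTable update;

  • For a slave Broker, return the master address and HA server address in the result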
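The least obvious step above is the master/slave switch handling: an IP:PORT that re-registers under a different brokerId replaces its old entry. Here is a hypothetical sketch on a plain brokerId -> address map (0 = master); BrokerAddrSwitch is a made-up name.

```java
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch of the "same IP:PORT keeps only one entry" rule in registerBroker. */
class BrokerAddrSwitch {
    /** Returns true if this brokerId had no address before, i.e. a new registration. */
    static boolean register(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        Iterator<Map.Entry<Long, String>> it = brokerAddrs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> item = it.next();
            // Same address under a different id (e.g. slave 1 promoted to master 0): drop the stale entry
            if (brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                it.remove();
            }
        }
        String oldAddr = brokerAddrs.put(brokerId, brokerAddr);
        return oldAddr == null;
    }
}
```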

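Periodically Evicting Brokers

As mentioned earlier, NameServ creates a scheduled task at startup that periodically removes inactive Brokers. The source code is in org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker.

The unit tests also cover this periodic cleanup, which makes them a quick way to find the entry point: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManagerBrokerRegisterTest#testScanNotActiveBroker

The test is simple and intuitive: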

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Test  
public void testScanNotActiveBroker() {  
    for (int j = 0; j < brokerNameNumber; j++) {  
        String brokerName = getBrokerName(brokerPrefix, j);  
  
        for (int i = 0; i < brokerPerName; i++) {  
            String brokerAddr = getBrokerAddr(clusterName, brokerName, i);  
  
            // set not active  
            routeInfoManager.updateBrokerInfoUpdateTimestamp(brokerAddr, 0);  
  
            assertEquals(1, routeInfoManager.scanNotActiveBroker());  
        }  
    }  
  
}

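Before the test runs, the setup registers 9 Broker nodes (brokerNameNumber = 3 broker names with brokerPerName = 3 Brokers each; the last argument, 10, appears to be the number of test topics). Using several broker groups is a thoughtful way to simulate a production environment: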

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Before  
public void setup() {  
    routeInfoManager = new RouteInfoManager();  
    cluster = registerCluster(routeInfoManager,  
            clusterName,  
            brokerPrefix,  
            brokerNameNumber,  
            brokerPerName,  
            topicPrefix,  
            10);  
}

Then we simply run the unit test once; its log shows the test flow in detail:

  1. First, the Brokers are registered, with some test Topics filled in.
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:0 HAServer: cluster-broker-0:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:1 HAServer: cluster-broker-0:1
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:2 HAServer: cluster-broker-0:2
06:54:23.353 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-1] master address change from null to cluster-broker-1:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:0 HAServer: cluster-broker-1:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:1 HAServer: cluster-broker-1:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:2 HAServer: cluster-broker-1:2
06:54:23.355 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-2] master address change from null to cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:0 HAServer: cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:1 HAServer: cluster-broker-2:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:2 HAServer: cluster-broker-2:2
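  2. Then, following the test code, the Brokers are iterated and the inactive-Broker scan is invoked at the end of each loop iteration. To verify the scan, the test sets lastUpdateTimestamp (the last update time) directly so that each Broker's liveness window ends early.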
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:0 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[0, cluster-broker-1:0] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:1 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[1, cluster-broker-1:1] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:2 120000ms
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerAddr[2, cluster-broker-1:2] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1], clusterName[cluster] from clusterAddrTable, because channel destroyed

That is the gist of the unit test; now let's look at the actual code. It uses an iterator to remove entries while traversing, another classic design pattern:

public int scanNotActiveBroker() {  
    int removeCount = 0;  
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();  
    while (it.hasNext()) {  
        Entry<String, BrokerLiveInfo> next = it.next();  
        long last = next.getValue().getLastUpdateTimestamp();  
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2 = 120 s; in the unit test last is set to 0, so it has always expired
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {  
            RemotingUtil.closeChannel(next.getValue().getChannel());  
            it.remove();  
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);  
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());  
  
            removeCount++;  
        }  
    }  
  
    return removeCount;  
}

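The eviction logic is straightforward: read the state from BrokerLiveInfo and check whether the time since the Broker's last heartbeat exceeds the limit (120 seconds by default). If it does, the Broker's channel is closed, its entry is removed from brokerLiveTable, and onChannelDestroy cleans up the other routing tables.

Final Thoughts

Having analyzed RocketMq's built-in routing center, implementing one of our own does not seem that hard. NameServ's small-but-beautiful design is clever. With only a few hundred lines of code it certainly has imperfections, and many scenarios require developers to write their own business code as a safety net; but because its design is simple and its responsibilities are narrow, it is easy to use, leaves plenty of room for extension in business code, is cheap to maintain, and performs well.

NameServ is a core part of RocketMq yet simple to use, which makes it an ideal entry point into RocketMq. While reading the code I also try modifying it to see the effects; taking part in writing and reworking the source gives a much clearer understanding of the original authors' thinking, and is a handy source-reading tip.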
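The removal loop can be reproduced with a plain HashMap and an injected clock, which is also how the unit test avoids waiting 120 s (it sets the timestamp to 0). InactiveBrokerScanner is a hypothetical name; channel closing and the onChannelDestroy cleanup are left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch of scanNotActiveBroker with an injectable "now". */
class InactiveBrokerScanner {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2; // 120 s, as in the code above

    final Map<String, Long> lastUpdate = new HashMap<>(); // brokerAddr -> last heartbeat timestamp

    int scan(long now) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_CHANNEL_EXPIRED_TIME < now) {
                it.remove(); // remove through the iterator, which is safe during traversal
                removed++;
            }
        }
        return removed;
    }
}
```

Iterator.remove() is the safe way to delete while traversing; calling lastUpdate.remove(key) inside the loop would typically make the next it.next() throw ConcurrentModificationException.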
