Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_flink

前文中,我們已經瞭解了 Flink 的三種執行圖是怎麼生成的。今天繼續看一下 Flink 集羣是如何啓動的。

前文中,我們已經瞭解了 Flink 的三種執行圖是怎麼生成的。今天繼續看一下 Flink 集羣是如何啓動的。

啓動腳本

集羣啓動腳本的位置在:

flink-dist/src/main/flink-bin/bin/start-cluster.sh

腳本會負責啓動 JobManager 和 TaskManager,我們主要關注 standalone 啓動模式,具體的流程見下圖。

Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_啓動流程_02

從圖中可以看出 JobManager 是通過 jobmanager.sh 文件啓動的,TaskManager 是通過taskmanager.sh 啓動的,兩者都調用了 flink-daemon.sh,通過傳遞不同的參數,最終運行不同的 Java 類。

case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (sql-gateway)
        CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
        SQL_GATEWAY_CLASSPATH="`findSqlGatewayJar`":"`findFlinkPythonJar`"
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac

JobManager 啓動流程

在 StandaloneSessionClusterEntrypoint 的 main 方法中,主要就是加載各種配置和環境變量,然後調用 ClusterEntrypoint.runClusterEntrypoint 來啓動集羣。跟着調用鏈一直找到 ClusterEntrypoint.runCluster 方法,這裏會啓動 ResourceManager、DispatcherRunner 等組件。

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        // 初始化各種服務
        initializeServices(configuration, pluginManager);

        // 創建 DispatcherResourceManagerComponentFactory,
        // 包含了三個核心組件的 Factory
        // DispatcherRunnerFactory、ResourceManagerFactory、RestEndpointFactory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // 啓動 ResourceManager、DispatcherRunner、WebMonitorEndpoint
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        delegationTokenManager,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        failureEnrichers,
                        this);

        // 關閉服務
        clusterComponent
                .getShutDownFuture()
                .whenComplete(
                        (ApplicationStatus applicationStatus, Throwable throwable) -> {
                            if (throwable != null) {
                                shutDownAsync(
                                        ApplicationStatus.UNKNOWN,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        ExceptionUtils.stringifyException(throwable),
                                        false);
                            } else {
                                // This is the general shutdown path. If a separate more
                                // specific shutdown was
                                // already triggered, this will do nothing
                                shutDownAsync(
                                        applicationStatus,
                                        ShutdownBehaviour.GRACEFUL_SHUTDOWN,
                                        null,
                                        true);
                            }
                        });
    }
}

下面來詳細看一下這幾個方法, initializeServices 就是負責初始化各種服務,有幾個比較重要的可以着重關注下:

// 初始化並啓動一個通用的 RPC Service
commonRpcService = RpcUtils.createRemoteRpcService(...);

// 創建一個 IO 線程池,線程數量位 CPU 核數 * 4
ioExecutor = Executors.newFixedThreadPool(...);

// 創建 HA 服務組件,根據配置初始化 Standalone、ZK、K8S 三種
haServices = createHaServices(configuration, ioExecutor, rpcSystem);

// 創建並啓動 blobServer,blobServer 可以理解為是 Flink 內部的
blobServer = BlobUtils.createBlobServer(...);
blobServer.start();

// 創建心跳服務
heartbeatServices = createHeartbeatServices(configuration);

// 創建一個監控服務
processMetricGroup = MetricUtils.instantiateProcessMetricGroup(...);

createDispatcherResourceManagerComponentFactory 這個方法就是創建了三個工廠類,不需要過多介紹。我們重點關注 dispatcherResourceManagerComponentFactory.create 方法,即 ResourceManager、DispatcherRunner、WebMonitorEndpoint 是如何啓動的。

WebMonitorEndpoint

WebMonitorEndpoint 的啓動流程圖如下,圖中細箭頭代表同一個方法中順序調用,粗箭頭代表進入上一個方法內部的調用。

Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_啓動流程_03

WebMonitorEndpoint 創建和啓動步驟如下:

  1. 通過工廠創建出了 WebMonitorEndpoint,這裏就是比較常規的初始化操作。
  2. 調用 WebMonitorEndpoint 的 start 方法開始啓動,start 方法內部先是創建了一個 Router 並調用 initializeHandlers 創建了一大堆 handler(是真的一大堆,這個方法有接近一千行,都是在創建 handler),創建完成之後,對 handler 進行排序和去重,再把它們都註冊到 Router 中。這裏排序是為了確保路由匹配的正確性,排序規則是先靜態路徑(/jobs/overview),後動態路徑(/jobs/:jobid),假如我們沒有排序,先註冊了 /jobs/:jobid ,後註冊 /jobs/overview ,這時當我們請求 /jobs/overview 時,就會被錯誤的路由到 /jobs/:jobid 上去。
  3. 是調用 startInternal 方法,在 startInternal 方法內部只有 leader 選舉和啓動緩存清理任務兩個步驟。

ResourceManager

Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_啓動流程_04

ResourceManager 創建和啓動步驟如下:

  1. 調用 ResourceManagerServiceImpl.create 方法創建 ResourceManagerService,這裏只是創建 ResourceManager 服務,實際創建 ResourceManager 在後面的步驟中。
  2. 調用 resourceManagerService.start 方法啓動服務,這裏就是啓動選主服務,standalne 模式直接調用 grantLeadership 成為 leader。
  3. 成為 leader 後,就會調用 startNewLeaderResourceManager 方法,這個方法中會調用 resourceManagerFactory.createResourceManager 正式創建 resourceManager。創建完成後,就會調用 resourceManager.start 來啓動它。
  4. 啓動後會回調 ResourceManager.onStart 方法。這裏調用 startHeartbeatServices 啓動了兩個心跳服務,一個是 ResourceManager 和 TaskManager 之間的心跳,一個是 ResourceManager 和 JobManager 之間的心跳,然後會啓動 SlotManager。SlotManager 可以被當作 Flink 集羣的資源調度中心。它會負責管理集羣中的所有 Slot 資源,也需要響應 JobManager 的資源請求。

DispatcherRunner

Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_flink_05

  1. 先創建工廠,創建完成後調用 DefaultDispatcherRunner.create 創建出 DispatcherRunner,接着是調用 start 啓動選主流程。
  2. 選主完成後就調用 startNewDispatcherLeaderProcess 啓動新的流程。啓動新的流程需要先關閉舊流程,然後創建新的 dispatcherLeaderProcess,並調用 start 啓動。
  3. 啓動時,會回調 onStart 方法。
  4. 回調方法中,先啓動 executionPlanStore,它主要是用於持久化 JobGraph。然後恢復執行計劃,重建狀態(如果是從失敗中恢復),實例化 Dispatcher,完成作業啓動。

TaskManager 啓動流程

Flink源碼解讀系列 | Flink集羣Standalone啓動腳本 -_flink_06

TaskManager 是 Flink 的執行節點,其最小執行單元是 slot。TaskManager 啓動流程也主要是和資源管理相關,包括 slot 列表的管理和與 ResourceManager 的通信。

TaskManager 啓動流程大體分為以下幾部分:

  1. 構建並啓動 TaskManagerRunner(藍色部分)
  2. 啓動 TaskExecutor(紅色部分)
  3. 完成與 ResourceManager 的連接(橙色部分)

啓動 TaskManagerRunner

在 TaskManagerRunner 的 start 方法中,有兩個步驟:

第一步是調用 startTaskManagerRunnerServices 創建和啓動了很多服務,這一點和 JobManager 的啓動流程比較像。這些服務包括了高可用服務、心跳服務、監控指標服務等,這裏也創建了 taskExecutorService,它的啓動在第二步。

第二步是調用 taskExecutorService.start 方法,啓動 TaskExecutorService,它內部主要負責啓動 TaskExecutor。

啓動 TaskExecutor

TaskExecutor 是 TaskManager 內部的一個核心組件,負責幫助 TaskManager 完成 task 的部署和執行等核心操作。

在上一步調用 taskExecutor 的 start 方法後,會回調 onStart 方法,這裏主要是三個步驟

  1. 連接 ResourceManager 以及註冊監聽
  2. 啓動 taskSlotTable
  3. 連接 JobMaster 以及註冊監聽

第一步我們在下面詳細解釋。第二步啓動的 TaskSlotTable 是 TaskManager 中負責資源的核心組件,它維護了一個 Slot 列表,管理每個 Slot 的狀態,負責 Slot 的分配和釋放。第三步主要是和 JobMaster 建立連接並保持心跳,同時也會接收 Slot 申請的請求。

連接 ResourceManager

TaskExecutor 註冊完監聽之後,會收到 ResourceManagerLeaderListener.notifyLeaderAddress 方法回調。回調方法中,會創建一個 TaskExecutorToResourceManagerConnection 實例並啓動它。這個類是用來將 TaskExecutor 註冊到 ResourceManager,註冊成功會回調 onRegistrationSuccess 方法。回調成功的方法中,TaskManager 會調用 resourceManagerGateway.sendSlotReport 將 Slot 的狀態進行上報。

總結

本文介紹了 Flink 集羣在 Standalone 模式下的啓動過程,其中 JobManager 重點介紹了 WebMonitorEndpoint、ResourceManager 和 DispatcherRunner 這三個組件的啓動過程。TaskManager 主要介紹了啓動 TaskExecutor 和連接 ResourceManager 的過程。