前文中,我們已經瞭解了 Flink 的三種執行圖是怎麼生成的。今天繼續看一下 Flink 集羣是如何啓動的。
前文中,我們已經瞭解了 Flink 的三種執行圖是怎麼生成的。今天繼續看一下 Flink 集羣是如何啓動的。
啓動腳本
集羣啓動腳本的位置在:
flink-dist/src/main/flink-bin/bin/start-cluster.sh
腳本會負責啓動 JobManager 和 TaskManager,我們主要關注 standalone 啓動模式,具體的流程見下圖。
從圖中可以看出 JobManager 是通過 jobmanager.sh 文件啓動的,TaskManager 是通過taskmanager.sh 啓動的,兩者都調用了 flink-daemon.sh,通過傳遞不同的參數,最終運行不同的 Java 類。
case $DAEMON in
(taskexecutor)
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;
(zookeeper)
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;
(historyserver)
CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
;;
(standalonesession)
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
;;
(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
(sql-gateway)
CLASS_TO_RUN=org.apache.flink.table.gateway.SqlGateway
SQL_GATEWAY_CLASSPATH="`findSqlGatewayJar`":"`findFlinkPythonJar`"
;;
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
;;
esac
JobManager 啓動流程
在 StandaloneSessionClusterEntrypoint 的 main 方法中,主要就是加載各種配置和環境變量,然後調用 ClusterEntrypoint.runClusterEntrypoint 來啓動集羣。跟着調用鏈一直找到 ClusterEntrypoint.runCluster 方法,這裏會啓動 ResourceManager、DispatcherRunner 等組件。
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// 初始化各種服務
initializeServices(configuration, pluginManager);
// 創建 DispatcherResourceManagerComponentFactory,
// 包含了三個核心組件的 Factory
// DispatcherRunnerFactory、ResourceManagerFactory、RestEndpointFactory
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// 啓動 ResourceManager、DispatcherRunner、WebMonitorEndpoint
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId.unwrap(),
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
delegationTokenManager,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers,
this);
// 關閉服務
clusterComponent
.getShutDownFuture()
.whenComplete(
(ApplicationStatus applicationStatus, Throwable throwable) -> {
if (throwable != null) {
shutDownAsync(
ApplicationStatus.UNKNOWN,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
ExceptionUtils.stringifyException(throwable),
false);
} else {
// This is the general shutdown path. If a separate more
// specific shutdown was
// already triggered, this will do nothing
shutDownAsync(
applicationStatus,
ShutdownBehaviour.GRACEFUL_SHUTDOWN,
null,
true);
}
});
}
}
下面來詳細看一下這幾個方法, initializeServices 就是負責初始化各種服務,有幾個比較重要的可以着重關注下:
// 初始化並啓動一個通用的 RPC Service
commonRpcService = RpcUtils.createRemoteRpcService(...);
// 創建一個 IO 線程池,線程數量位 CPU 核數 * 4
ioExecutor = Executors.newFixedThreadPool(...);
// 創建 HA 服務組件,根據配置初始化 Standalone、ZK、K8S 三種
haServices = createHaServices(configuration, ioExecutor, rpcSystem);
// 創建並啓動 blobServer,blobServer 可以理解為是 Flink 內部的
blobServer = BlobUtils.createBlobServer(...);
blobServer.start();
// 創建心跳服務
heartbeatServices = createHeartbeatServices(configuration);
// 創建一個監控服務
processMetricGroup = MetricUtils.instantiateProcessMetricGroup(...);
createDispatcherResourceManagerComponentFactory 這個方法就是創建了三個工廠類,不需要過多介紹。我們重點關注 dispatcherResourceManagerComponentFactory.create 方法,即 ResourceManager、DispatcherRunner、WebMonitorEndpoint 是如何啓動的。
WebMonitorEndpoint
WebMonitorEndpoint 的啓動流程圖如下,圖中細箭頭代表同一個方法中順序調用,粗箭頭代表進入上一個方法內部的調用。
WebMonitorEndpoint 創建和啓動步驟如下:
- 通過工廠創建出了 WebMonitorEndpoint,這裏就是比較常規的初始化操作。
- 調用 WebMonitorEndpoint 的 start 方法開始啓動,start 方法內部先是創建了一個 Router 並調用 initializeHandlers 創建了一大堆 handler(是真的一大堆,這個方法有接近一千行,都是在創建 handler),創建完成之後,對 handler 進行排序和去重,再把它們都註冊到 Router 中。這裏排序是為了確保路由匹配的正確性,排序規則是先靜態路徑(/jobs/overview),後動態路徑(/jobs/:jobid),假如我們沒有排序,先註冊了 /jobs/:jobid ,後註冊 /jobs/overview ,這時當我們請求 /jobs/overview 時,就會被錯誤的路由到 /jobs/:jobid 上去。
- 是調用 startInternal 方法,在 startInternal 方法內部只有 leader 選舉和啓動緩存清理任務兩個步驟。
ResourceManager
ResourceManager 創建和啓動步驟如下:
- 調用 ResourceManagerServiceImpl.create 方法創建 ResourceManagerService,這裏只是創建 ResourceManager 服務,實際創建 ResourceManager 在後面的步驟中。
- 調用 resourceManagerService.start 方法啓動服務,這裏就是啓動選主服務,standalne 模式直接調用 grantLeadership 成為 leader。
- 成為 leader 後,就會調用 startNewLeaderResourceManager 方法,這個方法中會調用 resourceManagerFactory.createResourceManager 正式創建 resourceManager。創建完成後,就會調用 resourceManager.start 來啓動它。
- 啓動後會回調 ResourceManager.onStart 方法。這裏調用 startHeartbeatServices 啓動了兩個心跳服務,一個是 ResourceManager 和 TaskManager 之間的心跳,一個是 ResourceManager 和 JobManager 之間的心跳,然後會啓動 SlotManager。SlotManager 可以被當作 Flink 集羣的資源調度中心。它會負責管理集羣中的所有 Slot 資源,也需要響應 JobManager 的資源請求。
DispatcherRunner
- 先創建工廠,創建完成後調用 DefaultDispatcherRunner.create 創建出 DispatcherRunner,接着是調用 start 啓動選主流程。
- 選主完成後就調用 startNewDispatcherLeaderProcess 啓動新的流程。啓動新的流程需要先關閉舊流程,然後創建新的 dispatcherLeaderProcess,並調用 start 啓動。
- 啓動時,會回調 onStart 方法。
- 回調方法中,先啓動 executionPlanStore,它主要是用於持久化 JobGraph。然後恢復執行計劃,重建狀態(如果是從失敗中恢復),實例化 Dispatcher,完成作業啓動。
TaskManager 啓動流程
TaskManager 是 Flink 的執行節點,其最小執行單元是 slot。TaskManager 啓動流程也主要是和資源管理相關,包括 slot 列表的管理和與 ResourceManager 的通信。
TaskManager 啓動流程大體分為以下幾部分:
- 構建並啓動 TaskManagerRunner(藍色部分)
- 啓動 TaskExecutor(紅色部分)
- 完成與 ResourceManager 的連接(橙色部分)
啓動 TaskManagerRunner
在 TaskManagerRunner 的 start 方法中,有兩個步驟:
第一步是調用 startTaskManagerRunnerServices 創建和啓動了很多服務,這一點和 JobManager 的啓動流程比較像。這些服務包括了高可用服務、心跳服務、監控指標服務等,這裏也創建了 taskExecutorService,它的啓動在第二步。
第二步是調用 taskExecutorService.start 方法,啓動 TaskExecutorService,它內部主要負責啓動 TaskExecutor。
啓動 TaskExecutor
TaskExecutor 是 TaskManager 內部的一個核心組件,負責幫助 TaskManager 完成 task 的部署和執行等核心操作。
在上一步調用 taskExecutor 的 start 方法後,會回調 onStart 方法,這裏主要是三個步驟
- 連接 ResourceManager 以及註冊監聽
- 啓動 taskSlotTable
- 連接 JobMaster 以及註冊監聽
第一步我們在下面詳細解釋。第二步啓動的 TaskSlotTable 是 TaskManager 中負責資源的核心組件,它維護了一個 Slot 列表,管理每個 Slot 的狀態,負責 Slot 的分配和釋放。第三步主要是和 JobMaster 建立連接並保持心跳,同時也會接收 Slot 申請的請求。
連接 ResourceManager
TaskExecutor 註冊完監聽之後,會收到 ResourceManagerLeaderListener.notifyLeaderAddress 方法回調。回調方法中,會創建一個 TaskExecutorToResourceManagerConnection 實例並啓動它。這個類是用來將 TaskExecutor 註冊到 ResourceManager,註冊成功會回調 onRegistrationSuccess 方法。回調成功的方法中,TaskManager 會調用 resourceManagerGateway.sendSlotReport 將 Slot 的狀態進行上報。
總結
本文介紹了 Flink 集羣在 Standalone 模式下的啓動過程,其中 JobManager 重點介紹了 WebMonitorEndpoint、ResourceManager 和 DispatcherRunner 這三個組件的啓動過程。TaskManager 主要介紹了啓動 TaskExecutor 和連接 ResourceManager 的過程。