一、Flink部署
環境版本:
- Flink1.17.0
- hadoop2.7.6
- jdk1.8
1.集羣角色
- 客户端(client):代碼由客户端獲取並做轉換,之後提交給jobmanager
- Jobmanager就是Flink集羣裏的"管理者",對作業進行中央調度管理;獲取到客户端提交的任務後,進一步進行任務的拆分,將具體的執行邏輯分發給taskmaanger
- taskmanager,執行計算角色,數據的具體處理操作。
Flink支持不同的部署模式和資源平台。例如standalone模式、Yarn模式等,Job提交方式有Session模式、Per job模式、Application模式。
2.Flink集羣部署
利用Flink部署包部署的集羣是獨立於第三方資源管理器的。
集羣規格
|
hb1
|
hb2
|
hb3
|
|
jobmanager、taskmanager
|
taskmanager
|
taskmanager
|
2.1集羣配置
- 解壓
[jiang@hb1 ~]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz /opt/module/
- 修改conf/flink-conf.yaml
# JobManager節點地址.
jobmanager.rpc.address: hb1
jobmanager.bind-host: 0.0.0.0
rest.address: hb1
rest.bind-address: 0.0.0.0
# TaskManager節點地址.需要配置為當前機器名
taskmanager.bind-host: 0.0.0.0
# 修改成taskmanager的主機地址,例如hb2節點改成hb2
taskmanager.host: hb1
- 配置master文件conf/masters
hb1
- 配置conf/workers
hb1
hb2
hb3
- 啓動flink集羣
[jiang@hb1 flink-1.17.0]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hb1.
Starting taskexecutor daemon on host hb1.
Starting taskexecutor daemon on host hb2.
Starting taskexecutor daemon on host hb3.
[jiang@hb1 flink-1.17.0]$ jpsall
-------hb1---------
40101 StandaloneSessionClusterEntrypoint
40693 Jps
40473 TaskManagerRunner
-------hb2---------
27794 TaskManagerRunner
27980 Jps
-------hb3---------
45718 TaskManagerRunner
45878 Jps
[jiang@hb1 flink-1.17.0]$
- Dashboard訪問
可以看到可用的slot是3,3個taskmanager每個taskmanager提供一個slot。
slot參數可以修改 taskmanager.numberOfTaskSlots
2.2作業提交測試
1.準備Flink執行jar包。(後續會從0-1編寫)
bin/flink run -m hb1:8081 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar
-m 指定集羣
-c 指定主類
-d 分離模式,客户端提交完成後斷開
執行日誌
2.dashboard觀察作業執行情況
使用flink部署包部署的集羣是standalone模式,提交作業是會話模式。
3.作業部署模式
3.1會話模式
Session Mode
- 集羣生命週期:先啓動集羣再提交任務,首先啓動一個運行的Flink集羣(會話集羣),然後將多個Flink作業提交到集羣上執行,所有作業執行完成後,集羣仍然繼續運行,等待新的作業提交
- 資源隔離:資源共享,所有提交到該集羣的作業共享該會話集羣的資源(TaskManager、JobManager)
執行流程:
- 啓動一個Flink會話集羣(一般利用standalone或yarn資源管理器)bin/start-cluster.sh在獨立部署模式下,bin/flink run -m yarn-cluster -d在Yarn資源管理器上
- 客户端將作業的Jar和依賴上傳到集羣的Jobmanager
- jobmanager接收到作業後向資源管理器申請作業運行所需的資源Slot
- Taskmanager提供slot來運行作業
- 作業運行完成後slot資源會釋放,但是taskmanager和jobmanager進行繼續運行不會停止
優缺點:
- 優點
- 資源複用、啓動快,不需要為每個作業都啓動集羣,作業提交速度快,適合需要頻繁提交短時間作業的場景
- 簡單直觀,管理和維護一個集羣
- 缺點
- 資源隔離性差,多個作業共享資源,可能會因異常作業導致taskmanager異常重啓而影響到其他作業
- 資源競爭,作業之間會因資源不足等待資源釋放,極端情況作業會一直等待
3.2單作業模式
Per-job Mode
單作業模式是為了解決會話模式資源共享、資源搶佔的。
集羣生命週期:一個作業,一個集羣。每提交一次作業,Flink會為其專門啓動一個集羣,當作業運行執行完成時,整個集羣的資源會被釋放。
資源隔離:完美隔離,每個作業擁有獨立的JobManager和TaskManager,作業之間不會相互受到干擾。
注意:圖中有很多組件(AppMaster、Container等)並未繪製,只是為了描述單作業模式大概的流程,注意區分。
執行流程:
- 客户端提交作業,bin/flink run -m yarn-cluster -d 在yarn上,yarn資源調度管理器會識別這是一個per-job模式
- 資源管理器首先會分配容器啓動一個該作業專屬的jobmanager
- 該jobmanager與client通信,接收客户端提交的作業(業務邏輯以及依賴)
- 通過解析客户端提交的作業,得到所需的資源後再向資源管理器申請資源所需的taskmanager(slot資源)
- 資源管理器分配作業所需資源並啓動taskmanager
- jobmanager再將作業執行的操作分發給taskmanager進行處理
- 作業運行完成後,taskmanager和jobmanager資源釋放
優缺點:
- 優點
- 優秀的資源隔離,作業之間互不干擾,一個作業失敗不會影響到其他作業
- 資源按需分配,每個作業能夠獲取到運行所需的資源,避免了資源競爭
- 缺點
- 集羣啓動開銷大,每個作業啓動獨立的集羣,啓動延遲較高
- 對資源管理器壓力大,作業高峯時存在頻繁的資源分配和資源銷燬的動作,給資源管理器帶來巨大的壓力
使用場景:一般用於生產環境,有較高穩定性和資源隔離性的要求。
3.3應用模式
application Mode
集羣生命週期:一個應用,一個集羣。這裏的應用通常指的是1個或多個作業組成的應用程序的jar包。集羣是為這個應用jar專門啓動,應用執行完成後,集羣資源釋放。
資源隔離:應用級別隔離。不同的應用運行環境是隔離的
與單作業模式和會話模式的核心區別:在會話和單作業模式下,main方法是在客户端執行的,然後由客户端下載作業依賴解析生成作業執行圖(JobGraph)然後提交給JobManager。而應用模式客户端只需要將依賴和作業jar提交給jobmanager,由jobmanager解析生成作業執行圖(JobGraph)。
執行流程:
- 客户端提交應用(指定Application mode)
- 資源管理器為應用啓動專門的集羣,首先啓動Jobmanager
- 客户端將應用jar和所有依賴提交到該Jobmanager
- Jobmanager端執行:Jobmanager進程調用應用的main方法,生成JobGraph,意味着依賴和解析發生在集羣內部,而非客户端。
- 集羣執行該應用的所有作業
- 應用執行結束,集羣資源釋放
優缺點:
- 優點
- 解耦客户端,客户端只需要提交應用jar和依賴即可斷開連接,極大的降低了客户端資源消耗和網絡帶寬的需求
- 擁有單作業資源隔離的優點
- 缺點
- 如果同一個應用被多次提交,會啓動多個獨立的集羣(後面説明)
使用場景:客户端資源有限制或者Rest API進行作業部署的場景。
4.Yarn運行模式
YARN上部署的過程是:客户端把Flink應用提交給Yarn的ResourceManager,Yarn的ResourceManager會向Yarn的NodeManager申請容器。在這些容器上,Flink會部署JobManager和TaskManager的實例,從而啓動集羣。Flink會根據運行在JobManger上的作業所需要的Slot數量動態分配TaskManager資源。
這個過程在3.2單作業模式模式的執行過程以及描述的比較清楚。
4.1相關配置
將Flink作業提交到Hadoop集羣的Yarn資源管理運行,需要進行相關配置才能實現。
環境要求:hadoop
具體配置
1)添加Hadoop環境變量/etc/profile或~/.bash_profile
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.6
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
2)啓動hadoop集羣
start-dfs.sh
start-yarn.sh
4.2 單作業模式提交
單作業模式提交依賴於第三方,本節利用Yarn資源管理器來進行單作業模式提交,從而運行Flink集羣
- 提交命令
bin/flink run -m yarn-cluster -yqu default -ynm flink-socket-test -ys 2 -ytm 2048 -yjm 2048 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar
參數解釋:
- -d 分離模式,任務提交完成之後,客户端斷開
- -m 指定提交模式yarn-cluster
- -yq 指定提交yarn隊列名稱
- -ynm 指定作業名稱
- -ys 指定作業taskmanager的slot數量
- -ytm 指定taskmanager內存大小
- -yjm指定jobmanager內存大小
2)在yarn界面觀察是否提交上去
Name就是在命令提交的時候指定的-jnm
3)查看flink dashboard任務
從上圖可以看到,總共有兩個slot,但是作業裏面只使用了1個slot,因為代碼裏面指定了並行度=1。後續會説明並行度的優先級。
4)取消作業或停止作業
#通過flink命令取消作業
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1762323629516_0002
#通過yarn的命令直接停止
yarn application -kill application_1762323629516_0002
當執行完命令後,該flink集羣就會停止,資源釋放歸還給ResourceManager
4.3會話模式提交
會話模式Session,在Flink集羣部署章節中,作業提交測試就是會話模式,首先啓動一個集羣,再提交作業到該集羣進行運行。但是yarn-Session與獨立模式還是有所不同,yarn-session是向ResouorceManager申請容器,NodeManager創建容器後啓動JobManager服務,等待客户端提交作業
1)啓動yarn-session
bin/yarn-session.sh -d -nm session-name -qu default -tm 1024 -jm 1024 -s 2
參數解釋:
- -d 分離模式,如果你不想讓Flink YARN客户端一直前台運行,可以使用這個參數,即使關掉當前對話窗口,YARN session也可以後台運行
- -jm,配置jobmanager內存大小
- -nm,配置yarn-session該集羣的應用名稱
- -qu,指定提交隊列名稱
- tm,指定taskmanager內存大小
Yarn-session模式會根據需要動態的分配taskmanager數量
2)提交作業
執行以下命令將該任務提交到已經開啓的Yarn-Session中運行
bin/flink run -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar
3)查看flink dashboard
一共提交了兩個作業,共用當前session資源
客户端也可以查詢jobmanager地址,提交的時候通過-m指定jobmanager進行作業提交,通過dashboard下jobmanager的configuration可以找到。
bin/flink run -m hb1 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar
4.4 應用模式提交
應用模式提交跟單作業模式類似,只是傳遞參數不一樣。
bin/flink run-application -t yarn-application -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar
參數解釋
- -d 分離模式
- -c 指定運行主類
- -t 指定部署目標模式 yarn-application
查看yarn dashboard
應用模型運行時不依賴於客户端解析作業的邏輯執行圖,而是jobmanager運行main進行解析,那麼客户端的作用往往就是上傳依賴和作業的jar包,在flink提交的設計中,在指定目標運行jar時,可以直接指定hdfs路徑,從而減輕客户端上傳作業jar包的過程。
1)上傳Jar到HDFS路徑
#創建flink依賴存儲路徑
[jiang@hb1 flink-1.17.0]$ hdfs dfs -mkdir -p /flink/remote_lib
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put lib/ /flink/remote_lib
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put plugins/ /flink/remote_lib
#創建運行jar存儲路徑
[jiang@hb1 flink-1.17.0]$ hdfs dfs -mkdir -p /flink/jars
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put Flink_Code-1.0-SNAPSHOT.jar /flink/jars
2)提交作業
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hb1:9000/flink/remote_lib" -c com.test.SocketApp hdfs://hb1:9000/flink/jars/Flink_Code-1.0-SNAPSHOT.jar
這種方式就比較輕量級了。
日誌如下:
[jiang@hb1 flink-1.17.0]$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hb1:9000/flink/remote_lib" -c com.test.SocketApp hdfs://hb1:9000/flink/jars/Flink_Code-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.17.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2025-11-05 15:58:53,567 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-jiang.
2025-11-05 15:58:53,567 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-jiang.
2025-11-05 15:58:53,778 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/module/flink-1.17.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2025-11-05 15:58:53,884 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at hb1/192.168.100.131:8032
2025-11-05 15:58:54,066 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2025-11-05 15:58:54,187 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2025-11-05 15:58:54,187 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2025-11-05 15:58:54,187 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2025-11-05 15:58:55,070 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2025-11-05 15:58:55,081 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1762327641882_0005
2025-11-05 15:58:55,174 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1762327641882_0005
2025-11-05 15:58:55,174 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2025-11-05 15:58:55,182 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2025-11-05 15:59:06,307 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2025-11-05 15:59:06,308 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hb3:41495 of application 'application_1762327641882_0005'.
[jiang@hb1 flink-1.17.0]$
以上是個人理解,如有問題可溝通