一、Flink部署

環境版本:

  • Flink1.17.0
  • hadoop2.7.6
  • jdk1.8

1.集羣角色

  • 客户端(client):代碼由客户端獲取並做轉換,之後提交給jobmanager
  • Jobmanager就是Flink集羣裏的"管理者",對作業進行中央調度管理;獲取到客户端提交的任務後,進一步進行任務的拆分,將具體的執行邏輯分發給taskmaanger
  • taskmanager,執行計算角色,數據的具體處理操作。

Flink支持不同的部署模式和資源平台。例如standalone模式、Yarn模式等,Job提交方式有Session模式、Per job模式、Application模式。

大數據Flink進階(十三):Flink 任務提交模式-_客户端

2.Flink集羣部署

利用Flink部署包部署的集羣是獨立於第三方資源管理器的。

集羣規格

hb1

hb2

hb3

jobmanager、taskmanager

taskmanager

taskmanager

2.1集羣配置

  1. 解壓
[jiang@hb1 ~]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz /opt/module/
  1. 修改conf/flink-conf.yaml
# JobManager節點地址.
jobmanager.rpc.address: hb1
jobmanager.bind-host: 0.0.0.0
rest.address: hb1
rest.bind-address: 0.0.0.0
# TaskManager節點地址.需要配置為當前機器名
taskmanager.bind-host: 0.0.0.0
# 修改成taskmanager的主機地址,例如hb2節點改成hb2
taskmanager.host: hb1
  1. 配置master文件conf/masters
hb1
  1. 配置conf/workers
hb1
hb2
hb3
  1. 啓動flink集羣
[jiang@hb1 flink-1.17.0]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hb1.
Starting taskexecutor daemon on host hb1.
Starting taskexecutor daemon on host hb2.
Starting taskexecutor daemon on host hb3.
[jiang@hb1 flink-1.17.0]$ jpsall
-------hb1---------
40101 StandaloneSessionClusterEntrypoint
40693 Jps
40473 TaskManagerRunner
-------hb2---------
27794 TaskManagerRunner
27980 Jps
-------hb3---------
45718 TaskManagerRunner
45878 Jps
[jiang@hb1 flink-1.17.0]$
  1. Dashboard訪問

大數據Flink進階(十三):Flink 任務提交模式-_jar_02

可以看到可用的slot是3,3個taskmanager每個taskmanager提供一個slot。

slot參數可以修改 taskmanager.numberOfTaskSlots

2.2作業提交測試

1.準備Flink執行jar包。(後續會從0-1編寫)

bin/flink run -m hb1:8081 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar

-m 指定集羣

-c 指定主類

-d 分離模式,客户端提交完成後斷開

執行日誌

大數據Flink進階(十三):Flink 任務提交模式-_flink_03

2.dashboard觀察作業執行情況

大數據Flink進階(十三):Flink 任務提交模式-_jar_04

使用flink部署包部署的集羣是standalone模式,提交作業是會話模式。

3.作業部署模式

3.1會話模式

Session Mode

  • 集羣生命週期:先啓動集羣再提交任務,首先啓動一個運行的Flink集羣(會話集羣),然後將多個Flink作業提交到集羣上執行,所有作業執行完成後,集羣仍然繼續運行,等待新的作業提交
  • 資源隔離:資源共享,所有提交到該集羣的作業共享該會話集羣的資源(TaskManager、JobManager)

大數據Flink進階(十三):Flink 任務提交模式-_flink_05

執行流程:

  1. 啓動一個Flink會話集羣(一般利用standalone或yarn資源管理器)bin/start-cluster.sh在獨立部署模式下,bin/flink run -m yarn-cluster -d在Yarn資源管理器上
  2. 客户端將作業的Jar和依賴上傳到集羣的Jobmanager
  3. jobmanager接收到作業後向資源管理器申請作業運行所需的資源Slot
  4. Taskmanager提供slot來運行作業
  5. 作業運行完成後slot資源會釋放,但是taskmanager和jobmanager進行繼續運行不會停止

優缺點:

  • 優點
  • 資源複用、啓動快,不需要為每個作業都啓動集羣,作業提交速度快,適合需要頻繁提交短時間作業的場景
  • 簡單直觀,管理和維護一個集羣
  • 缺點
  • 資源隔離性差,多個作業共享資源,可能會因異常作業導致taskmanager異常重啓而影響到其他作業
  • 資源競爭,作業之間會因資源不足等待資源釋放,極端情況作業會一直等待

3.2單作業模式

Per-job Mode

單作業模式是為了解決會話模式資源共享、資源搶佔的。

集羣生命週期:一個作業,一個集羣。每提交一次作業,Flink會為其專門啓動一個集羣,當作業運行執行完成時,整個集羣的資源會被釋放。

資源隔離:完美隔離,每個作業擁有獨立的JobManager和TaskManager,作業之間不會相互受到干擾。

大數據Flink進階(十三):Flink 任務提交模式-_flink_06

注意:圖中有很多組件(AppMaster、Container等)並未繪製,只是為了描述單作業模式大概的流程,注意區分。

執行流程:

  1. 客户端提交作業,bin/flink run -m yarn-cluster -d 在yarn上,yarn資源調度管理器會識別這是一個per-job模式
  2. 資源管理器首先會分配容器啓動一個該作業專屬的jobmanager
  3. 該jobmanager與client通信,接收客户端提交的作業(業務邏輯以及依賴)
  4. 通過解析客户端提交的作業,得到所需的資源後再向資源管理器申請資源所需的taskmanager(slot資源)
  5. 資源管理器分配作業所需資源並啓動taskmanager
  6. jobmanager再將作業執行的操作分發給taskmanager進行處理
  7. 作業運行完成後,taskmanager和jobmanager資源釋放

優缺點:

  • 優點
  • 優秀的資源隔離,作業之間互不干擾,一個作業失敗不會影響到其他作業
  • 資源按需分配,每個作業能夠獲取到運行所需的資源,避免了資源競爭
  • 缺點
  • 集羣啓動開銷大,每個作業啓動獨立的集羣,啓動延遲較高
  • 對資源管理器壓力大,作業高峯時存在頻繁的資源分配和資源銷燬的動作,給資源管理器帶來巨大的壓力

使用場景:一般用於生產環境,有較高穩定性和資源隔離性的要求。

3.3應用模式

application Mode

集羣生命週期:一個應用,一個集羣。這裏的應用通常指的是1個或多個作業組成的應用程序的jar包。集羣是為這個應用jar專門啓動,應用執行完成後,集羣資源釋放。

資源隔離:應用級別隔離。不同的應用運行環境是隔離的

與單作業模式和會話模式的核心區別:在會話和單作業模式下,main方法是在客户端執行的,然後由客户端下載作業依賴解析生成作業執行圖(JobGraph)然後提交給JobManager。而應用模式客户端只需要將依賴和作業jar提交給jobmanager,由jobmanager解析生成作業執行圖(JobGraph)。

執行流程:

  1. 客户端提交應用(指定Application mode)
  2. 資源管理器為應用啓動專門的集羣,首先啓動Jobmanager
  3. 客户端將應用jar和所有依賴提交到該Jobmanager
  4. Jobmanager端執行:Jobmanager進程調用應用的main方法,生成JobGraph,意味着依賴和解析發生在集羣內部,而非客户端。
  5. 集羣執行該應用的所有作業
  6. 應用執行結束,集羣資源釋放

優缺點:

  • 優點
  • 解耦客户端,客户端只需要提交應用jar和依賴即可斷開連接,極大的降低了客户端資源消耗和網絡帶寬的需求
  • 擁有單作業資源隔離的優點
  • 缺點
  • 如果同一個應用被多次提交,會啓動多個獨立的集羣(後面説明)

使用場景:客户端資源有限制或者Rest API進行作業部署的場景。

4.Yarn運行模式

YARN上部署的過程是:客户端把Flink應用提交給Yarn的ResourceManager,Yarn的ResourceManager會向Yarn的NodeManager申請容器。在這些容器上,Flink會部署JobManager和TaskManager的實例,從而啓動集羣。Flink會根據運行在JobManger上的作業所需要的Slot數量動態分配TaskManager資源。

這個過程在3.2單作業模式模式的執行過程以及描述的比較清楚。

4.1相關配置

將Flink作業提交到Hadoop集羣的Yarn資源管理運行,需要進行相關配置才能實現。

環境要求:hadoop

具體配置

1)添加Hadoop環境變量/etc/profile或~/.bash_profile

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-2.7.6
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`

2)啓動hadoop集羣

start-dfs.sh
start-yarn.sh

大數據Flink進階(十三):Flink 任務提交模式-_jar_07

4.2 單作業模式提交

單作業模式提交依賴於第三方,本節利用Yarn資源管理器來進行單作業模式提交,從而運行Flink集羣

  1. 提交命令
bin/flink run -m yarn-cluster -yqu default -ynm flink-socket-test -ys 2 -ytm 2048 -yjm 2048 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar

參數解釋:

  • -d 分離模式,任務提交完成之後,客户端斷開
  • -m 指定提交模式yarn-cluster
  • -yq 指定提交yarn隊列名稱
  • -ynm 指定作業名稱
  • -ys 指定作業taskmanager的slot數量
  • -ytm 指定taskmanager內存大小
  • -yjm指定jobmanager內存大小

2)在yarn界面觀察是否提交上去

大數據Flink進階(十三):Flink 任務提交模式-_flink_08

Name就是在命令提交的時候指定的-jnm

3)查看flink dashboard任務

大數據Flink進階(十三):Flink 任務提交模式-_客户端_09

從上圖可以看到,總共有兩個slot,但是作業裏面只使用了1個slot,因為代碼裏面指定了並行度=1。後續會説明並行度的優先級。

4)取消作業或停止作業

#通過flink命令取消作業
bin/flink cancel  -t yarn-per-job -Dyarn.application.id=application_1762323629516_0002
#通過yarn的命令直接停止
yarn application -kill application_1762323629516_0002

當執行完命令後,該flink集羣就會停止,資源釋放歸還給ResourceManager

4.3會話模式提交

會話模式Session,在Flink集羣部署章節中,作業提交測試就是會話模式,首先啓動一個集羣,再提交作業到該集羣進行運行。但是yarn-Session與獨立模式還是有所不同,yarn-session是向ResouorceManager申請容器,NodeManager創建容器後啓動JobManager服務,等待客户端提交作業

1)啓動yarn-session

bin/yarn-session.sh -d -nm session-name -qu default -tm 1024 -jm 1024 -s 2

參數解釋:

  • -d 分離模式,如果你不想讓Flink YARN客户端一直前台運行,可以使用這個參數,即使關掉當前對話窗口,YARN session也可以後台運行
  • -jm,配置jobmanager內存大小
  • -nm,配置yarn-session該集羣的應用名稱
  • -qu,指定提交隊列名稱
  • tm,指定taskmanager內存大小

Yarn-session模式會根據需要動態的分配taskmanager數量

2)提交作業

執行以下命令將該任務提交到已經開啓的Yarn-Session中運行

bin/flink run  -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar

3)查看flink dashboard

一共提交了兩個作業,共用當前session資源

大數據Flink進階(十三):Flink 任務提交模式-_jar_10

客户端也可以查詢jobmanager地址,提交的時候通過-m指定jobmanager進行作業提交,通過dashboard下jobmanager的configuration可以找到。

大數據Flink進階(十三):Flink 任務提交模式-_客户端_11

bin/flink run -m hb1 -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar

4.4 應用模式提交

應用模式提交跟單作業模式類似,只是傳遞參數不一樣。

bin/flink run-application -t yarn-application -d -c com.test.SocketApp Flink_Code-1.0-SNAPSHOT.jar

參數解釋

  • -d 分離模式
  • -c 指定運行主類
  • -t 指定部署目標模式 yarn-application

查看yarn dashboard

大數據Flink進階(十三):Flink 任務提交模式-_flink_12

應用模型運行時不依賴於客户端解析作業的邏輯執行圖,而是jobmanager運行main進行解析,那麼客户端的作用往往就是上傳依賴和作業的jar包,在flink提交的設計中,在指定目標運行jar時,可以直接指定hdfs路徑,從而減輕客户端上傳作業jar包的過程。

1)上傳Jar到HDFS路徑

#創建flink依賴存儲路徑
[jiang@hb1 flink-1.17.0]$ hdfs dfs -mkdir -p /flink/remote_lib
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put lib/ /flink/remote_lib
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put plugins/ /flink/remote_lib
#創建運行jar存儲路徑
[jiang@hb1 flink-1.17.0]$ hdfs dfs -mkdir -p /flink/jars
[jiang@hb1 flink-1.17.0]$ hdfs dfs -put Flink_Code-1.0-SNAPSHOT.jar /flink/jars

2)提交作業

bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hb1:9000/flink/remote_lib" -c com.test.SocketApp   hdfs://hb1:9000/flink/jars/Flink_Code-1.0-SNAPSHOT.jar

這種方式就比較輕量級了。

日誌如下:

[jiang@hb1 flink-1.17.0]$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hb1:9000/flink/remote_lib" -c com.test.SocketApp   hdfs://hb1:9000/flink/jars/Flink_Code-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.17.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2025-11-05 15:58:53,567 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-jiang.
2025-11-05 15:58:53,567 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-jiang.
2025-11-05 15:58:53,778 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/module/flink-1.17.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2025-11-05 15:58:53,884 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at hb1/192.168.100.131:8032
2025-11-05 15:58:54,066 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2025-11-05 15:58:54,187 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured JobManager memory is 1600 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 448 MB may not be used by Flink.
2025-11-05 15:58:54,187 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - The configured TaskManager memory is 1728 MB. YARN will allocate 2048 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 320 MB may not be used by Flink.
2025-11-05 15:58:54,187 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=1600, taskManagerMemoryMB=1728, slotsPerTaskManager=1}
2025-11-05 15:58:55,070 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2025-11-05 15:58:55,081 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1762327641882_0005
2025-11-05 15:58:55,174 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1762327641882_0005
2025-11-05 15:58:55,174 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2025-11-05 15:58:55,182 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2025-11-05 15:59:06,307 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2025-11-05 15:59:06,308 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface hb3:41495 of application 'application_1762327641882_0005'.
[jiang@hb1 flink-1.17.0]$

以上是個人理解,如有問題可溝通