Flink 提交模式,你了解多少?
前言
寫這種文章的初衷就是,在開發(fā)的過程中不知道怎么去選擇,各種模式不是太充分了解?,F(xiàn)在花點時間去找資料研究對比了一番,所以出此篇。
此篇以先以了解flink組件開始,再以簡單模式Local 和 Standlone 正式進入正題。本篇主要是以Yarn 方式下三種模式展開細講,當(dāng)然還有Kubernetes方式(本篇不細說)。
組件
在了解提交模式之前,先了解一下Flink組件與組件之間的協(xié)作關(guān)系。
資源管理器(Resource Manager)
(1)主要負責(zé)管理任務(wù)管理器TaskManager的插槽slot。
(2) 當(dāng)作業(yè)管理器JM申請插槽資源時, RM會將有空閑插槽的TM分配給JM。如果 RM沒有足夠的插槽來滿足JM的請求。
(3)它還可以向資源提供平臺發(fā)起會話,以提供啟動 TM進程的容器。
作業(yè)管理器(JobManager)
(1) 控制一個應(yīng)用程序執(zhí)行的主進程,也就是說,每個應(yīng)用程序 都會被一個不同的JM所控制執(zhí)行。
(2) JM會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括:作業(yè)圖(Job Graph)、邏輯數(shù)據(jù)流圖( ogical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
(3) JM會把 Jobgraph轉(zhuǎn)換成一個物理層面的 數(shù)據(jù)流圖,這個圖被叫做 “執(zhí)行圖”(Executiongraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。Job Manager會向資源管理器( Resourcemanager)請求執(zhí)行任務(wù)必要的資源,也就是 任務(wù)管理器(Taskmanager)上的插槽slot。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的TM上。而在運行過程中JM會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)。
任務(wù)管理器(Taskmanager)
(1) Flink中的工作進程。通常在 Flink中會有多個TM運行, 每個TM都包含了一定數(shù)量的插槽slots。插槽的數(shù)量限制了TM能夠執(zhí)行的任務(wù)數(shù)量。
(2) 啟動之后,TM會向資源管理器注冊它的插槽;收到資源管理器的指令后, TM就會將一個或者多個插槽提供給JM調(diào)用。TM就可以向插槽分配任務(wù)tasks來執(zhí)行了。
(3) 在執(zhí)行過程中, 一個TM可以跟其它運行同一應(yīng)用程序的TM交換數(shù)據(jù)。
分發(fā)器(Dispatcher)
(1)可以跨作業(yè)運行,它為應(yīng)用提交提供了REST接口。
(2)當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給JM。
(3)Dispatcher他會啟動一個 WebUi,用來方便地 展示和監(jiān)控作業(yè)執(zhí)行的信息。
Local模式
JobManager 和 TaskManager 共用一個 JVM,只需要jdk支持,單節(jié)點運行,主要用來調(diào)試。
Standlone模式
Standlone 是Flink自帶的一個分布式集群,它不依賴其他的資源調(diào)度框架、不依賴yarn 等。
充當(dāng)Master角色的是JobManager。
充當(dāng)Slave/Worker角色是TaskManager
配置與啟動
(1)conf 目錄下有兩個文件:masters 和 workers 指定地址。
(2)需要配置 conf/flink-conf.yaml 的自行配置。
(3)分發(fā)各個機器。
(4)啟動集群 bin/start-cluster.sh
(5)提交任務(wù) flink run
Yarn 模式
首先認識下提交流程
(1)提交App之前,先上傳Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的數(shù)據(jù)。
(2)客戶端向ResourceManager提交Job,ResouceManager接到請求后,先分配container資源,然后通知NodeManager啟動ApplicationMaster。
(3)ApplicationMaster會加載HDFS的配置,啟動對應(yīng)的JobManager,然后JobManager會分析當(dāng)前的作業(yè)圖,將它轉(zhuǎn)化成執(zhí)行圖(包含了所有可以并發(fā)執(zhí)行的任務(wù)),從而知道當(dāng)前需要的具體資源。
(4)接著,JobManager會向ResourceManager申請資源,ResouceManager接到請求后,繼續(xù)分配container資源,然后通知ApplictaionMaster啟動更多的TaskManager(先分配好container資源,再啟動TaskManager)。container在啟動TaskManager時也會從HDFS加載數(shù)據(jù)。
(5)TaskManager啟動后,會向JobManager發(fā)送心跳包。JobManager向TaskManager分配任務(wù)。
Session Mode
Session模式提前初始化好一個集群,然后向這個集群提交應(yīng)用。所有應(yīng)用都在同一個集群中執(zhí)行,共享資源。這里JobManager僅有一個。提交到這個集群的作業(yè)可以直接運行。如圖所示
Session模式共享Dispatcher 和 ResourceManager,作業(yè)共享集群資源。
Session多個作業(yè)之間又不是隔離的,如果有一個TaskManager掛掉,它上面承載著的所有作業(yè)也會失敗。同樣來說,啟動的Job任務(wù)越多,JobManager的負載也就越大。
所以,Session模式適合生命周期短資源消耗低的場景。
提交
- ./bin/flink run -t yarn-session \
- -Dyarn.application.id=application_XXXX_YY \
- ./examples/streaming/TopSpeedWindowing.jar
Per-Job Cluster Mode
在Per-Job模式下,每個提交到Y(jié)ARN上的作業(yè)會有單獨的Flink集群,擁有專屬的JobManager和TaskManager。也即:一個作業(yè)一個集群,作業(yè)之間相互隔離。
以Per-Job模式提交作業(yè)的啟動延遲可能會較高,因為不需要共享集群,所以在PipelineExecutor中執(zhí)行作業(yè)提交的時候,創(chuàng)建集群并將JobGraph以及所需要的文件等一同提交給Yarn集群,進行一系列的初始化動作,這個時候需要些時間。提交任務(wù)的時候會把本地flink的所有jar包先上傳到hdfs上相應(yīng)的臨時目錄,這個也會帶來大量的網(wǎng)絡(luò)的開銷。
優(yōu)點就是作業(yè)之間的資源完全隔離,一個作業(yè)的TaskManager失敗不會影響其他作業(yè)的運行,JobManager的負載也是分散開來的,不存在單點問題。當(dāng)作業(yè)運行完成,與它關(guān)聯(lián)的集群也就被銷毀,資源被釋放。
所以,Per-Job模式一般用來部署那些長時間運行的作業(yè)。
提交
- /bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
「其他操作」
- # List running job on the cluster
- ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
- # Cancel running job
- ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
Application Mode
Application 模式嘗試去將per-job 模式的資源隔離性和輕量級,可擴展的應(yīng)用提交進程相結(jié)合。為了實現(xiàn)這個目的,它會每個Job 創(chuàng)建一個集群,但是 應(yīng)用的main()將被在JobManager 執(zhí)行。
Application 模式為每個提交的應(yīng)用程序創(chuàng)建一個集群,該集群可以看作是在特定應(yīng)用程序的作業(yè)之間共享的會話集群,并在應(yīng)用程序完成時終止。在這種體系結(jié)構(gòu)中Application模式在不同應(yīng)用之間提供了資源隔離和負載平衡保證
在JobManager 中執(zhí)行main()方法,可以節(jié)省所需的CPU周期。還有個好處就是,由于每個應(yīng)用程序有一個JobManager,因此可以更平均地分散網(wǎng)絡(luò)負載。
提交
- ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
「其他操作」
- # List running job on the cluster
- ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- # Cancel running job
- ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
Application mode中的多個job,實際在代碼上的表現(xiàn)就是能夠允許在一個Application里面調(diào)用多次execute/executeAsyc方法。但是execute方法會被阻塞,也就是只有一個job完成之后才能繼續(xù)下一個job的execute,但是可以通過executeAsync進行異步非阻塞執(zhí)行。
Yarn 模式總結(jié)
模式 | 生命周期 | 資源隔離 | 優(yōu)點 | 缺點 | main方法 |
---|---|---|---|---|---|
Session | 關(guān)閉會話,才會停止 | 共用JM和TM | 預(yù)先啟動,啟動作業(yè)不再啟動。資源充分共享 | 資源隔離比較差,TM不容易擴展 | 在客戶端執(zhí)行 |
Per-job | Job停止,集群停止 | 單個Job獨享JM和TM | 充分隔離,資源根據(jù)job按需申請 | job啟動慢,每個job需要啟動一個JobManager | 在客戶端執(zhí)行 |
Application | 當(dāng)Application全部執(zhí)行完,集群才會停止 | Application使用一套JM和TM | Client負載低,Application之間實現(xiàn)資源隔離,Application內(nèi)實現(xiàn)資源共享 | 對per-job模式和session模式的優(yōu)化部署模式(優(yōu)點) | 在Cluster中 |