網易游戲 Flink SQL 平臺化實踐
?摘要:本文整理自網易游戲資深開發工程師林小鉑在 Flink Forward Asia 2021 平臺建設專場的演講。主要內容包括:
- 網易游戲 Flink SQL 發展歷程
- 基于模板 jar 的 StreamflySQL v1
- 基于 SQL Gateway 的 StreamflySQL v2
- 未來工作?
01網易游戲 Flink SQL 發展歷程
網易游戲實時計算平臺叫做 Streamfly,這個名字取名自電影《馴龍高手》中的 Stormfly。由于我們已經在從 Storm 遷移到 Flink,所以將 Stormfly 中的 Storm 替換成了更為通用的 Stream。
Streamfly 前身是離線作業平臺 Omega 下的名為 Lambda 的子系統,它負責了所有實時作業的調度,最開始開始支持 Storm 和 Spark Streaming,后來改為只支持 Flink。在 2019 年的時候我們將 Lambda 獨立出來以此為基礎建立了 Streamfly 計算平臺。隨后,我們在 2019 年底開發并上線了第一個版本 Flink SQL 平臺 StreamflySQL。這個版本基于模板 jar 提供了基本 Flink SQL 的功能,但是用戶體驗還有待提升,因此我們在 2021 年年初從零開始重新建設了第二個版本的 StreamflySQL,而第二個版本是基于 SQL Gateway。
要了解這兩個版本的不同,我們需要先回顧下 Flink SQL 的基本工作流程。
用戶提交的 SQL 首先會被 Parser 解析為邏輯執行計劃;邏輯執行計劃經過 Planner Optimizer 優化,會生成物理執行計劃;物理執行計劃再通過 Planner CodeGen 代碼生成,翻譯為 DataStream API 常見的 Transformation;最后 StreamGraphGenerator 會將這些 Transformation 轉換為 Flink 作業的最終表示 JobGraph 提交到 Flink 集群。
上述一系列過程都發生在 TableEnvironment 里面。取決于部署模式的不同,TableEnvironment 可能運行在 Flink Client 或者 JobManager 里。Flink 現在支持 3 種集群部署模式,包括 Application、 Per-Job 和 Session 模式。在 Application 模式下,TableEnvironment 會在 JobManager 端運行,而在其余兩種模式下,TableEnvironment 都運行在 Client 端。不過這三種模式都有一個共同的特點,TableEnvironment 都是一次性的,會在提交 JobGraph 之后自動退出。
為了更好地復用 TableEnvironment 提高效率和提供有狀態的操作,有的項目會將 TableEnvironment 放到一個新的獨立 Server 端進程里面去運行,由此產生了一種新的架構,我們稱之為 Server 端 SQL 編譯。相對地,還有 Client 端 SQL 編譯。
有同學可能會問,為什么沒有 JobManager 端 SQL 編譯,這是因為 JobManager 是相對封閉的組件,不適合拓展,而且即使做了達到的效果跟 Client 端編譯效果基本一樣。所以總體來看,一般就有 Client 和 Server 兩種常見的 Flink SQL 平臺架構。
Client 端 SQL 編譯,顧名思義就是 SQL 的解析翻譯優化都在 Client 端里進行(這里的 Client 是廣義的 Client,并不一定是 Flink Client)。典型的案例就是通用模板 jar 和 Flink 的 SQL Client。這種架構的優點是開箱即用,開發成本低,而且使用的是 Flink public 的 API,版本升級比較容易;缺點是難以支持高級的功能,而且每次都要先啟動一個比較重的 TableEnvironment 所以性能比較差。
然后是 Server 端 SQL 編輯。這種架構將 SQL 解析翻譯優化邏輯放到一個獨立的 Server 進程去進行,讓 Client 變得非常輕,比較接近于傳統數據庫的架構。典型的案例是 Ververica 的 SQL Gateway。這種架構的優點是可拓展性好,可以支持很多定制化功能,而且性能好;缺點則是現在開源界沒有成熟的解決方案,像上面提到 SQL Gateway 只是一個比較初期的原型系統,缺乏很多企業級特性,如果用到生產環境需要經過一定的改造,而且這些改造涉及比較多 Flink 內部 API,需要比較多 Flink 的背景知識,總體來說開發成本比較高,而且后續版本升級工作量也比較大。
回到我們 Flink SQL 平臺,我們 StreamflySQL v1 是基于 Client 端 SQL 編譯,而 v2 是基于 Server 端的 SQL 編譯。下面就讓我逐個介紹一下。
02基于模板 jar 的 StreamflySQL v1
StreamflySQL v1 選擇 Client 端 SQL 編譯的主要原因有三個:
- 首先是平臺集成。不同于很多公司的作業調度器用大數據中比較主流的 Java 編寫,我們的 Lambda 調度器是用 Go 開發的。這是因為 Lambda 在設計之初支持了多種實時計算框架,出于松耦合和公司技術棧的考慮,Lambda 以 Go 作為開發語言,會采用與 YARN 類似的動態生成 Shell 腳本的方式來調用不同框架的命令行接口。這樣松耦合的接口方式給我們帶來很大的靈活性,比如我們可以輕松支持多個版本的 Flink,不需要強制用戶隨著系統版本升級,但同時也導致沒辦法直接去調用 Flink 原生的 Java API。
- 第二個原因是松耦合。開發的時候 Flink 版本是1.9,當時 Client API 比較復雜,不太適合平臺集成,并且當時社區也在推動 Client 的重構,所以我們盡量避免依賴 Client API去開發 Flink SQL 平臺。
- 第三個原因是實踐經驗。因為模板 jar + 配置中心模式在網易游戲內部已經有了比較多的應用,所以我們在這方面積累了很多實踐經驗。綜合之下我們很自然地采用了模板 jar + 配置中心的架構來實現 v1 版本。
上圖是 v1 版本的整體架構圖。我們在主要在 Lambda 作業平臺的基礎上新增了 StreamflySQL 后端作為配置中心,負責根據用戶提交的 SQL 和作業運行配置加上通用的模板 jar 來生成一個 Lambda 作業。
總體的作業提交流程如下:
- 用戶在前端的 SQL 編輯器提交 SQL 和運行配置。
- StreamflySQL 后端收到請求后生成一個 Lambda 作業并傳遞配置 ID。
- 然后 Lambda 啟動作業,背后是執行 Flink CLI run 命令來提交作業。、
- Flink CLI run 命令會啟動 Flink Client 來加載并執行模版 jar 的 main 函數,這時會讀取 SQL 和配置,并初始化 TableEnvironment。
- TableEnvironment 會從 Catalog 讀取必要的 Database/Table 等元信息。這里順帶一提是,在網易游戲我們沒有使用統一的 Catalog 來維護不同組件的元信息,而是不同組件有自己的元數據中心,對應不同的 Catalog。
- 最后 TableEnvironment 編譯好 JobGraph,以 Per-Job Cluster 的方式部署作業。
StreamflySQL v1 實現了 Flink SQL 平臺從零到一的建設,滿足了部分業務需求,但仍有不少痛點。
第一個痛點是響應慢。
以一個比較典型的 SQL 來說,以模板 jar 的方式啟動作業需要準備 TableEnviroment,這可能會花費 5 秒鐘,然后執行 SQL 的編譯優化包括與 Catalog 交互去獲取元數據,也可能會花費 5 秒鐘;編譯得到jobgraph之后還需要準備 per-job cluster,一般來說也會花費 20 秒以上;最后還需要等待 Flink job的調度,也就是作業從 scheduled 變成 running 的狀態,這個可能也需要 10 秒鐘。
總體來說,v1 版本啟動一個 Flink SQL 作業至少需要 40 秒的時間,這樣的耗時相對來說是比較長的。但是仔細分析這些步驟,只有 SQL的編譯優化和 job 調度是不可避免的,其他的比如 TableEnvironment 和 Flink cluster 其實都可以提前準備,這里的慢就慢在資源是懶初始化的,而且幾乎沒有復用。
第二個痛點是調試難。
我們對 SQL 調試的需求有以下幾點:
- 第一點是調試的 SQL 與線上的 SQL 要基本一致。
- 第二點是調試 SQL 不能對線上的數據產生影響,它可以去讀線上的數據,但不能去寫。
- 第三點,因為調試的 SQL 通常只需要抽取少量的數據樣本就可以驗證 SQL 的正確性,所以我們希望限制調試 SQL 的資源,一方面是出于成本的考慮,另外一方面也是為了防止調試的 SQL 與線上作業產生資源競爭。
- 第四點,因為調試 SQL 處理的數據量比較少,我們希望以更快更便捷的方式獲取到結果。
在 v1 版本中,我們對上述需求設計了如下解決方案:
- 首先對于調試的 SQL,系統會在 SQL 翻譯的時候將原來的一個 Sink 替換為專用的 PrintSink,這解決了需求中的前兩點。
- 然后對 PrintSink 進行限流,通過 Flink 的反壓機制達到總體的限流,并且會限制作業的最長執行時間,超時之后系統會自動把作業結束掉,這解決了需求中的資源限制這點。
- 最后為了更快地響應,調試的作業并不會提交到 YARN 集群上去運行,而是會在 Lamdba 服務器本地開啟開啟一個 MiniCluster 去執行,同時也方便我們從標準輸出去提取 PrintSink 的結果,這點解決了需求中的最后一點。
調試模式的架構如上圖所示,比起一般的 SQL 提交流程,主要區別在于作業不會提交到 YARN 上,而是在 Lambda 服務器的本地執行,從而節省了準備 Flink 集群的開銷,并且更容易管控資源和獲取結果。
上述調試解決方案基本可用,但是實際使用過程中依然存在不少問題。
- 第一,如果用戶提交的 SQL 比較復雜,那么 SQL 的編譯優化可能會耗費比較久的時間,這會導致作業很容易超時,在有結果輸出之前可能就被系統結束掉,同時這樣的 SQL 也會給服務器造成比較大的壓力。
- 第二,該架構沒法去調試時間窗口比較長的作業或者需要 Bootstrap State 的作業。
- 第三,因為執行結果是在作業結束之后才批量返回的,不是在作業執行過程中就流式返回,因此用戶需要等到作業結束——通常是 10 分鐘以上才可以看到結果。
- 第四,在 SQL 的翻譯階段把調試 SQL 的 Sink 替換掉,這個功能是通過改造 Flink 的 Planner 來實現的,相當于業務邏輯入侵到了 Planner 里面,這樣并不優雅。
第三個痛點是 v1 版本只允許單條 DML。
相比傳統的數據庫,我們支持的 SQL 語句是很有限的,比如,MySQL 的 SQL 可以分成 DML、DQL、DDL 和 DCL。
- DML 用于操控數據,常見的語句有 INSERT / UPDATE / DELETE。StreamflySQL v1 只支持了 INSERT,這和 Flink SQL 是保持一致的。Flink SQL 用 Retract 模式 — 也就是類似 Changelog 的方式 — 來表示 UPDATE/DELETE,所以只支持 INSERT,這點其實沒有問題。
- DQL 用于查詢數據,常見語句是 SELECT。這在 Flink SQL 是支持的,但因為缺乏 Sink 不能生成一個有意義的 Flink 作業,所以 StreamflySQL v1 不支持 DQL。
- DDL 用于定義元數據,常見語句是 CREATE / ALTER /DROP 等。這在 StreamflySQL v1 版本是不支持的,因為模板 jar 調用 SQL 的入口是 sqlUpdate,不支持純元數據的操作,而且為純元數據的操作單獨啟動一個 TableEnvironment 來執行也是完全不劃算。
- 最后是 DCL,用于管理數據權限,比如 GRANT 跟 REVOKE 語句。這個 Flink SQL 是不支持的,原因是 Flink 目前只是數據的用戶而不是管理者,DCL 并沒有意義。
綜合來看,v1 版本只支持了單條 DML,這讓我們很漂亮的 SQL 編輯器變得空有其表。基于以上這些痛點,我們在今年調研并開發了 StreamflySQL v2。v2 采用的是 Server 端 SQL 編譯的架構。
03基于 SQL Gateway 的 StreamflySQL v2
我們的核心需求是解決 v1 版本的幾個痛點,包括改善用戶體驗和提供更完整的 SQL 支持。總體的思路是采用 Server 端的 SQL 編譯的架構,提高可拓展性和性能。此外,我們的集群部署模式也改成 Session Cluster,預先準備好集群資源,省去啟動 YARN application 的時間。
這里會有兩個關鍵問題。
- 首先是我們要完全自研還是基于開源項目?在調研期間我們發現 Ververica 的 SQL Gateway 項目很符合我們需求,容易拓展而且是 Flink 社區 FLIP-91 SQL Gateway 的一個基礎實現,后續也容易與社區的發展方向融合。
- 第二個問題是,SQL Gateway 本身有提交作業的能力,這點跟我們已有的 Lambda 平臺是重合的,會造成重復建設和難以統一管理的問題,比如認證授權、資源管理、監控告警等都會有兩個入口。那么兩者應當如何進行分工?我們最終的解決方案是,利用 Session Cluster 的兩階段調度,即資源初始化和作業執行是分離的,所以我們可以讓 Lambda 負責 Session Cluster 的管理,而 StreamflySQL 負責 SQL 作業的管理,這樣能復用 Lambda 大部分的基礎能力。
這是 StreamflySQL v2 的架構圖。我們將 SQL Gateway 內嵌到 SpringBoot 應用中,開發了新的后端。總體看起來比 v1 版本要復雜,原因是原本的一級調度變成了會話和作業的兩級調度。
首先用戶需要創建一個 SQL 會話,StreamflySQL 后端會生成一個會話作業。在 Lambda 看來會話作業是一種特殊作業,啟動時會使用 yarn-session 的腳本來啟動一個 Flink Session Cluster。在 Session Cluster 初始化之后,用戶就可以在會話內去提交 SQL。StreamflySQL 后端會給每個會話開啟一個 TableEnvironment,負責執行 SQL 語句。如果是只涉及元數據的 SQL,會直接調用 Catalog 接口完成,如果是作業類型的 SQL,會編譯成 JobGraph 提交到 Session Cluster 去執行。
v2 版本很大程度上解決了 v1 版本的幾個痛點:
- 在響應時間方面,v1 常常會需要 1 分鐘左右,而 v2 版本通常在 10 秒內完成。
- 在調試預覽方面,v2 不需要等作業結束,而是在作業運行時,將結果通過 socket 流式地返回。這點是依賴了 SQL gateway 比較巧妙的設計。對于 select 語句,SQL Gateway 會自動注冊一個基于 socket 的臨時表,并將 select 結果寫入到這個表。
- 在 SQL 支持方面,v1 只支持 DML,而 v2 借助于 SQL Gateway 可以支持 DML/DQL/DDL。
不過 SQL Gateway 雖然有不錯的核心功能,但我們使用起來并不是一帆風順,也遇到一些挑戰。
首先最為重要的是元數據的持久化。
SQL Gateway 本身的元數據只保存在內存中,如果進程重啟或是遇到異常崩潰,就會導致元數據丟失,這在企業的生產環境里面是不可接受的。因此我們將 SQL Gateway 集成到 SpringBoot 程序之后,很自然地就將元數據保存到了數據庫。
元數據主要是會話元數據,包括會話的 Catalog、Function、Table 和作業等等。這些元數據按照作用范圍可以分為 4 層。底下的兩層是全局的配置,以配置文件的形式存在;上面兩層是運行時動態生成的元數據,存在數據庫中。上層的配置項優先級更高,可以用于覆蓋下層的配置。
我們從下往上看這些元數據:
- 最底層是全局的默認 Flink Configuration,也就是我們在 Flink Home 下的 flink-conf yaml 配置。
- 再上面一層是 Gateway 自身的配置,比如部署模式(比如是 YARN 還是 K8S),比如默認要出冊的 Catalog 和 Function 等等。
- 第三層是 Session 會話級別的 Session Configuraion,比如會話對應的 Session Cluster 的集群 ID 或者 TaskManager 的資源配置等等。
- 最上面一層是 Job 級別的配置,包括作業動態生成的元數據,比如作業 ID、用戶設置 checkpoint 周期等等。
這樣比較靈活的設計除了解決了元數據持久化的問題,也為我們的多租戶特性奠定了基礎。
第二個挑戰是多租戶。
多租戶分為資源和認證兩個方面:
- 在資源方面,StreamflySQL 利用 Lambda 作業平臺可以在不同的隊列啟動 Session Cluster,它們的 Master 節點和資源很自然就是隔離的,所以沒有像 Spark Thrift Server 那樣不同用戶共用一個 Master 節點和混用資源的問題。
- 在認證方面,因為 Session Cluster 屬于不同用戶,所以 StreamflySQL 后端需要實現多租戶的偽裝。在網易游戲,組件一般會使用 Kerberos 認證。我們采用多租戶實現的方式是使用 Hadoop 的 Proxy User,先登錄為超級用戶,然后偽裝成項目用戶來向不同組件獲取 delegation token,這里的組件主要是 Hive MetaStore 跟 HDFS,最后把這些 token 存到 UGI 里面并用 doAS 的方式來提交作業。
第三個挑戰是水平拓展。
為了高可用和拓展服務能力,StreamflySQL 很自然需要以多實例的架構部署。因為我們已經將主要的狀態元數據存到數據庫,我們可以隨時從數據庫構建出一個新的 TableEnvironment,所以 StreamflySQL 實例類似普通 Web 服務一樣非常輕,可以很容易地擴容縮容。
但是并不是所有狀態都可以持久化的,另外有些狀態我們故意會不持久化。比如用戶使用 SET 命令來改變 TableEnvironment 的屬性,比如開啟 Table Hints,這些屬于臨時屬性,會在重建 TableEnvironment 后被重置。這是符合預期的。再比如用戶提交 select 查詢做調試預覽時,TaskManager 會與 StreamflySQL 后端建立 socket 鏈接,而 socket 鏈接顯然也是不可持久化的。因此我們在 StreamflySQL 的多實例前加了親和性的負載均衡,按照 Session ID 來調度流量,讓在正常情況下同一個用戶的請求都落到同一個實例上,確保用戶使用體驗的連續性。
第四個挑戰是作業狀態管理。
其實這里的狀態一詞是雙關,有兩個含義:
- 第一個含義是作業的運行狀態。SQL gateway 目前只是提交 SQL 并不監控后續的運行狀態。因此,StreamflySQL 設置了監控線程池來定時輪詢并更新作業狀態。因為 StreamflySQL 是多實例的,它們的監控線程同時操作同一個作業的話,可能會有更新丟失的問題,所以我們這里使用了 CAS 樂觀鎖來保證過時的更新不會生效。然后我們會在作業異常退出或者無法獲取狀態時進行告警,比如 JobManager 進行 failover 的情況下,我們無法得知 Flink 作業的狀態,這時系統就會發出 disconnected 的異常狀態告警。
- 第二個含義是 Flink 的持久化狀態,即 Flink State。原生的 SQL gateway 并沒有管理 Flink 的 Savepoint 和 Checkpoint,因此我們加上了 stop 和 stop-with-savepoint 的功能,并強制開啟 retained checkpoint。這使得在作業遇到異常終止或者簡單 stop 之后,再次重啟時系統可以自動查找到最新的 checkpoint。
這里我可以分享下我們的算法。其實自動查找最新 checkpoint 的功能 Lambda 也有提供,但是 Lambda 假設作業都是 Per-Job Cluster,因此只要查找集群 checkpoint 目錄里最新的一個 checkpoint 就可以了。但這樣的算法對 StreamflySQL 卻不適用,因為 Session Cluster 有多個作業,最新的 checkpoint 并不一定是我們目標作業的。因此,我們改為了使用類似 JobManager HA 的查找方式,先讀取作業歸檔目錄元數據,從里面提取最新的一個 checkpoint。
04未來工作
- 未來我們首先要解決的一個問題是 State 遷移的問題,即用戶對 SQL 進行變更后,如何從原先的 Savepoint 進行恢復。目前只能通過變更類型來告知用戶風險,比如通常而言加減字段不會造成 Savepoint 的不兼容,但如果新增一個 join 表,造成的影響就很難說了。因此后續我們計劃通過分析 SQL 變更前后的執行計劃,來預先告知用戶變更前后的狀態兼容性。
- 第二個問題是細粒度的資源管理。目前我們并不能在作業編譯時去指定 SQL 的資源,比如 TaskManager 的 CPU 和內存在 Session Cluster 啟動之后就確定了,是會話級別的。目前調整資源只能通過作業并行度調整,很不靈活并且容易造成浪費。現在 Flink 1.14 已經支持了 DataStream API 的細粒度資源管理,可以在算子級別設置資源,但 SQL API 現在還沒有計劃,后續我們可能參與進去推動相關議案的進展。
- 最后是社區貢獻。我們對 SQL Gateway 有一定使用經驗,而且也對其進行了不少的改進,后續希望這些改進能回饋給 Flink 社區,推動 FLIP-91 SQL Gateway 的進展。?