成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

DataLeap 的 Catalog 系統近實時消息同步能力優化

精選
大數據
Apache Atlas 對于實時消息的消費處理不滿足性能要求,內部使用 Flink 任務的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節內部和火山引擎上同步元數據的訴求。本文定義了需求場景,并詳細介紹框架的設計與實現。

字節數據中臺 DataLeap 的 Data Catalog 系統通過接收 MQ 中的近實時消息來同步部分元數據。Apache Atlas 對于實時消息的消費處理不滿足性能要求,內部使用 Flink 任務的處理方案在 ToB 場景中也存在諸多限制,所以團隊自研了輕量級異步消息處理框架,很好的支持了字節內部和火山引擎上同步元數據的訴求。本文定義了需求場景,并詳細介紹框架的設計與實現。

1. 背景

1.1 動機

字節數據中臺 DataLeap 的 Data Catalog 系統基于 Apache Atlas 搭建,其中 Atlas 通過 Kafka 獲取外部系統的元數據變更消息。在開源版本中,每臺服務器支持的 Kafka Consumer 數量有限,在每日百萬級消息體量下,經常有長延時等問題,影響用戶體驗。

在 2020 年底,我們針對 Atlas 的消息消費部分做了重構,將消息的消費和處理從后端服務中剝離出來,并編寫了 Flink 任務承擔這部分工作,比較好的解決了擴展性和性能問題。然而,到 2021 年年中,團隊開始重點投入私有化部署和火山公有云支持,對于 Flink 集群的依賴引入了可維護性的痛點。

在仔細的分析了使用場景和需求,并調研了現成的解決方案后,我們決定投入人力自研一個消息處理框架。當前這個框架很好的支持了字節內部以及 ToB 場景中 Data Catalog 對于消息消費和處理的場景。

本文會詳細介紹框架解決的問題,整體的設計,以及實現中的關鍵決定。

1.2 需求定義

使用下面的表格將具體場景定義清楚。

需求維度

需求描述

吞吐量

每日百萬級別,每秒峰值>100

服務質量(QoS)

至少一次

延遲消息

支持將消息標記為延遲處理,最高延遲 1 min

重試

自動對處理失敗消息重試,重試次數可定義

并行與順序處理

Partition 內部支持按照某個 Key 重新分組,不同 Key 之間接受并行,同一個 Key 要求順序處理

消息處理時間

不同類型的消息,處理時間會有較大差別,從< 1 s~1 min

封裝

確保不丟消息的前提下,依賴框架做 Offset 的提交,業務側只需要編寫消息的處理邏輯;另外,將系統狀態以 Metric 方式暴露

輕量

支持與后端服務混合部署,不引入額外的維護成本

1.3 相關工作

在啟動自研之前,我們評估了兩個比較相關的方案,分別是 Flink 和 Kafka Streaming。

Flink 是我們之前生產上使用的方案,在能力上是符合要求的,最主要的問題是長期的可維護性。在公有云場景,那個階段 Flink 服務在火山云上還沒有發布,我們自己的服務又有嚴格的時間線,所以必須考慮替代;在私有化場景,我們不確認客戶的環境一定有 Flink 集群,即使部署的數據底座中帶有 Flink,后續的維護也是個頭疼的問題。另外一個角度,作為通用流式處理框架,Flink 的大部分功能其實我們并沒有用到,對于單條消息的流轉路徑,其實只是簡單的讀取和處理,使用 Flink 有些“殺雞用牛刀”了。

另外一個比較標準的方案是 Kafka Streaming。作為 Kafka 官方提供的框架,對于流式處理的語義有較好的支持,也滿足我們對于輕量的訴求。最終沒有采用的主要考慮點是兩個:

  • 對于 Offset 的維護不夠靈活:我們的場景不能使用自動提交(會丟消息),而對于同一個 Partition 中的數據又要求一定程度的并行處理,使用 Kafka Streaming 的原生接口較難支持。
  • 與 Kafka 強綁定:大部分場景下,我們團隊不是元數據消息隊列的擁有者,也有團隊使用 RocketMQ 等提供元數據變更,在應用層,我們希望使用同一套框架兼容。

2. 設計

2.1 概念說明

  • MQ Type:Message Queue 的類型,比如 Kafka與RocketMQ。后續內容以 Kafka 為主,設計一定程度兼容其他 MQ。
  • Topic:一批消息的集合,包含多個 Partition,可以被多個 Consumer Group消費。
  • Consumer Group:一組 Consumer,同一 Group 內的 Consumer 數據不會重復消費。
  • Consumer:消費消息的最小單位,屬于某個 Consumer Group。?
  • Partition:Topic 中的一部分數據,同一 Partition 內消息有序。同一 Consumer Group 內,一個 Partition 只會被其中一個 Consumer 消費。
  • Event:由 Topic 中的消息轉換而來,部分屬性如下。
  • Event Type:消息的類型定義,會與 Processor 有對應關系;
  • Event Key:包含消息 Topic、Partition、Offset 等元數據,用來對消息進行 Hash 操作;
  • Processor:消息處理的單元,針對某個 Event Type 定制的業務邏輯。
  • Task:消費消息并處理的一條 Pipeline,Task 之間資源是相互獨立的。

2.2 框架架構

圖片

整個框架主要由 MQ Consumer, Message Processor 和 State Manager 組成。

  • MQ Consumer:負責從Kafka Topic拉取消息,并根據 Event Key 將消息投放到內部隊列,如果消息需要延時消費,會被投放到對應的延時隊列;該模塊還負責定時查詢 State Manager 中記錄的消息狀態,并根據返回提交消息 Offset;上報與消息消費相關的 Metric。
  • Message Processor:負責從隊列中拉取消息并異步進行處理,它會將消息的處理結果更新給 State Manager,同時上報與消息處理相關的 Metric。
  • State Manager:負責維護每個 Kafka Partition 的消息狀態,并暴露當前應提交的 Offset 信息給 MQ Consumer。

3. 實現

3.1 線程模型

每個 Task 可以運行在一臺或多臺實例,建議部署到多臺機器,以獲得更好的性能和容錯能力。

每臺實例中,存在兩組線程池:

  • Consumer Pool:負責管理 MQ Consumer Thread 的生命周期,當服務啟動時,根據配置拉起一定規模的線程,并在服務關閉時確保每個 Thread 安全退出或者超時停止。整體有效 Thread 的上限與 Topic 的 Partition 的總數有關。
  • Processor Pool:負責管理 Message Processor Thread 的生命周期,當服務啟動時,根據配置拉起一定規模的線程,并在服務關閉時確保每個 Thread 安全退出或者超時停止。可以根據 Event Type 所需要處理的并行度來靈活配置。

兩類 Thread 的性質分別如下:

  • Consumer Thread:每個 MQ Consumer 會封裝一個 Kafka Consumer,可以消費 0 個或者多個 Partition。根據 Kafka 的機制,當 MQ Consumer Thread 的個數超過 Partition 的個數時,當前 Thread 不會有實際流量。
  • Processor Thread:唯一對應一個內部的隊列,并以 FIFO 的方式消費和處理其中的消息。

3.2 StateManager

圖片

在 State Manager 中,會為每個 Partition 維護一個優先隊列(最小堆),隊列中的信息是 Offset,兩個優先隊列的職責如下:

  • 處理中的隊列:一條消息轉化為 Event 后,MQ Consumer 會調用 StateManager 接口,將消息 Offset  插入該隊列。
  • 處理完的隊列:一條消息處理結束或最終失敗,Message Processor 會調用 StateManager 接口,將消息 Offset 插入該隊列。

MQ Consumer 會周期性的檢查當前可以 Commit 的 Offset,情況枚舉如下:

  • 處理中的隊列堆頂 < 處理完的隊列堆頂或者處理完的隊列為空:代表當前消費回來的消息還在處理過程中,本輪不做 Offset 提交。
  • 處理中的隊列堆頂 = 處理完的隊列堆頂:表示當前消息已經處理完,兩邊同時出隊,并記錄當前堆頂為可提交的 Offset,重復檢查過程。
  • 處理中的隊列堆頂 > 處理完的隊列堆頂:異常情況,通常是數據回放到某些中間狀態,將處理完的隊列堆頂出堆。

注意:當發生 Consumer 的 Rebalance 時,需要將對應 Partition 的隊列清空

3.3 KeyBy 與 Delay Processing 的支持

因源頭的 Topic 和消息格式有可能不可控制,所以 MQ Consumer 的職責之一是將消息統一封裝為 Event。

根據需求,會從原始消息中拼裝出 Event Key,對 Key 取 Hash 后,相同結果的 Event 會進入同一個隊列,可以保證分區內的此類事件處理順序的穩定,同時將消息的消費與處理解耦,支持增大內部隊列數量來增加吞吐。

Event 中也支持設置是否延遲處理屬性,可以根據 Event Time 延遲固定時間后處理,需要被延遲處理的事件會被發送到有界延遲隊列中,有界延遲隊列的實現繼承了 DelayQueue,限制 DelayQueue 長度, 達到限定值入隊會被阻塞。

3.4 異常處理

Processor 在消息處理過程中,可能遇到各種異常情況,設計框架的動機之一就是為業務邏輯的編寫者屏蔽掉這種復雜度。Processor 相關框架的邏輯會與 State Manager 協作,處理異常并充分暴露狀態。比較典型的異常情況以及處理策略如下:

  • 處理消息失敗:自動觸發重試,重試到用戶設置的最大次數或默認值后會將消息失敗狀態通知 State Manager。
  • 處理消息超時:超時對于吞吐影響較大,且通常重試的效果不明顯,因此當前策略是不會對消息重試,直接通知 State Manager  消息處理失敗。
  • 處理消息較慢:上游 Topic 存在 Lag,Message Consumer 消費速率大于  Message Processor 處理速率時,消息會堆積在隊列中,達到隊列最大長度, Message Consumer 會被阻塞在入隊操作,停止拉取消息,類似 Flink 框架中的背壓。?

3.5 監控

為了方便運維,在框架層面暴露了一組監控指標,并支持用戶自定義 Metrics。其中默認支持的 Metrics 如下表所示:

監控類別

監控指標

Message Consumer

Consumer Lag


Rebalance rate


Deserialize QPS


Consumer heartbeat


Message Enqueue Time

Message Processor

Process QPS


Process time

Internal Queue

Queue length

4. 線上運維 Case 舉例

實際生產環境運行時,偶爾需要做些運維操作,其中最常見的是消息堆積和消息重放。

對于 Conusmer Lag 這類問題的處理步驟大致如下:

  • 查看 Enqueue Time,Queue Length 的監控確定服務內隊列是否有堆積。
  • 如果隊列有堆積,查看 Process Time 指標,確定是否是某個 Processor 處理慢,如果是,根據指標中的 Tag 確定事件類型等屬性特征,判斷業務邏輯或者 Key 設置是否合理;全部 Processor 處理慢,可以通過增加 Processor 并行度來解決。
  • 如果隊列無堆積,排除網絡問題后,可以考慮增加 Consumer 并行度至 Topic Partition 上限。

消息重放被觸發的原因通常有兩種,要么是業務上需要重放部分數據做補全,要么是遇到了事故需要修復數據。為了應對這種需求,我們在框架層面支持了根據時間戳重置 Offset 的能力。具體操作時的步驟如下:

  • 使用服務側暴露的 API,啟動一臺實例使用新的 Consumer GroupId: {newConsumerGroup} 從某個 startupTimestamp 開始消費。
  • 更改全部配置中的 Consumer GroupId 為 {newConsumerGroup}。
  • 分批重啟所有實例。

5. 總結

為了解決字節數據中臺 DataLeap 中 Data Catalog 系統消費近實時元數據變更的業務場景,我們自研了輕量級消息處理框架。當前該框架已在字節內部生產環境穩定運行超過 1 年,并支持了火山引擎上的數據地圖服務的元數據同步場景,滿足了我們團隊的需求。

下一步會根據優先級排期支持 RocketMQ 等其他消息隊列,并持續優化配置動態更新,監控報警,運維自動化等方面。

責任編輯:未麗燕 來源: 字節跳動技術團隊
相關推薦

2023-04-14 15:37:02

DataLeap存儲優化MySQL

2015-04-01 15:03:58

Spark大數據

2023-05-03 08:58:46

數據庫開源

2023-06-27 07:11:37

湖倉一體MaxCompute

2022-06-08 09:55:19

Data Catal字節跳動業務系統

2013-06-27 09:59:26

網絡通信HTML5Web

2022-11-03 07:22:42

2022-04-12 08:22:54

Linux內核操作系統

2022-11-24 08:50:07

數據中臺Data Catal

2011-06-22 10:37:08

rsyncinotify

2023-10-19 11:43:47

惡意軟件

2024-10-18 11:39:55

MySQL數據檢索

2020-03-18 07:11:24

實時同步搜索

2024-07-03 11:33:02

2013-05-16 10:15:11

信息泄密彭博Bloomberg

2009-04-28 10:00:52

中華網壓縮開支裁員

2024-07-03 08:02:19

MySQL數據搜索

2023-08-14 09:46:12

高并發消息
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: av免费网站在线观看 | 欧美日韩在线视频一区 | 免费成人高清在线视频 | 中文字幕日韩一区 | 国产一区二区三区 | 国产九九九九 | 国产精品视频一区二区三区四区国 | 毛片免费看 | 亚洲va欧美va天堂v国产综合 | 欧美亚洲视频 | 99这里只有精品视频 | 丝袜美腿一区 | 日屁视频 | 久产久精国产品 | 亚洲国产精品福利 | 久久久久久国产免费视网址 | 欧美一区免费 | 国产三区四区 | 久久精品a | 日韩1区| 少妇久久久 | 玖玖综合网| av片在线观看网站 | 一区二区三区四区在线视频 | jav成人av免费播放 | 日本一区二区三区免费观看 | 伊人影院在线观看 | 久久久久久久久久久久久9999 | 久久国产精品一区二区三区 | 精品欧美一区二区精品久久久 | 久久精品国产a三级三级三级 | 精品成人佐山爱一区二区 | 91精品久久久久久综合五月天 | 国产精品亚洲第一区在线暖暖韩国 | 国产999在线观看 | 亚洲精品视频二区 | 国产视频福利在线观看 | 99在线资源 | 成人午夜黄色 | 日本色综合 | 日韩影音 |