成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

一年省七位數,得物自建HFDS在 Flink Checkpoint 場景下的應用實踐

存儲 數據管理
Checkpoint:簡單的說,在某一時刻,將 Flink 任務本地機器中存儲在狀態后端的狀態去同步到遠程文件存儲系統(比如 HDFS)的過程就叫 Checkpoint。

1、背景

隨著阿里云Flink實例的遷移下云以及新增需求接入,自建Flink平臺規模逐漸壯大,當前總計已超4萬核運行在自建的K8S集群中,然而 Flink 任務數的增加,特別是大狀態任務,每次Checkpoint 時會產生脈沖式帶寬占用,峰值流量超過100Gb/s,早期使用阿里云OSS作為Checkpoint數據存儲,單個Bucket 每 1P數據量只有免費帶寬10Gb/s,超出部分單獨計費,當前規模每月需要增加1x w+/月。

為了控制這部分成本,得物開展了自建HDFS在Flink Checkpoint場景下的落地工作,實現年度成本節省xxx萬元。

此次分享自建HDFS在實時計算checkpoint場景的實踐經驗,希望能為讀者提供一些參考。

2、Flink Checkpoint 介紹

2.1 Flink里的Checkpoint是什么?

Checkpoint:簡單的說,在某一時刻,將 Flink 任務本地機器中存儲在狀態后端的狀態去同步到遠程文件存儲系統(比如 HDFS)的過程就叫 Checkpoint。

狀態后端:做狀態數據持久化的工具就叫做狀態后端。比如你在 Flink 中見到的 RocksDB、FileSystem 的概念就是指狀態后端,再引申一下,也可以理解為:應用中有一份狀態數據,把這份狀態數據存儲到 MySQL 中,這個 MySQL 就能叫做狀態后端。

2.2 Checkpoint解決什么問題?

其實在實時計算中的狀態的功能主要體現在任務可以做到失敗重啟后沒有數據質量、時效問題。

實時任務一般都是 7x24 小時 Long run 的,掛了之后,就會有以下兩個問題,首先給一個實際場景:一個消費上游 Kafka,使用 Set 去重計算 DAU 的實時任務。

  • 數據質量問題:當這個實時任務掛了之后恢復,Set空了,這時候任務再繼續從上次失敗的 Offset 消費 Kafka 產出數據,則產出的數據就是錯誤數據了
  • 數據時效問題:一個實時任務,產出的指標是有時效性(主要是時延)要求的。你可以從今天 0 點開始重新消費,但是你回溯數據也是需要時間的。舉例:中午 12 點掛了,實時任務重新回溯 12 個小時的數據能在 1 分鐘之內完成嘛?大多數場景下是不能的!一般都要回溯幾個小時,這就是實時場景中的數據時效問題。而 Flink的Checkpoint就是把 Set 定期的存儲到遠程 HDFS 上,當任務掛了,我們的任務還可以從 HDFS 上面把這個數據給讀回來,接著從最新的一個 Kafka Offset 繼續計算就可以,這樣即沒有數據質量問題,也沒有數據時效性問題。

2.3 Checkpoint的運行流程?

  1. JM 定時調度 Checkpoint 的觸發,接受到 JM 做 Checkpoint 的請求后,開始做本地 Checkpoint,暫停處理新流入的數據,將新數據緩存起來。
  2. 將任務的本地狀態數據,復制到一個遠程的持久化存儲(HDFS)空間上。
  3. 繼續處理新流入的數據,包括剛才緩存起來的數據。

圖片

3、自建HDFS引入

3.1 為什么用HDFS?

Flink 做為一個成熟的流計算引擎,對外宣稱可以實現 Exactly Once。為了實現業務上的 Exactly Once,Flink 肯定不能丟數據,也就是狀態數據必須保障高可靠性,而HDFS作為是一個分布式文件系統,具備高容錯率、高吞吐量等特性,是業界使用最廣泛的開源分布式文件系統,針對大狀態的Checkpoint任務非常契合,帶寬易擴展且成本低廉。

HDFS主要有如下幾項特點:

  • 和本地文件系統一樣的目錄樹視圖
  • Append Only 的寫入(不支持隨機寫)
  • 順序和隨機讀
  • 超大數據規模
  • 易擴展,容錯率高

3.2 得物自建HDFS架構

架構層面是典型的主從結構,架構見下圖,核心思想是將文件按照固定大小進行分片存儲,

  • 主節點:稱為 NameNode,主要存放諸如目錄樹、文件分片信息、分片存放位置等元數據信息
  • 從節點:稱為 DataNode,主要用來存分片數據

比如用戶發出了一個1GB的文件寫請求給HDFS客戶端,HDFS客戶端會根據配置(默認是128MB),對這個文件進行切分,HDFS客戶端會切分成8個Block,然后詢問NameNode應該將這些切分好的Block往哪幾臺DataNode上寫,此后client端和NameNode分配的多個DataNode構成pipeline管道,開始以packet為單位向Datanode寫數據。

圖片

4、自建HDFS落地實踐

4.1 集群規劃

早期使用OSS的主要瓶頸在于帶寬,為了匹配將大狀態的任務從OSS遷移到Hdfs帶寬需求,支撐寫入流量100Gib+/s,對比OSS的帶寬成本,結合到成本與帶寬瓶頸考慮,內部大數據d2s.5xlarge機型做了一次性能壓測,單節點吞吐能達到12Gib/s,按100Gib/s預估,算上Buffer,3副本集群需要xx臺機器,滿足現在的帶寬及寫入吞吐需求,最終選擇d2s.5xlarge類型Ecs機器,對應實例詳情如下:

圖片

圖片

4.2 穩定性保障建設

4.2.1 Hdfs組件指標采集

為了確保HDFS集群的穩定和可靠性,支撐線上實時Flink任務Checkpoint,監控告警建設是必不可少的,我們通過統一的采集程序Hadoop Exporter將集群里各組件的JMX信息換為維度模型,將下述為扁平化的事實指標Jmx數據,轉換為維度結構,比如針對NameNode、DataNode,可以直接將指標使用預定義維度,例如:cluster、instance等維度,并存儲到Prometheus能夠識別的指標數據,存儲為一個二維字典結構,例如: _hadoop_namenode_metrics[指標分類(通常是MBean的名稱)][指標名稱]

4.2.2 指標采集架構

結合當前集群的規模,我們通過集中是Pull的方式采集架構,只需要啟動時指定集群Namenode及Jn的Jmx的url信息,就能采集集群的所有組件的指標信息,這樣當有集群擴展或變更時,會自動采集上報到apm里,方便運維,具體采集架構如下圖:

圖片

4.2.3 監控與告警

監控:基于已采集匯報上的指標數據,目前配置了Namenode、Datanode組件核心指標監控大盤,包括HDFS節點健康狀態、HDFS服務健康狀態、數據塊健康狀態、節點的寫入吞吐量等指標。

圖片

圖片

告警:當前監控數據已完成接入公司天眼監控平臺,我們將影響hdfs服務可用性的指標統一配置了告警模版,比如集群總的寫入帶寬、Callqueue隊列、DN存活數量、集群節點基礎io值班等,可以動態覆蓋多集群,實現定制化告警,更加靈活及方便感知問題,減少故障止損時長,滿足線上HDFS穩定性保障SLA目標。

4.2.4 集群快速變更能力

隨著Hdfs集群規模的增加,在日常運維過程中,如何做到快速擴、縮容、節點重啟及配置變更能力,

保障集群具備快速止損的能力,我們封裝了一整套HDFS的各組件變更能力,包括節點自動上報到cmdb對應應用、集群數據節點maintenance模式快速無影響重啟、日常變配等,并集成到ansible playbook,做到集群擴容在分鐘級完成。

圖片

4.3 遷移到HDFS攻克難關

4.3.1 DN 心跳匯報于刪除共用一把寫鎖問題

現象:自建Flink平臺大部分大狀態任務遷移后,自建HDFS集群節點整體的水位各個ecs的網絡帶寬峰值,出現偶發部分任務因checkpiont 寫入失敗問題,報錯信息如下:

問題定位過程:
  • 根據客戶端日志的堆棧信息,查看Namenode的日志找到對應的文件、塊,發現了錯誤日志,文件塊在寫入成功后不能及時上報,塊的狀態一直處于not COMPLETE。

圖片

這里介紹下Hdfs文件寫入流程介紹:

客戶端向datanode寫入塊結束后,datanode通過IBR(增量塊匯報)向namenode匯報新寫入的塊

namenode收到匯報后更新文件的塊副本數,當文件塊副本數>=1時,文件寫入狀態為COMPLETE

客戶端寫入結束后不斷向namenode詢問文件寫入狀態是否COMPLETE,失敗5(默認)次后報錯寫入失敗。

  • 根據上述寫入流程,懷疑問題出現在IBR階段,查看Namenode監控指標,Namenode處理塊匯報平均時長<10ms,所以猜測問題出在Datanode端,觀察發現,Datanode偶發心跳匯報間隔>30s(正常3s一次),Datanode IBR和心跳都是BPServiceActor線程處理,很可能是心跳阻塞了IBR。

圖片

  • 我們根據猜測的方向,繼續定位什么原因導致心跳阻塞了IBR匯報,于是在每臺節點上,部署了腳本(見下圖),根據Datanode的Jmx指標監聽本節點心跳間隔,大于10s時就打印Datanode的Jstack。

Datanode 每個節點上的metric信息里包含心跳匯報間隔的數據。

圖片

  • 分析多個Jstack代碼(具體內容見下),可以發現BPServiceActor線程被CommandProcessingThread線程阻塞,而CommandProcessingThread線程在調用invalidate()方法,而invalidate()是在調用刪除操作。
"BP-1732625734-****-1675758643065 heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)
        at java.lang.Thread.run(Thread.java:748)


   Locked ownable synchronizers:
        - None
        
"Command processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]
   java.lang.Thread.State: RUNNABLE
        at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
        at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
        at java.io.File.isDirectory(File.java:858)
        at java.io.File.toURI(File.java:741)
        at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)
        at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)
        at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)
        at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)


   Locked ownable synchronizers:
        - <0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)
        - <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

結合堆棧信息定位到代碼,確實發現processCommandFromActor方法在執行刪除(調用invalidate()方法)操作時與心跳匯報updateActorStatesFromHeartbeat方法共用同一把寫鎖。

class BPOfferService {
private final Lock mWriteLock = mReadWriteLock.writeLock();
void writeLock() {
  mWriteLock.lock();
}


void writeUnlock() {
  mWriteLock.unlock();
}


void updateActorStatesFromHeartbeat(
    BPServiceActor actor,
    NNHAStatusHeartbeat nnHaState) {
  writeLock();
  try {
//... 心跳匯報
  } finally {
    writeUnlock();
  }
}
boolean processCommandFromActor(DatanodeCommand cmd,
    BPServiceActor actor) throws IOException {
  assert bpServices.contains(actor);
// ...省略
  writeLock();
  try {
//...執行刪除邏輯
  } finally {
    writeUnlock();
  }
}
}
  • 確認問題:查看Namenode審計日志發現,集群持續有大量文件刪除(Flink刪除過期Checkpoint meta文件)操作,修改Datanode端代碼,在調用processCommandFromActive方法超過一定10s后打印調用時長與CommandAction日志。查看datanode日志發現確實存在刪除操作大于30s的情況,由此確認問題就是出現在刪除操作耗時過長影響了Datanode的增量塊匯報。

圖片

由此確定問題:

刪除塊操作耗時過長,阻塞datanode心跳,導致IBR被阻塞,塊寫入成功后不能及時上報,客戶端重試一定次數后失敗拋異常,重試次數由dfs.client.block.write.locateFollowingBlock.retries決定,默認5次,第一次等待0.4s,之后每次等待時長翻倍,5次約為15s左右。

 問題解決方案

找到問題就是出現在BPServiceActor 線程做了太多的事,包含FBR、IBR、心跳匯報,而且心跳匯報和刪除共同持有一把寫鎖,那解決方案一個就把這兩把鎖進行拆分,一個就是將IBR邏輯單獨獨立出來,不受心跳匯報影響。

而社區3.4.0版本已經將IBR從BPServiceActor 線程獨立出來了,所有我們最終將HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不會被invalidate()阻塞,問題得到根治!

圖片

5、總結與規劃

總結:Oss的流量已從早期137Gib/s降低到30Gib/s左右(下圖一),自建Hdfs集群峰值流量達到120Gb/s(下圖二),且平穩運行

圖片

圖片

整個項目已完成全部大狀態任務從Oss遷移到自建Hdfs,當前Hdfs集群規模xx臺,成本x w/月,原OSS帶寬費用阿里報價1x w/月,相比節省xx w/月。

未來規劃:對于全量 checkpoint 來說,TM 將每個 Checkpoint 內部的數據都寫到同一個文件,而對于 RocksDBStateBackend 的增量 Checkpoint 來說,則會將每個 sst 文件寫到一個分布式系統的文件內,當作業量很大,且作業的并發很大時,則會對底層 HDFS 形成非常大的壓力,

1)大量的 RPC 請求會影響 RPC 的響應時間。

2)大量文件對 NameNode 內存造成很大壓力。

針對上面的問題我們未來考慮引入小文件合并方案降低 HDFS 的壓力,包括 RPC 壓力以及 NameNode 內存的壓力。

責任編輯:武曉燕 來源: 得物技術
相關推薦

2025-02-20 09:17:50

2023-07-07 19:26:50

自建DTS平臺

2023-03-30 18:39:36

2023-03-06 10:24:53

CIO驅動技術

2023-02-23 17:45:26

服務器DHHCTO

2023-10-09 18:35:37

得物Redis架構

2025-03-13 06:48:22

2021-01-22 10:27:44

物聯網互聯網IoT

2021-10-14 06:52:47

算法校驗碼結構

2022-12-14 18:40:04

得物染色環境

2022-12-07 08:31:45

ClickHouse并行計算數據

2024-09-11 19:36:24

2023-11-27 18:38:57

得物商家測試

2023-02-08 18:33:49

SRE探索業務

2022-01-14 07:56:38

Checkpoint機制Flink

2024-12-19 09:45:24

2023-08-30 09:00:05

2023-09-13 18:59:40

SRE視角藍綠發布

2022-12-19 19:12:17

分布式事務

2022-11-17 10:25:41

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕亚洲国产 | 成人精品视频99在线观看免费 | 亚洲欧美精品在线观看 | 日韩www视频 | 亚洲自拍偷拍免费视频 | 日韩精品成人 | 免费一区二区三区 | 色噜噜亚洲男人的天堂 | 美女午夜影院 | 日韩一级电影免费观看 | 色天堂影院 | 在线播放国产一区二区三区 | 一区二区视频免费观看 | 国产高清在线视频 | www.奇米| 日韩一级| 国产欧美一区二区三区在线看 | 亚洲va在线va天堂va狼色在线 | 久久免费看 | 曰韩三级 | 五月天婷婷丁香 | 亚洲一区在线日韩在线深爱 | 精品av天堂毛片久久久借种 | 国产一级片免费视频 | 中文字幕高清av | 午夜电影网址 | 美女国内精品自产拍在线播放 | 日韩福利| 伊人中文字幕 | 成人在线观看免费视频 | 国产一区成人 | 青青草原综合久久大伊人精品 | 99热碰| 精品三区 | 亚洲一区在线日韩在线深爱 | 久久久婷婷 | 欧美日韩精品免费 | 99精品一区二区 | 久久久日韩精品一区二区三区 | 亚洲国产aⅴ成人精品无吗 亚洲精品久久久一区二区三区 | 免费视频一区二区 |