從 Pulsar Client 的原理到它的監(jiān)控面板
背景
前段時間業(yè)務(wù)團(tuán)隊(duì)偶爾會碰到一些 Pulsar 使用的問題,比如消息阻塞不消費(fèi)了、生產(chǎn)者消息發(fā)送緩慢等各種問題。
雖然我們有個監(jiān)控頁面可以根據(jù) topic 維度查看他的發(fā)送狀態(tài),比如速率、流量、消費(fèi)狀態(tài)等信息。
但也有幾個問題:
- 無法在應(yīng)用維度查看他所依賴的所有 topic 的各種狀態(tài)。
- 監(jiān)控的信息還不夠,比如發(fā)送/消費(fèi)延遲、發(fā)送/消費(fèi)失敗等數(shù)據(jù)。
總之就是缺少一個全局的監(jiān)控視角,通過這些指標(biāo)可以很方便的分析出當(dāng)時的運(yùn)行情況。
基于這個需求經(jīng)過一段時間的折騰,現(xiàn)在已經(jīng)上線使用幾個月,目前比較穩(wěn)定,效果圖如下:
現(xiàn)在就可以在每個應(yīng)用的監(jiān)控面板里看到自己使用了哪些 topic,分別的生產(chǎn)消費(fèi)情況如何。
核心流程
要實(shí)現(xiàn)這些功能就得在應(yīng)用的 metrics 中加入相關(guān)的監(jiān)控信息,但官方的 Java client 是沒有暴露出這些指標(biāo)的。
但 pulsar-client-go 是自帶了這些指標(biāo)的
由于 SDK 不支持所以只能自己想辦法實(shí)現(xiàn)了,為此其實(shí)有兩種實(shí)現(xiàn)方案:
- 魔改 Java client,在需要監(jiān)控的地方手動埋點(diǎn)指標(biāo)。
- 由于我們使用了 SkyWalking,所以可以編寫插件,以 agent 的方式獲取數(shù)據(jù)、埋點(diǎn)指標(biāo)。
不過第一種方案有以下一些問題:
- 需要自己維護(hù)一個代碼分支,還需要定期和官方保持一致,難免會出現(xiàn)代碼沖突。
- 需要推動業(yè)務(wù)方進(jìn)行依賴升級,線上有著幾百個應(yīng)用,推動起來時間太慢。
第二種方案的好處就不言而喻了:
- 升級無感知,只需要在我們的基礎(chǔ)鏡像中加上插件即可。
- Java client 的版本也更容易統(tǒng)一。
Client 原理
但其實(shí)不管是哪種方案我們都得熟悉 Java Client 的實(shí)現(xiàn)原理,才能知道哪些數(shù)據(jù)是我們需要重點(diǎn)關(guān)注的,可以幫助我們更好的定位問題。
本文重點(diǎn)不在于此,具體代碼就不仔細(xì)分析了。
從上圖可以看出,如果我們想要監(jiān)控消費(fèi)是否存在阻塞的情況,這幾個內(nèi)部隊(duì)列是需要重點(diǎn)監(jiān)控的,一旦他們出現(xiàn)堆積,那就會出現(xiàn)消費(fèi)阻塞。
其實(shí)這些數(shù)據(jù)都可以通過。
org.apache.pulsar.client.api.ProducerStats
org.apache.pulsar.client.api.ConsumerStats
這兩個接口獲取到生產(chǎn)者和消費(fèi)者的大部分指標(biāo),只是這里還有一個小插曲。
那就是在獲取消費(fèi)者隊(duì)列大小的時候,獲取到的數(shù)據(jù)一直為空。
最終經(jīng)過源碼排查,原來是我們大量使用的 messageListener 在獲取隊(duì)列大小時有 bug,導(dǎo)致獲取到的數(shù)據(jù)一直都為 0.
相關(guān)的 issue 和 PR 可以在這兩個鏈接查看,問題原因和修復(fù)過程都有具體描述:https://github.com/apache/pulsar/issues/20076 https://github.com/apache/pulsar/pull/20245
但這個修復(fù)得在新版本才能使用,就導(dǎo)致我們現(xiàn)在的監(jiān)控頁面一直顯示為空。
開發(fā) SkyWalking 插件
然后就是開發(fā)一個 SkyWalking 的插件了,其實(shí)直接使用 SW 開發(fā)插件是上手 Java-Agent 比較快的方式。
SW 的 SDK 封裝了許多 agent 原生接口,使得開發(fā)起來非常容易;當(dāng)然缺點(diǎn)也有,就是得集成整個 SW 的 agent。
這里我簡單介紹下這個插件的運(yùn)行流程:
- 在創(chuàng)建和刪除 consumer 的時候維護(hù) consumerPool
- 啟動一個定時任務(wù),定期從這些 consumer 中獲取指標(biāo)數(shù)據(jù)。
當(dāng)消費(fèi)多分區(qū) topic 時,為了能唯一標(biāo)志一個 consumer,所以給每個消費(fèi)者都加了一個 hashcode 的 label。
因?yàn)槲覀兯械?Java 技術(shù)棧都是使用的 Prometheus 的包來生成 metrics ,所以該插件也是使用該包生成的數(shù)據(jù)。
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.12.0</version>
<scope>provided</scope>
</dependency>
為了兼容一些特殊 Java 應(yīng)用沒有該包時會啟動報(bào)錯,所以在初始化插件的時候需要檢測當(dāng)前 classpath
下是否存在該依賴。
這些功能 SW 已經(jīng)封裝好了,對我們來說也是開箱即用。
其實(shí) SW 插件自己也是支持 metrics 的,由于我們只是使用了它的 trace 功能,所以這里就沒有使用它的 API。
關(guān)于開發(fā)一個 SW 插件的流程也比較簡單,可以參考官方文檔或者是一些現(xiàn)成的插件源碼。https://skywalking.apache.org/docs/skywalking-java/next/en/setup/service-agent/java-agent/java-plugin-development-guide/
總結(jié)
有了這個監(jiān)控面板后,對于 Pulsar 客戶端內(nèi)部的一些運(yùn)行情況就不再是黑盒了,還可以基于此做一些報(bào)警,比如消費(fèi)堆積、發(fā)送延遲過大等。
當(dāng)然僅僅只有這個面板依然是不夠的,后續(xù)我們又開發(fā)了可以通過 messageId
查詢它的整個生命周期,包括:
- 生產(chǎn)者、消費(fèi)者信息
- 消息生產(chǎn)時間
- 推送時間
- ack 時間等
同時借助與 Pulsar-SQL 的能力,還能以列表的形式展示當(dāng)前 topic 的消息列表。
當(dāng)然在實(shí)現(xiàn)這兩個功能的同時也踩了不少坑,提了幾個 PR ,后面在抽時間做具體的分享。