Flink Metrics監控與 RestApi
本文轉載自微信公眾號「KK架構師」,作者wangkai 。轉載本文請聯系KK架構師公眾號。
一、Flink metrics簡介
Flink 的 metrics 是 Flink 公開的一個度量系統,metrics 也可以暴露給外部系統,通過在 Flink 配置文件 conf/flink-conf.yaml 配置即可,Flink原生已經支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。
我們也可以自定義指標通過 metric 收集,實際開發時經常需要查看當前程序的運行狀況,flink 提供了 UI 界面,有比較詳細的統計信息。
但是 UI 界面也有不完善的地方,比如想要獲取 flink 的實時吞吐。本文將詳細介紹如何通過 metric 監控 flink 程序,自定義監控指標以及 metrics 在 flink 的 UI 界面的應用。
二、Metrics在UI頁面上的應用
在 flink 的 UI 的界面上我們點擊任務詳情,然后點擊 Task Metrics 會彈出如下的界面,在 add metic 按鈕上 我們可以添加我需要的監控指標。
注意:如果點擊 Task Metrics 沒有顯示 Add metics 點擊一下任務的 DAG 圖就會顯示出來,當我們點擊了 DAG 圖中某個算子的名字,那么 Add metric 顯示的就是該算子的監控指標,且按照分區顯示,算子名前置的數字就是分區號。
三、各個指標的含義
關于各個指標的含義官網上有詳細介紹:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#availability
四、自定義監控指標
案例:在map算子內計算輸入的總數據,設置 :
- DataStream<String> userData = kafkaData.map(new RichMapFunction<String, String>() {
- Counter mapDataNub;
- @Override
- public void open(Configuration parameters) throws Exception {
- mapDataNub= getRuntimeContext()
- .getMetricGroup()
- .addGroup("flink_test_metric")
- .counter("mapDataNub");
- }
- @Override
- public String map(String s) {
- String s1 ="";
- try {
- String[] split = s.split(",");
- long userID = Long.parseLong(split[0]);
- long itemId = Long.parseLong(split[1]);
- long categoryId = Long.parseLong(split[2]);
- String behavior = split[3];
- long timestamp = Long.parseLong(split[4]);
- Map map = new HashMap();
- map.put("userID", userID);
- map.put("itemId", itemId);
- map.put("categoryId", categoryId);
- map.put("behavior", behavior);
- map.put("timestamp", timestamp);
- s1 = JSON.toJSONString(map);
- mapDataNub.inc();
- System.out.println("數據"+map.toString());
- } catch (NumberFormatException e) {
- e.printStackTrace();
- }
- return s1;
- }
程序啟動之后就可以在任務的ui界面上查看
注意點:
搜索自定義或者查看某個指標需要點擊DAG圖中對應算子的名稱
指標的前綴0,1,2....是指算子的分區數
進行監控時,盡量不要對算子進行重命名,使用默認的名字,這樣一套監控程序可以監控多個flink任務,比如對sink重新命名,如果不同的flink程序對sink的命名不一樣,則一套監控無法監控多個flink程序
- .addSink(KafkaSink.getProducer()).name("kafka_sink");
五、Flink UI 不顯示算子數據接收和發送的條數
有時候我們Flink任務正常運行,數據也可以打印,而且都保存到數據庫了,但是UI上面卻不顯示數據接收和發送的條數 ,導致無法進行指標監控和查查flink任務運行的具體情況,這是什么原因導致的呢?
原因:是因為默認情況下Flink開啟了operator chain,所以當flink程序所有的算子都在一個chain里面時,也就是在一個DAG(task)里面,所有沒有向下游發送數據,所以顯示都為0。比如下圖的情況所有指標都是0;
解決方案:第一種方法:在flink程序里添加自定義metric
第二種方法:使用startNewChain和disableChainin打斷程序默認的operator chain
第三種方法:修改某個算子的并行度使其和上下游算子并行度不一致
六、Metric Reporter
Metrics可以暴露給外部系統,通過在flink配置文件conf/flink-conf.yaml配置即可,flink原生已經支持了很多reporter,如JMX、InfluxDB、Prometheus等等,同時也支持自定義reporter。
Flink自帶了很多Reporter,包括JMX、InfluxDB、Prometheus等等,接下來介紹下InfluxDB Reporter的使用。
只需在flink配置文件conf/flink-conf.yaml中配置Influxdb相關信息即可,主要包括域名、端口號、用戶密碼等等。
flink1.10之后采用
- metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory
- metrics.reporter.influxdb.host: localhost
- metrics.reporter.influxdb.port: 8086
- metrics.reporter.influxdb.db: flink
- metrics.reporter.influxdb.consistency: ANY
- metrics.reporter.influxdb.connectTimeout: 60000
- metrics.reporter.influxdb.writeTimeout: 60000
- metrics.reporter.influxdb.interval: 30 SECONDS
flink1.10之前
- metrics.reporters: influxdb
- metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter
- metrics.reporter.influxdb.host: localhost
- metrics.reporter.influxdb.port: 8086
- metrics.reporter.influxdb.db: flink_monitor
- metrics.reporter.influxdb.username: flink-metrics
- metrics.reporter.influxdb.password: 123
注意事項:收集flinkSQL任務的監控指標,如果用戶書寫的sql語句 insert into 或者insert overwrite 中單引號帶有換行符,寫入influxdb會報錯
查看influxdb收集到監控信息,發現會自動給我生成數據庫和measurement,所有的指標都存儲在了具體的measurement中
七、flink metric監控程序
前面介紹了flink公共的監控指標以及如何自定義監控指標,那么實際開發flink任務我們需要及時知道這些監控指標的數據,去獲取程序的健康值以及狀態。這時候就需要我們通過 flink REST API ,自己編寫監控程序去獲取這些指標。很簡單,當我們知道每個指標請求的URL,我們便可以編寫程序通過http請求獲取指標的監控數據。
八、flink REST API監控程序
為了獲取flink任務運行狀態和吞吐量我們需要注意一下兩點:
- flink集群模式需要知道 JobManager 的地址和端口(5004)
- 對于 flink on yarn 模式來說,則需要知道 RM 代理的 JobManager UI 地址,例如 http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx
1.獲取flink任務運行狀態(我們可以在瀏覽器進行測試,輸入如下的連接)
http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs
返回的結果
- {
- jobs: [{
- id: "ce793f18efab10127f0626a37ff4b4d4",
- status: "RUNNING"
- }
- ]
- }
2.獲取 job 詳情
需要在/jobs/jobid
http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs/ce793f18efab10127f0626a37ff4b4d4
- {
- jid: "ce793f18efab10127f0626a37ff4b4d4",
- name: "Test",
- isStoppable: false,
- state: "RUNNING",
- start - time: 1551577191874,
- end - time: -1,
- duration: 295120489,
- now: 1551872312363,
- 。。。。。。
- 此處省略n行
- 。。。。。。
- }, {
- id: "cbc357ccb763df2852fee8c4fc7d55f2",
- parallelism: 12,
- operator: "",
- operator_strategy: "",
- description: "Source: Custom Source -> Flat Map",
- optimizer_properties: {}
- }
- ]
- }
- }
九、更靈活的方式獲取每個指標的請求連接
有人可能會問,這么多指標,難道我要把每個指標的請求的URL格式都記住嗎?
今天教大家一個小技巧,一個前端技術,就是進入flink任務的UI界面,按住F12進入開發者模式,然后我們點擊任意一個metric指標,便能立即看到每個指標的請求的URL。比如獲取flink任務的背壓情況:
如下圖我們點擊某一個task的status,按一下f12,便看到了backpressue,點開backpressue就是獲取任務背壓情況的連接如下:
http://127.0.0.1/proxy/application_12423523_133234/jobs/86eb310874aeccb37b58ae2892feced3/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure
請求連接返回的json字符串如下:我們可以獲取每一個分區的背壓情況,如果不是OK狀態便可以進行任務報警,其他的指標獲取監控值都可以這樣獲取 簡單而又便捷。
十、案例:實時獲取yarn上flink任務運行狀態
我們使用 flink REST API的方式,通過http請求實時獲取flink任務狀態,不是RUNNING狀態則進行電話或郵件報警,達到實時監控的效果。
- public class SendGet {
- public static String sendGet(String url) {
- String result = "";
- BufferedReader in = null;
- try {
- String urlNameString = url;
- URL realUrl = new URL(urlNameString);
- // 打開和URL之間的連接
- URLConnection connection = realUrl.openConnection();
- // 設置通用的請求屬性
- connection.setRequestProperty("accept", "*/*");
- connection.setRequestProperty("connection", "Keep-Alive");
- connection.setRequestProperty("user-agent",
- "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
- // 建立實際的連接
- connection.connect();
- in = new BufferedReader(new InputStreamReader(
- connection.getInputStream()));
- String line;
- while ((line = in.readLine()) != null) {
- result += line;
- }
- } catch (Exception e) {
- System.out.println("發送GET請求出現異常!" + e);
- e.printStackTrace();
- }
- // 使用finally塊來關閉輸入流
- finally {
- try {
- if (in != null) {
- in.close();
- }
- } catch (Exception e2) {
- e2.printStackTrace();
- }
- }
- return result;
- }
- public static void main(String[] args) {
- String s = sendGet("http://127.0.0.1:5004/proxy/application_1231435364565_0350/jobs");
- JSONObject jsonObject = JSON.parseObject(s);
- String string = jsonObject.getString("jobs");
- String substring = string.substring(1, string.length() - 1);
- JSONObject jsonObject1 = JSONObject.parseObject(substring);
- String status = jsonObject1.getString("status");
- System.out.println(status);
- }
- }
結果