成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Flink Metrics監控與 RestApi

安全 應用安全
Flink 的 metrics 是 Flink 公開的一個度量系統,metrics 也可以暴露給外部系統,通過在 Flink 配置文件 conf/flink-conf.yaml 配置即可,Flink原生已經支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。

[[404556]]

本文轉載自微信公眾號「KK架構師」,作者wangkai 。轉載本文請聯系KK架構師公眾號。

 一、Flink metrics簡介

Flink 的 metrics 是 Flink 公開的一個度量系統,metrics 也可以暴露給外部系統,通過在 Flink 配置文件 conf/flink-conf.yaml 配置即可,Flink原生已經支持了很多reporter,如 JMX、InfluxDB、Prometheus 等等。

我們也可以自定義指標通過 metric 收集,實際開發時經常需要查看當前程序的運行狀況,flink 提供了 UI 界面,有比較詳細的統計信息。

但是 UI 界面也有不完善的地方,比如想要獲取 flink 的實時吞吐。本文將詳細介紹如何通過 metric 監控 flink 程序,自定義監控指標以及 metrics 在 flink 的 UI 界面的應用。

二、Metrics在UI頁面上的應用

在 flink 的 UI 的界面上我們點擊任務詳情,然后點擊 Task Metrics 會彈出如下的界面,在 add metic 按鈕上 我們可以添加我需要的監控指標。

注意:如果點擊 Task Metrics 沒有顯示 Add metics 點擊一下任務的 DAG 圖就會顯示出來,當我們點擊了 DAG 圖中某個算子的名字,那么 Add metric 顯示的就是該算子的監控指標,且按照分區顯示,算子名前置的數字就是分區號。

三、各個指標的含義

關于各個指標的含義官網上有詳細介紹:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#availability

四、自定義監控指標

案例:在map算子內計算輸入的總數據,設置 :

  1. DataStream<String> userData = kafkaData.map(new RichMapFunction<String, String>() { 
  2.             Counter mapDataNub; 
  3.             @Override 
  4.             public void open(Configuration parameters) throws Exception { 
  5.                 mapDataNub=  getRuntimeContext() 
  6.                        .getMetricGroup() 
  7.                        .addGroup("flink_test_metric"
  8.                        .counter("mapDataNub"); 
  9.             } 
  10.             @Override 
  11.             public String map(String s)  { 
  12.                 String s1 =""
  13.                 try { 
  14.                     String[] split = s.split(","); 
  15.                     long userID = Long.parseLong(split[0]); 
  16.                     long itemId = Long.parseLong(split[1]); 
  17.                     long categoryId = Long.parseLong(split[2]); 
  18.                     String behavior = split[3]; 
  19.                     long timestamp = Long.parseLong(split[4]); 
  20.                     Map map = new HashMap(); 
  21.                     map.put("userID", userID); 
  22.                     map.put("itemId", itemId); 
  23.                     map.put("categoryId", categoryId); 
  24.                     map.put("behavior", behavior); 
  25.                     map.put("timestamp"timestamp); 
  26.                     s1 = JSON.toJSONString(map); 
  27.                     mapDataNub.inc(); 
  28.                     System.out.println("數據"+map.toString()); 
  29.                 } catch (NumberFormatException e) { 
  30.                     e.printStackTrace(); 
  31.                 } 
  32.                 return  s1; 
  33.             } 

程序啟動之后就可以在任務的ui界面上查看

注意點:

搜索自定義或者查看某個指標需要點擊DAG圖中對應算子的名稱

指標的前綴0,1,2....是指算子的分區數

進行監控時,盡量不要對算子進行重命名,使用默認的名字,這樣一套監控程序可以監控多個flink任務,比如對sink重新命名,如果不同的flink程序對sink的命名不一樣,則一套監控無法監控多個flink程序

  1. .addSink(KafkaSink.getProducer()).name("kafka_sink"); 

五、Flink UI 不顯示算子數據接收和發送的條數

有時候我們Flink任務正常運行,數據也可以打印,而且都保存到數據庫了,但是UI上面卻不顯示數據接收和發送的條數 ,導致無法進行指標監控和查查flink任務運行的具體情況,這是什么原因導致的呢?

原因:是因為默認情況下Flink開啟了operator chain,所以當flink程序所有的算子都在一個chain里面時,也就是在一個DAG(task)里面,所有沒有向下游發送數據,所以顯示都為0。比如下圖的情況所有指標都是0;

解決方案:第一種方法:在flink程序里添加自定義metric

第二種方法:使用startNewChain和disableChainin打斷程序默認的operator chain

第三種方法:修改某個算子的并行度使其和上下游算子并行度不一致

六、Metric Reporter

Metrics可以暴露給外部系統,通過在flink配置文件conf/flink-conf.yaml配置即可,flink原生已經支持了很多reporter,如JMX、InfluxDB、Prometheus等等,同時也支持自定義reporter。

Flink自帶了很多Reporter,包括JMX、InfluxDB、Prometheus等等,接下來介紹下InfluxDB Reporter的使用。

只需在flink配置文件conf/flink-conf.yaml中配置Influxdb相關信息即可,主要包括域名、端口號、用戶密碼等等。

flink1.10之后采用

  1. metrics.reporter.influxdb.factory.class: org.apache.flink.metrics.influxdb.InfluxdbReporterFactory 
  2. metrics.reporter.influxdb.host: localhost 
  3. metrics.reporter.influxdb.port: 8086 
  4. metrics.reporter.influxdb.db: flink 
  5. metrics.reporter.influxdb.consistency: ANY 
  6. metrics.reporter.influxdb.connectTimeout: 60000 
  7. metrics.reporter.influxdb.writeTimeout: 60000 
  8. metrics.reporter.influxdb.interval: 30 SECONDS 

flink1.10之前

  1. metrics.reporters: influxdb 
  2. metrics.reporter.influxdb.class: org.apache.flink.metrics.influxdb.InfluxdbReporter 
  3. metrics.reporter.influxdb.host: localhost 
  4. metrics.reporter.influxdb.port: 8086 
  5. metrics.reporter.influxdb.db: flink_monitor 
  6. metrics.reporter.influxdb.username: flink-metrics 
  7. metrics.reporter.influxdb.password: 123 

注意事項:收集flinkSQL任務的監控指標,如果用戶書寫的sql語句 insert into 或者insert overwrite 中單引號帶有換行符,寫入influxdb會報錯

查看influxdb收集到監控信息,發現會自動給我生成數據庫和measurement,所有的指標都存儲在了具體的measurement中

七、flink metric監控程序

前面介紹了flink公共的監控指標以及如何自定義監控指標,那么實際開發flink任務我們需要及時知道這些監控指標的數據,去獲取程序的健康值以及狀態。這時候就需要我們通過 flink REST API ,自己編寫監控程序去獲取這些指標。很簡單,當我們知道每個指標請求的URL,我們便可以編寫程序通過http請求獲取指標的監控數據。

八、flink REST API監控程序

為了獲取flink任務運行狀態和吞吐量我們需要注意一下兩點:

  • flink集群模式需要知道 JobManager 的地址和端口(5004)
  • 對于 flink on yarn 模式來說,則需要知道 RM 代理的 JobManager UI 地址,例如 http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx

1.獲取flink任務運行狀態(我們可以在瀏覽器進行測試,輸入如下的連接)

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs

返回的結果

  1.  jobs: [{ 
  2.    id: "ce793f18efab10127f0626a37ff4b4d4"
  3.    status: "RUNNING" 
  4.   } 
  5.  ] 

2.獲取 job 詳情

需要在/jobs/jobid

http://yarn-resource-manager-ui/proxy/application_155316436xxxx_xxxx/jobs/ce793f18efab10127f0626a37ff4b4d4

  1.  jid: "ce793f18efab10127f0626a37ff4b4d4"
  2.  name"Test"
  3.  isStoppable: false
  4.  state: "RUNNING"
  5.  start - time: 1551577191874, 
  6.  end - time: -1, 
  7.  duration: 295120489, 
  8.  now: 1551872312363, 
  9.  。。。。。。 
  10.       此處省略n行 
  11.     。。。。。。 
  12.    }, { 
  13.     id: "cbc357ccb763df2852fee8c4fc7d55f2"
  14.     parallelism: 12, 
  15.     operator: ""
  16.     operator_strategy: ""
  17.     description: "Source: Custom Source -&gt; Flat Map"
  18.     optimizer_properties: {} 
  19.    } 
  20.   ] 
  21.  } 

九、更靈活的方式獲取每個指標的請求連接

有人可能會問,這么多指標,難道我要把每個指標的請求的URL格式都記住嗎?

今天教大家一個小技巧,一個前端技術,就是進入flink任務的UI界面,按住F12進入開發者模式,然后我們點擊任意一個metric指標,便能立即看到每個指標的請求的URL。比如獲取flink任務的背壓情況:

如下圖我們點擊某一個task的status,按一下f12,便看到了backpressue,點開backpressue就是獲取任務背壓情況的連接如下:

http://127.0.0.1/proxy/application_12423523_133234/jobs/86eb310874aeccb37b58ae2892feced3/vertices/cbc357ccb763df2852fee8c4fc7d55f2/backpressure

請求連接返回的json字符串如下:我們可以獲取每一個分區的背壓情況,如果不是OK狀態便可以進行任務報警,其他的指標獲取監控值都可以這樣獲取 簡單而又便捷。

 

十、案例:實時獲取yarn上flink任務運行狀態

我們使用 flink REST API的方式,通過http請求實時獲取flink任務狀態,不是RUNNING狀態則進行電話或郵件報警,達到實時監控的效果。

  1. public class SendGet { 
  2.     public static String sendGet(String url) { 
  3.         String result = ""
  4.         BufferedReader in = null
  5.         try { 
  6.             String urlNameString = url; 
  7.             URL realUrl = new URL(urlNameString); 
  8.             // 打開和URL之間的連接 
  9.             URLConnection connection = realUrl.openConnection(); 
  10.             // 設置通用的請求屬性 
  11.             connection.setRequestProperty("accept""*/*"); 
  12.             connection.setRequestProperty("connection""Keep-Alive"); 
  13.             connection.setRequestProperty("user-agent"
  14.                     "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); 
  15.             // 建立實際的連接 
  16.             connection.connect(); 
  17.             in = new BufferedReader(new InputStreamReader( 
  18.                     connection.getInputStream())); 
  19.             String line; 
  20.             while ((line = in.readLine()) != null) { 
  21.                 result += line; 
  22.             } 
  23.         } catch (Exception e) { 
  24.             System.out.println("發送GET請求出現異常!" + e); 
  25.             e.printStackTrace(); 
  26.         } 
  27.         // 使用finally塊來關閉輸入流 
  28.         finally { 
  29.             try { 
  30.                 if (in != null) { 
  31.                     in.close(); 
  32.                 } 
  33.             } catch (Exception e2) { 
  34.                 e2.printStackTrace(); 
  35.             } 
  36.         } 
  37.         return result; 
  38.     } 
  39.  
  40.     public static void main(String[] args) { 
  41.         String s = sendGet("http://127.0.0.1:5004/proxy/application_1231435364565_0350/jobs"); 
  42.         JSONObject jsonObject = JSON.parseObject(s); 
  43.         String string = jsonObject.getString("jobs"); 
  44.         String substring = string.substring(1, string.length() - 1); 
  45.         JSONObject jsonObject1 = JSONObject.parseObject(substring); 
  46.         String status = jsonObject1.getString("status"); 
  47.         System.out.println(status); 
  48.     } 

結果

 

責任編輯:武曉燕 來源: KK架構師
相關推薦

2021-09-11 21:02:24

監控Sentry Web性能

2021-06-03 09:00:00

Kubernetes集群容器

2024-03-13 13:44:43

開發插件開源

2021-09-30 06:35:23

監控性能優化

2022-07-26 07:47:14

架構

2024-01-03 16:29:01

Agent性能優化

2021-09-08 10:47:33

Flink執行流程

2015-04-13 10:13:29

2014-12-04 09:47:59

2022-05-18 07:30:51

OperatorprometheusVM 集群

2010-09-17 10:41:27

SIP協議視頻監控

2013-11-06 10:46:58

OpenStack監控監控系統

2021-04-16 08:20:00

Flink CEP直播監控

2022-08-25 18:23:07

攜程HBase存儲Metrics

2021-04-29 08:27:06

druidundertowMetrics

2012-10-29 10:14:07

APPHadoopSplunk

2022-07-12 16:54:54

字節跳動Flink狀態查詢

2022-06-20 05:52:27

FlinkTTL流查詢

2017-07-07 14:30:27

Flink架構拓撲

2018-05-21 14:57:38

云監控服務監控原因
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产免费一区二区三区网站免费 | 91麻豆产精品久久久久久 | 日韩国产在线 | 亚洲精品久久久久久一区二区 | 蜜桃在线视频 | 久久99精品国产 | 日日夜夜天天 | 亚洲综合婷婷 | 中文字幕免费在线 | 天天爽天天操 | 中文二区| 97精品一区二区 | 欧美久久不卡 | 99精品欧美一区二区蜜桃免费 | 亚洲成人免费视频在线观看 | 熟女毛片 | 国产精品永久免费观看 | 免费久草 | 密桃av| 亚洲国产网址 | 国产香蕉视频在线播放 | 天天夜夜操 | 亚洲国产成人精品久久 | 国产免费黄网 | 欧美激情一区 | 女女百合av大片一区二区三区九县 | 国产a视频| 日韩毛片在线观看 | 亚洲精品电影在线观看 | 成人亚洲精品久久久久软件 | 激情五月综合 | 激情欧美一区二区三区中文字幕 | 自拍偷拍亚洲欧美 | 亚洲一区二区三区在线视频 | 亚洲国产精品一区在线观看 | 国产成人网 | 久久99精品国产 | 国产精品久久久久久久久久三级 | 久久久九九九九 | 欧美精品一区二区三区四区五区 | 国产精品视频在线观看 |