如何使用Kafka Connect創建用于處理實時數據的開源數據管道?
譯文【51CTO.com快譯】Kafka Connect是一種特別強大的開源數據流工具;有了它,將Kafka與其他數據技術結合使用非常輕松。作為一種分布式技術,Kafka Connect提供了特別高的可用性和獨立于Kafka集群的彈性擴展。Kafka Connect使用源或sink連接件發送進出Kafka主題的數據,無需代碼即可與多種非Kafka技術實現整合。
圖1
可靠的開源Kafka連接件可供許多流行的數據技術使用,您還有機會編寫自己的連接件。本文介紹了一個真實的實際數據用例,即如何使用Kafka Connect將來自Kafka的實時流數據與Elasticsearch(以啟用索引Kafka記錄的可擴展搜索)和Kibana(以便可視化那些結果)整合起來。
圖2
針對表明Kafka和Kafka Connect優點的一個用例,我受到CDC新冠疫情數據跟蹤器的啟發。基于Kafka的跟蹤器從多個位置、以多種格式并使用多種協議收集實時新冠病毒檢測數據,并將這些事件處理成易于使用的可視化結果。跟蹤器還有必要的數據治理機制,以確保結果快速到達,并值得信任。
我開始尋找一個同樣復雜且引人注目的用例——但理想情況下,不像新冠疫情那樣令人擔憂。最終,我發現了一個有趣的領域:月潮,包括公開可用的流REST API和采用簡單JSON格式的豐富數據。
月潮數據
潮汐遵循太陰日,這是一個24小時50分鐘的周期;在此期間,地球完全自轉到軌道衛星下方的同一點。每個太陰日有月球引力引起的兩個高潮和兩個低潮:
圖3. 來自美國國家海洋和大氣管理局
美國國家海洋和大氣管理局(NOAA)提供了一個REST API,可以從全球潮汐站輕松獲取詳細的傳感器數據。
圖4
比如說,下列REST調用指定了潮汐站ID、數據類型(我選擇了海平面)和數據(平均海平面),并請求一個采用公制單位的最近結果:
https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json
該調用返回JSON結果,含有潮汐站的經緯度、時間和水位值。請注意,您必須記住您調用的是什么,以便了解所返回結果的數據類型、數據和單位!
- {"metadata": {
- "id":"8724580",
- "name":"Key West",
- "lat":"24.5508”,
- "lon":"-81.8081"},
- "data":[{
- "t":"2020-09-24 04:18",
- "v":"0.597",
- "s":"0.005", "f":"1,0,0,0", "q":"p"}]}
啟動數據管道(使用REST源連接件)
要開始創建Kafka Connect流數據管道,我們必須先準備Kafka集群和Kafka Connect集群。
圖5
接下來,我們引入一個REST連接件,比如這個可用的開源連接件。我們會將其部署到AWS S3存儲桶(如果需要,參照這些說明)。 然后我們將要求Kafka Connect集群使用S3存儲桶,對它同步以便在集群中可見,配置連接件,最后讓它運行起來。這種“BYOC”(自帶連接件)方法確保您有無數的方法來尋找滿足特定要求的連接件。
圖6
下列示例演示使用“curl”命令將完全開源的Kafka Connect部署環境配置成可使用REST API。請注意,您需要更改URL、名稱和密碼以匹配您自己的部署:
- curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
- {
- "name": "source_rest_tide_1",
- "config": {
- "key.converter":"org.apache.kafka.connect.storage.StringConverter",
- "value.converter":"org.apache.kafka.connect.storage.StringConverter",
- "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
- "tasks.max": "1",
- "rest.source.poll.interval.ms": "600000",
- "rest.source.method": "GET",
- "rest.source.url": "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json",
- "rest.source.headers": "Content-Type:application/json,Accept:application/json",
- "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
- "rest.source.destination.topics": "tides-topic"
- }
- }
該代碼創建的連接件任務以10分鐘為間隔輪詢REST API,并將結果寫入到“tides-topic”Kafka主題。通過隨機選擇五個潮汐傳感器以這種方式收集數據,潮汐數據現在通過五個配置和五個連接件填充了潮汐主題。
圖7
結束管道(使用Elasticsearch sink連接件)
為了將該潮汐數據放在某個地方,我們將在管道末端引入Elasticsearch集群和Kibana。 我們將配置一個開源Elasticsearch sink連接件,以便向Elasticsearch發送數據。
圖8
以下示例配置使用sink名稱、類、Elasticsearch索引和我們的Kafka主題。如果索引尚未存在,會創建一個有默認映射的索引。
- curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
- {
- "name" : "elastic-sink-tides",
- "config" :
- {
- "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
- "tasks.max" : 3,
- "topics" : "tides",
- "connect.elastic.hosts" : ”ip",
- "connect.elastic.port" : 9201,
- "connect.elastic.kcql" : "INSERT INTO tides-index SELECT * FROM tides-topic",
- "connect.elastic.use.http.username" : ”elasticName",
- "connect.elastic.use.http.password" : ”elasticPassword"
- }
- }'
該管道現在可運作起來。然而,由于默認索引映射,進入到Tides索引的所有潮汐數據是字符串。
圖9
需要自定義映射以準確地繪制我們的時間序列數據。我們將為下面的潮汐索引創建這個自定義映射,使用JSON“t”字段用于自定義日期,“v”作為兩倍數,“name”作為代表聚合的關鍵字。
- curl -u elasticName:elasticPassword ”elasticURL:9201/tides-index" -X PUT -H 'Content-Type: application/json' -d'
- {
- "mappings" : {
- "properties" : {
- "data" : {
- "properties" : {
- "t" : { "type" : "date",
- "format" : "yyyy-MM-dd HH:mm"
- },
- "v" : { "type" : "double" },
- "f" : { "type" : "text" },
- "q" : { "type" : "text" },
- "s" : { "type" : "text" }
- }
- },
- "metadata" : {
- "properties" : {
- "id" : { "type" : "text" },
- "lat" : { "type" : "text" },
- "long" : { "type" : "text" },
- "name" : { "type" : ”keyword" } }}}} }'
每次更改Elasticsearch索引映射時,通常都需要Elasticsearch“重新索引”(刪除索引并重新索引所有數據)。數據既可以從現有的Kafka sink連接件重放,就像我們在這個用例中所做的那樣,也可以使用Elasticsearch重新索引操作來獲取。
使用Kibana可視化數據
為了可視化潮汐數據,我們先用Kibana創建一個索引模式,將“t”配置為時間過濾器字段。然后,我們將創建一個可視化,選擇線圖類型。最后,我們將配置圖設置,以便y軸顯示30分鐘內的平均潮位,x 軸顯示隨時間變化的該數據。
結果是下圖顯示了五個樣本潮汐站的潮汐變化,管道從這些潮汐站收集數據:
圖10
結果
我們可以從可視化中清楚地看到潮汐的周期性,每個太陰日出現兩次高潮。
圖11
更令人驚訝的是,每個全球潮汐站的高潮和低潮之間的間隔不一樣。這不僅受月球的影響,還受太陽、當地地理、天氣和氣候變化的影響。這個示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana幫助演示可視化的優點:它們通常可以揭示原始數據無法揭示的信息!
原文標題:How to Use Kafka Connect to Create an Open Source Data Pipeline for Processing Real-Time Data,作者:Paul Brebner
【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】