Apache Kafka:大數(shù)據(jù)的實(shí)時(shí)處理時(shí)代
在過去幾年,對(duì)于 Apache Kafka 的使用范疇已經(jīng)遠(yuǎn)不僅是分布式的消息系統(tǒng):我們可以將每一次用戶點(diǎn)擊,每一個(gè)數(shù)據(jù)庫更改,每一條日志的生成,都轉(zhuǎn)化成實(shí)時(shí)的結(jié)構(gòu)化數(shù)據(jù)流,更早的存儲(chǔ)和分析它們,并從中獲得價(jià)值。同時(shí),越來越多的企業(yè)應(yīng)用也開始從批處理數(shù)據(jù)平臺(tái)向?qū)崟r(shí)的流數(shù)據(jù)數(shù)據(jù)平臺(tái)轉(zhuǎn)移。本演講將介紹最近 Apache Kafka 添加的一些系統(tǒng)架構(gòu),包括 Kafka Connect 和 Kafka Streams,并且描述一些如何使用它們的實(shí)際應(yīng)用體驗(yàn)。
注:本文由王國璋在 QCon 北京 2017 站上的演講整理而成。
流處理
在流處理剛被提出來的時(shí)候,很多人認(rèn)為流處理只能進(jìn)行做近似的結(jié)果或者增量的計(jì)算,倘若你想保證其安全性,以 Lamda 架構(gòu)為基礎(chǔ),利用流處理得到最現(xiàn)在的結(jié)果。但同時(shí)你需要采用 batch processing 等其他方式來保證其全局的安全性以正確性。
在如此多年的研究結(jié)果下,在我看來,流處理并不一定是近似的,或者是僅僅以無法保證真確性為代價(jià)而提高速度的一種數(shù)據(jù)處理方式。相反,流處理應(yīng)該是一個(gè)與全局計(jì)算、batch processing 稍微有點(diǎn)不同的計(jì)算模型。跟批量處理不同之處在于,批量處理將數(shù)據(jù)引向計(jì)算,而流處理將計(jì)算引向數(shù)據(jù)。這句話大概有點(diǎn)模糊,接下來,我舉幾個(gè)大家熟悉的計(jì)算模型例子。
***個(gè)計(jì)算模型例子—請(qǐng)求應(yīng)答模型。
請(qǐng)求應(yīng)答模型是業(yè)務(wù)生活中最常用的模型例子。首先提交一個(gè)請(qǐng)求到服務(wù)方,而服務(wù)方可能是一個(gè)數(shù)據(jù)庫、也可能是別的存儲(chǔ)工具;然后進(jìn)行等待…等待;***得到一個(gè)回答。這便是一次請(qǐng)求、一次計(jì)算、一次回答。該模型非常簡單、也極易操作,當(dāng)你需要延展到多個(gè)機(jī)器上時(shí),只要簡單地增加客戶端以及處理器即可成功。但是缺點(diǎn)在于,不能達(dá)到大的吞吐量,每提交一次請(qǐng)求,都需要等待時(shí)間來獲得最終應(yīng)答的結(jié)果。
第二種常見的模型就是批量處理如上圖所示。如果請(qǐng)求應(yīng)答模型在譜系的一端,那么 typo 的另一端則認(rèn)為是批量處理。當(dāng)我積累數(shù)據(jù)數(shù)量足夠多的時(shí)候,一次性提交任務(wù)到數(shù)據(jù)倉庫,再進(jìn)行等待,等待時(shí)間短則幾秒鐘、幾分鐘,長則幾小時(shí),***才得到最終的結(jié)果—所有輸入對(duì)應(yīng)的所有輸出。該批處理模型的好處在于能夠提高其吞吐率,一次的請(qǐng)求和應(yīng)答可以得出較多結(jié)果。但它的缺點(diǎn)是具有高延時(shí)性,比如某數(shù)據(jù)產(chǎn)生時(shí)間為上午 6 點(diǎn)鐘,用戶點(diǎn)擊某網(wǎng)頁,由于批處理模型,每 12 小時(shí)才會(huì)運(yùn)行一次,那么它必須等到上午 6 點(diǎn)到下午 6 點(diǎn)的所有數(shù)據(jù)完整以后才會(huì)進(jìn)行工作,那么運(yùn)行結(jié)果可能是用戶點(diǎn)擊的 12 個(gè)小時(shí)之后。高延遲性是批處理自身帶有的特性。
那么什么是流處理呢? 在我看來,流處理就是介于請(qǐng)求應(yīng)答和批處理之間的一種新型計(jì)算模型或者編程模型。流處理并不等待數(shù)據(jù)的完整性,或者說數(shù)據(jù)本沒有完整性這一講法,數(shù)據(jù)本身就是一個(gè)數(shù)據(jù)流,當(dāng)每個(gè)數(shù)據(jù)流每產(chǎn)生一個(gè)新數(shù)據(jù)的時(shí)候立刻被計(jì)算出、進(jìn)行返回,因此數(shù)據(jù)是源源不斷地通向計(jì)算,并且源源不斷有結(jié)果被輸出。你可以設(shè)想,與等待數(shù)據(jù)完全完成之后發(fā)布到計(jì)算上相比,流處理就是將計(jì)算移到你數(shù)據(jù)發(fā)生地進(jìn)行實(shí)時(shí)計(jì)算的方式。
為什么很多人之前有這樣一種錯(cuò)覺,他們認(rèn)為流處理可能存在有丟包的情況、或者說只可以得到近似的結(jié)果,其實(shí)這是早期的一些數(shù)據(jù)流處理系統(tǒng)所自帶的一些限制。因此以 Lamda 架構(gòu)為基礎(chǔ),在流處理上需要討論不同維度的取舍。接下里我將舉三個(gè)例子,延遲、、成本和正確性。正如很多人之前提及的,在進(jìn)行流處理時(shí)候,其大多數(shù)情況需要用時(shí)間來換取正確性,或者用更多的成本換取時(shí)間等等。
***個(gè)例子,說如果你需要做一個(gè)實(shí)時(shí)的 ETL 處理。而關(guān)于 ETL 處理不需要太小的延遲,為達(dá)到低成本的一種保證,我們可以忍受幾分鐘或者 1 分鐘的延遲;但是,如果你正在進(jìn)行一個(gè)實(shí)時(shí)的在線監(jiān)測(cè),存在著幾毫秒的延遲,那么這時(shí)候可能更愿意選擇花大量的金錢,或者采取一些可能不必要的 possibility 來達(dá)到一種低延遲的效果;第二個(gè)例子,假設(shè)你在做一個(gè)在線付費(fèi)協(xié)議,它也是一個(gè)流處理平臺(tái)。由于在線付費(fèi)協(xié)議可能關(guān)乎到其機(jī)構(gòu),或者其公司的利益所在,因此你會(huì)說,我需要保證***的正確性,我不希望有任何丟包情況;
第三個(gè)例子,如果你是做一個(gè)實(shí)時(shí)的日志處理,實(shí)時(shí)收集所有日志,并將其導(dǎo)入 root,在這種情況下,你可能會(huì)說,為了降低成本,我愿意付出一小部分正確性的代價(jià),即使不能達(dá)到 100%、達(dá)到 99.99%、達(dá)到 99.9%,這樣的結(jié)果都可以接受。這本是用戶在定義不同流處理應(yīng)用或者業(yè)務(wù)的時(shí)候應(yīng)該可以自己做出的選擇。但比較遺憾的是,多數(shù)早期的流處理平臺(tái)其實(shí)并沒有給予用戶該種選擇,他們自身的設(shè)計(jì)理念,那就是為了低延遲直接放棄掉正確性,或者說為了更高的吞吐量直接放棄低延遲。
以上是我想分享的關(guān)于流處理的一些誤會(huì)認(rèn)知,如果我的分享能夠讓大家?guī)ё邇蓚€(gè)答案的話,我希望這就是一個(gè)。我認(rèn)為流處理僅僅是一種不一樣的計(jì)算模型或者編程模型,它將計(jì)算帶到數(shù)據(jù)上,而不是將數(shù)據(jù)引用到計(jì)算上,并且在流處理的時(shí)候,用戶往往需要在正確性、延遲性、成本等不同的維度上做出選擇。
Kafka 的角色
為什么當(dāng)我們說到流處理的時(shí)候,很多人都在說 Kafka。大多數(shù)人在最早接觸 Kafka 時(shí)會(huì)說,Kafka 就是一個(gè)分布式發(fā)布訂閱的消息系統(tǒng),但是如果我們?nèi)ビ^察 Kafka 的最初一些設(shè)計(jì)特性可發(fā)現(xiàn)以下幾點(diǎn)內(nèi)容。***點(diǎn),它可以作為一個(gè)寫在磁盤上的緩存來使用,或者說,并不是僅基于內(nèi)存來存儲(chǔ)流數(shù)據(jù),它可以保證數(shù)據(jù)包不被及時(shí)消費(fèi)時(shí),依然可用且不被丟失;第二點(diǎn),由于位移的存在提供了邏輯上的順序,在同一個(gè)話題上,***個(gè)數(shù)據(jù)比第二個(gè)數(shù)據(jù)***被發(fā)布的時(shí)候,也可保證在消費(fèi)時(shí)也是永遠(yuǎn)***個(gè)數(shù)據(jù)比第二個(gè)數(shù)據(jù)先被消費(fèi);第三點(diǎn),因?yàn)? Kafka 是一個(gè)公有的大數(shù)據(jù)中轉(zhuǎn)站,就是說,所有的數(shù)據(jù)只要在 Kafka 上,永遠(yuǎn)可以在 Kafka 周圍進(jìn)行業(yè)務(wù)的開發(fā)或者認(rèn)知事物的開發(fā)。接下來我將花費(fèi)一些時(shí)間詳細(xì)介紹這三點(diǎn)之間的關(guān)系。
Kafka 不僅僅是一個(gè)訂閱消息系統(tǒng),同時(shí)也是一個(gè)大規(guī)模的流數(shù)據(jù)平臺(tái),那么它提供了什么呢?***,提供訂閱和發(fā)布消息;第二,提供一個(gè)緩存的流數(shù)據(jù)存儲(chǔ)平臺(tái);第三,提供流數(shù)據(jù)的處理平臺(tái)。今天,我將著重討論流式計(jì)算在 Kafka 上面的應(yīng)用。
流式計(jì)算在 Kafka 上的應(yīng)用主要有哪些選項(xiàng)呢?***個(gè)選項(xiàng)就是 DIY,Kafka 提供了兩個(gè)客戶端 —— 一個(gè)簡單的發(fā)布者和一個(gè)簡單的消費(fèi)者,我們可以使用這兩個(gè)客戶端進(jìn)行簡單的流處理操作。舉個(gè)簡單的例子,利用消息消費(fèi)者來實(shí)時(shí)消費(fèi)數(shù)據(jù),每當(dāng)?shù)玫叫碌南M(fèi)數(shù)據(jù)時(shí),可做一些計(jì)算的結(jié)果,再通過數(shù)據(jù)發(fā)布者發(fā)布到 Kafka 上,或者將它存儲(chǔ)到第三方存儲(chǔ)系統(tǒng)中。DIY 的流處理需要成本。打個(gè)比方,考慮數(shù)據(jù)的延遲性,考慮不同時(shí)間上的管理分配,正如很多人提到的 processing time,這將是我后文會(huì)重點(diǎn)提及的概念。以上這些都說明,利用 DIY 做流處理任務(wù)、或者做流處理業(yè)務(wù)的應(yīng)用都不是非常簡單的一件事情。
第二個(gè)選項(xiàng)是進(jìn)行開源、閉源的流處理平臺(tái)。比如,spark。關(guān)于流處理平臺(tái)的一個(gè)公有認(rèn)知的表示是,如果你想進(jìn)行流處理操作,首先拿出一個(gè)集群,且該集群包含所有必需內(nèi)容,比如,如果你要用 spark,那么必須用 spark 的 runtime。因?yàn)樗麄儎澏四阕鳛橐粋€(gè)流處理平臺(tái)使用者需要用到的所有行為,比如,資源管理系統(tǒng)、參數(shù)調(diào)配系統(tǒng)、容器配置、代碼封裝、分發(fā)等,以上行為都已被該平臺(tái)所限定。一旦你選擇使用甲就必須用甲套餐裝備,如果選擇使用乙就必須使用乙套餐裝備。有人不禁提出疑問,我能不能既選擇流處理平臺(tái),又使用自己選擇的,我能不能這樣做呢?
這個(gè)應(yīng)用場(chǎng)景其實(shí)很普遍,舉個(gè)例子,可異步式微服務(wù)處理。什么叫異步式微服務(wù)處理?假設(shè) Kafka 作為一個(gè)緩存數(shù)據(jù),在該緩存區(qū)含有很多不同的業(yè)務(wù)。打個(gè)比方,一個(gè)網(wǎng)店的機(jī)構(gòu)可以有不同的組、不同的員工,有人負(fù)責(zé)銷售、有人負(fù)責(zé)商品分發(fā),有人負(fù)責(zé)價(jià)格管理、有人負(fù)責(zé)在線實(shí)時(shí)的限流監(jiān)控,不同的組、不同的員工可能會(huì)以不同的時(shí)間,或者以不同的代碼來更新他們的產(chǎn)品,只要擁有一個(gè)異步式緩存機(jī)制,即 Kafka,便可擴(kuò)大該微服務(wù),而不需要他們的任何一個(gè)組之間進(jìn)行同步請(qǐng)求應(yīng)答機(jī)制。
在該微服務(wù)情況下,每個(gè)小組的喜好、特性并不一致,有的組表示我需要做流處理平臺(tái),從 Kafka 讀數(shù)據(jù),處理完再寫回 Kafka,并且想要使用 EWS 把我的應(yīng)用部署在云端大規(guī)模集群上;而另外小組表示我不需要那么復(fù)雜,我只是小規(guī)模數(shù)據(jù),不希望起一個(gè)集群,只需起三個(gè)機(jī)器,并且每個(gè)機(jī)器有 1GB 內(nèi)存足以,可進(jìn)行手動(dòng)控制操作,不需要資源管理器。那么我們能不能同時(shí)滿足他們不同的需求呢? 答案就是我接下來要說的第三種選項(xiàng)。
第三種選項(xiàng)是使用一個(gè)輕量級(jí)流處理的庫,而不需要使用一個(gè)廣泛、復(fù)雜的框架或者平臺(tái)來滿足他們不同的需求。在 Kafka 0.10 當(dāng)中已發(fā)布輕量級(jí)流處理內(nèi)容平臺(tái),我們可以設(shè)想,跟其他客戶端發(fā)布者和消費(fèi)者一樣,它也是一個(gè)客戶端,不同之處在于它是一個(gè)計(jì)算者客戶端,一個(gè)好用的、功能強(qiáng)大的客戶端,并且支持 state processing、Windows 延時(shí)的、異步的、甚至不同數(shù)據(jù)的調(diào)控。 最重要的是 Kafka 作為一個(gè)庫,可以采用多種方法來發(fā)布流處理平臺(tái)的使用。比如,你可以構(gòu)建一個(gè)集群;你可以把它作為一個(gè)手提電腦來使用;甚至還可以在黑莓上運(yùn)行 Kafka。以上都是尤其簡單的運(yùn)行庫的概念。
因此我們要做的事情與使用 Kafka 其他的客戶端類似,比如發(fā)布者、消費(fèi)者,只要在代碼里邊加入就可以使用各種各樣的 API。當(dāng)你要調(diào)配控制 Kafka Stream 應(yīng)用的時(shí)候,選擇最基礎(chǔ)的 War File 來運(yùn)行或者采用 Java、C,甚至資源管理器來運(yùn)行都是可行的。因?yàn)?Kafka Stream 是一個(gè)輕量級(jí)流處理的庫,可支持各種各樣的運(yùn)維方式。
在我們看來,簡單的就是美的,只有給用戶提供***的兼容性與***的延展性,用戶才能得到***的用戶體驗(yàn)。
Kafka Stream 的編程語言
如果接觸過 Storm、Spark 等流處理平臺(tái)的同學(xué)可以發(fā)現(xiàn),它們與 Kafka Stream 高階位 DSL 語言其實(shí)有相似之處。如上圖所示,首先定義一個(gè) Streams 流, Streams 是從 topic1 中的 topic 獲取得到,即定義 Streams、處理 Streams、得到新的 Streams。比如,從 topic1 里面得到兩個(gè)原始數(shù)據(jù)流,然后數(shù)據(jù)流進(jìn)行 countByKey 得到新的數(shù)據(jù)流叫做 Counts。那么 counts.to(“topic2”) 是什么意思呢?在獲取到新的數(shù)據(jù)流之后寫回 Kafka topic2 內(nèi),啟動(dòng) KafkaStreams 進(jìn)程,與 Kafka producer、Kafka consumer 類似,讓它來運(yùn)行已定義計(jì)算。
正如大家所了解的,API 的使用其實(shí)很簡單。提供一個(gè)簡單的 API,用戶簡單地寫入運(yùn)行邏輯即可運(yùn)行。但是編程應(yīng)用總是容易的,而它的復(fù)雜程度在于,一旦你開始運(yùn)維該應(yīng)用,當(dāng)你想要把業(yè)務(wù)拓展到更大規(guī)模,或者業(yè)務(wù)出現(xiàn)變化,或者集群不穩(wěn)定,需要強(qiáng)大的運(yùn)維時(shí),運(yùn)維的程度便顯得異常重要,最上面的編程可能只是冰山一角。Kafka Stream 的設(shè)計(jì)理念是最簡單的就是最美的,包括 API、運(yùn)維、debugging,以及各種各樣的方式,都是希望給用戶帶來最簡單的體驗(yàn)。它的核心思想就是把難問題直接給 Kafka 集群本身。
Kafka 的介紹
Kafka 的核心思想是什么?就是把這些消息全部存成一個(gè)有序日志,所有的消息發(fā)布者把消息發(fā)布到底端,從某一個(gè)邏輯上的位移開始順序讀取所有的消息。它的一個(gè)好處在于所有的讀和寫,盡管都是刷到磁盤上,但都是按照順序進(jìn)行,該方式對(duì)磁盤的使用比較有效,倘若消費(fèi)者和發(fā)布者隔得比較近,將利用 page cash 直接讀數(shù)據(jù)。
延展性。如上圖,提供 topic 以及 topic partitions,即話題與話題分區(qū)的機(jī)制。每個(gè)用戶有不同的 topic,每個(gè) topic 可以有多個(gè)分區(qū),每個(gè)分區(qū)可被裝載在不同的機(jī)器上,當(dāng)用戶提高規(guī)模之后,Kafka 只需要簡單地增加機(jī)器和 topic partitions 數(shù)量,或者采用 ROM balance 的方式到不同機(jī)器上,即可達(dá)到線性延展方式。
以上是 Kafka 最簡單的核心思想,接下來我將介紹 Kafka Streams 作為 Kafka 客戶端如何利用以上核心思想來設(shè)計(jì)流處理的平臺(tái)。數(shù)據(jù)流其實(shí)就是有序的記錄或消息,每個(gè)消息是一個(gè) Key 加一個(gè) Value,并且 record 與 Kafka 自身 massage 具有一一對(duì)應(yīng)關(guān)系。
用戶所提供的業(yè)務(wù)上的計(jì)算模型,其實(shí)可用拓補(bǔ)結(jié)構(gòu)進(jìn)行表達(dá)。如上圖,圖的左邊。用戶首先進(jìn)行定義數(shù)據(jù)流,然后對(duì)數(shù)據(jù)流進(jìn)行計(jì)算,得到新的數(shù)據(jù)流,最終將數(shù)據(jù)流寫回到 Kafka 內(nèi)。每當(dāng)用戶進(jìn)行定義的時(shí)候,每一步都會(huì)變成拓?fù)浣Y(jié)構(gòu)里面的一個(gè)點(diǎn),每個(gè)點(diǎn)通過流進(jìn)行計(jì)算,變成新的流來進(jìn)行新的連接,最終在 Kafka 內(nèi)部形成拓?fù)浣Y(jié)構(gòu)。用戶并不需要在意該拓補(bǔ)結(jié)構(gòu),只需明白定義流、計(jì)算流、得到新的流,寫回 Kafka。
連接每一個(gè)不同的運(yùn)算單元就是一個(gè) Stream,即 record stream,每一個(gè) Stream 都在源源不斷地實(shí)時(shí)產(chǎn)生 record,每一個(gè) record 是一個(gè) key 加一個(gè) value。利用 Stream Processor 連接 Stream,每個(gè)用戶定義的流的一個(gè)計(jì)算單位對(duì)應(yīng)著一個(gè) Stream Processor。
當(dāng)用戶定義每一步計(jì)算的時(shí)候,就是定義每個(gè)拓?fù)浣Y(jié)構(gòu)里面的每個(gè)點(diǎn),最終把整個(gè)拓補(bǔ)結(jié)構(gòu)定義完整到 Kafka Stream 來運(yùn)行。計(jì)算單元其實(shí)可分成兩個(gè)特殊的單元,一個(gè)叫做元的計(jì)算單元,只有輸出流,沒有輸入流,它們唯一的認(rèn)同就是從 Kafka 讀取數(shù)據(jù)形成數(shù)據(jù)流,傳遞給下方其他數(shù)據(jù)處理。而 Stream Processor 底端的數(shù)據(jù)流,沒有輸出流,只有輸入流,它們的功能是把所有輸入流寫回到 Kafka。Kafka 的運(yùn)行操作簡單,源數(shù)據(jù)從 Kafka log 讀取消息變成數(shù)據(jù)流,每個(gè)消息貫穿整個(gè)拓?fù)浣Y(jié)構(gòu),最終從 Stream Processor 寫回到 Kafka。以上為 Kafka Stream 運(yùn)行情況。
用戶進(jìn)行并行發(fā)布進(jìn)程、應(yīng)用或者多個(gè)計(jì)算的操作其實(shí)也非常簡單。Kafka 是一個(gè)庫,當(dāng)你用 Kafka 庫寫成應(yīng)用,當(dāng) record 寫入多臺(tái)機(jī)器時(shí),Kafka Stream 庫本身就會(huì)自動(dòng)調(diào)動(dòng) partitions 方式,假設(shè)你有兩臺(tái)機(jī)器,每臺(tái)機(jī)器上都運(yùn)行了 Kafka Streams,當(dāng)它同時(shí)進(jìn)行運(yùn)行時(shí),不同的 streams application instance 就會(huì)從不同的 Kafka partitions 內(nèi)讀取數(shù)據(jù)來達(dá)到并行任務(wù)的分發(fā)與執(zhí)行,任務(wù)之間沒有任何的數(shù)據(jù)重疊,當(dāng)你需要更多線性地增長任務(wù)時(shí),你只需要在不同的機(jī)器上運(yùn)行同樣的 record,所有的 instance 將會(huì)自動(dòng)進(jìn)行 rebalance,把新的 application 寫入,然后獲取到延展。
很多人看到不同的計(jì)算方式的時(shí)候會(huì)發(fā)現(xiàn),有的計(jì)算方式,比如說 fliter、map,沒有“計(jì)算狀態(tài)”需要保存,一個(gè)數(shù)據(jù)進(jìn)來計(jì)算、一個(gè)數(shù)據(jù)出去。但是有的計(jì)算,比如說 join、aggregate,就需要?jiǎng)討B(tài)維護(hù)一個(gè)“計(jì)算狀態(tài)”,每一次新的信息或者日志進(jìn)來的時(shí)候, Stream 就要進(jìn)行更新甚至進(jìn)行讀取。后者被稱為 Stateful Processing,前者為 Stateless Processing。
那么如何進(jìn)行管理流處理的 states 呢?有兩個(gè)通用的方式,一個(gè)方式是 remote State,利用遠(yuǎn)程的數(shù)據(jù)庫或者遠(yuǎn)程的 key value store 存儲(chǔ)所有流處理的 states,每一次計(jì)算的時(shí)候,發(fā)送一個(gè)遠(yuǎn)程請(qǐng)求來讀取 states。遠(yuǎn)程請(qǐng)求的缺點(diǎn)在于需要進(jìn)行遠(yuǎn)程的請(qǐng)求和應(yīng)答。因?yàn)? states 存在于 Remove State 上,states 之間可能會(huì)有 overlation,不能很好做到 accesstion. 比如我是團(tuán)隊(duì) A,只負(fù)責(zé) sell,另外一個(gè)是團(tuán)隊(duì) B,只負(fù)責(zé) ajustment, 兩個(gè)不同的流有著不同的 job,但是 state 存在一起,所以兩者會(huì)相互影響;
另外一個(gè)方式是 Local State,意味著所有的 state 和所有的處理單元是并發(fā)在一起的,每個(gè)單元上存著 state。在 Kafka Stream 里面,每個(gè)計(jì)算單元之間不需要有任何交互,state 之間亦如此。我們只要把 state 存到 Local 計(jì)算單元上就足矣。***,可以保證 better isolation,它們之間沒有任何的 access;第二,local state 可以做到更好的時(shí)效性,不需要遠(yuǎn)程讀寫。
如上圖,在 Kafka 內(nèi)有 aggregateByKey(…)語句,類似于 Stateful Processing。當(dāng)用戶定義 Stateful Processing 的時(shí)候,在 Kafka Stream 庫內(nèi)部就會(huì)自動(dòng)生成 State Strom,且與 aggregate opprate 進(jìn)行連接,只有該 opprate 能夠?qū)υ?State Strom 進(jìn)行讀寫,因?yàn)槊總€(gè) opprate 有自己獨(dú)有的 State Strom,可達(dá)到 State Strom 完全 Local 化。
當(dāng)我們有多個(gè)并發(fā)流處理任務(wù)的時(shí)候,每個(gè)計(jì)算單元除了有一個(gè)自己的拓?fù)浣Y(jié)構(gòu)進(jìn)行計(jì)算之外,也有一份 State Store。每個(gè) State Strom 之間是存儲(chǔ)完全不相干的流處理信息和數(shù)據(jù)。
接下來討論的是 Kafka Streams 里面另一個(gè)重要概念,流與數(shù)據(jù)庫表的關(guān)系?正如大家所看見的,在 Kafka Streams 內(nèi)部有兩種流—— KStream 與 Ktable,那么什么叫做 KStream?什么叫做 Ktable 呢?在開發(fā) Kafka Streams 時(shí)的一個(gè)核心出發(fā)點(diǎn)是流和它所對(duì)應(yīng)的表或者數(shù)據(jù)庫的 State 彼此之間具有一一影射關(guān)系。為什么一一影射呢?
舉個(gè)例子,假設(shè)你有一個(gè)上圖的數(shù)據(jù)流,該數(shù)據(jù)流代表著某張表,即變量的日志或者更新日志。更新日志內(nèi)含有 Key 和 Valve,比如第三條的更新日志(key1,value3)其實(shí)正在更新第 1 日志(key1,value1)的新信息,換句話說,原本 key1 所對(duì)應(yīng)的是 value1,但是在這一時(shí)刻被改成對(duì)應(yīng) value3,如果我們重復(fù)更新該日志,我們能夠得到什么呢?我們可以得到該表在任意時(shí)間段內(nèi)的一個(gè)實(shí)時(shí)的可視化圖。
同理,如果我們只有這樣一個(gè)表,并且正在不斷更新這個(gè)表,只要在每次更新時(shí)保留該日志,就能夠從表反推回該更新日志的數(shù)據(jù)流所應(yīng)的所有內(nèi)容,這就是流和表或者流和狀態(tài)之間的一一對(duì)應(yīng)關(guān)系。總而言之,只要你有一個(gè)日志更新流,即可重構(gòu)回你表狀態(tài)在任意時(shí)間內(nèi)的 value;如果你有一個(gè)表,也可以通過表的更新來找到該表所對(duì)應(yīng)的流。這就是我所說的 A Stream is a changelog of a table ;A table is a materialized view at tiome of a stream. 流和表具有對(duì)應(yīng)關(guān)系。
這促使我們定義兩種不同的——KStream 和 KTable。KStream 是很普通的數(shù)據(jù)流,在數(shù)據(jù)流之間不存在任何因果關(guān)系和邏輯關(guān)系,可以被認(rèn)為是 append only Stream。Typo 是更新日志流,每個(gè)日志里面相同的 key 所對(duì)應(yīng)的就是對(duì)表的更新。那么為什么要定義這兩種不同的數(shù)據(jù)流呢?我舉個(gè)例子。
如上圖,用戶購買歷史記錄。比如 Alice 曾經(jīng)買過雞蛋和牛奶,雞蛋和牛奶這兩者之間不存在任何因果關(guān)系,Alice 買過牛奶只是在 Alice 買過雞蛋上很簡單的增量。用戶雇傭狀態(tài)的更新日志,比如 Alice 曾經(jīng)在 LinkedIn 工作,之后信息被更新到 Alice 在微軟工作,現(xiàn)在 Alice 在微軟工作覆蓋了之前的工作信息。
如果以當(dāng)前的時(shí)間狀態(tài)進(jìn)行解讀這兩個(gè)流,***個(gè)流顯示的信息為 Alice 曾經(jīng)買過雞蛋,第二個(gè)流信息顯示為 Alice 在 LinkedIn 工作。如果將時(shí)間往前推,查看更新的數(shù)據(jù)流信息可以發(fā)現(xiàn),***個(gè) KStream 顯示 Alice 買了雞蛋又買了牛奶;但是在第二種情況下,Alice 并不是同時(shí)在 LinkedIn 和微軟工作,而是 Alice 已經(jīng)在微軟工作,不在 LinkedIn 工作了。
為什么兩種不同的流有兩種定義呢?因?yàn)楫?dāng)你做相同操作的時(shí)候,比方說簡單做一個(gè)合計(jì)操作,不同的流得出的結(jié)果是不一樣的。在上者,如果我們將時(shí)間往前推,可得出 Alice 的合計(jì)結(jié)果是 2+3;但是在下面,如果對(duì)其進(jìn)行 KTable 的 aggregate,顯示 Alice 的結(jié)果是將其原本數(shù)值 2 變成 3,而不是 +3 的關(guān)系。
在 Kafka Stream 的 DSL 里面有多種不同的 aggregate,reduce 操作等, 不同的數(shù)據(jù)流可能將 KStream 變成 KTable,也可能把 KTable 變回 KStream,在用戶定義如下不同的 operation 的時(shí)候,在后臺(tái)不同狀態(tài)的流可采用不同計(jì)算方式、計(jì)算模型。
如上圖,KTable。當(dāng)一條新消息進(jìn)來時(shí)該如何進(jìn)行拓?fù)溆?jì)算呢?舉個(gè)例子,在該拓?fù)浣Y(jié)構(gòu)內(nèi),Stream2 出現(xiàn)了一個(gè)新的 record,即紅顏色標(biāo)記,該標(biāo)記與***條 record 顏色相近,因?yàn)樗鼈兪峭瑐€(gè) key,不同 value。Stream2 和 Stream1 進(jìn)行 join 操作成為一個(gè)新的 record,該新 record 會(huì)被放入到 KStream joined 里面,然后 KStream joined 進(jìn)行 aggregate 操作,而 aggregate 操作得到的結(jié)果是 state 被更新,新 record 被 append 到 aggregate 流內(nèi),但是 append 操作將之前的紅顏色 record 復(fù)寫了,換句話說,因?yàn)橛辛嗽撔?record 的存在,之前紅顏色的 record 由于被復(fù)寫已經(jīng)不重要了。
Kafka Stream 運(yùn)維
如果我們有一個(gè) fault,那么我們?nèi)绾卧?Kafka Stream 上做 fault tolerance?
正如上文所提及的,Tables 和 Stream 之間存在一一影射關(guān)系,Kafka Stream 有效地利用了該特性。舉個(gè)例子,有個(gè) Kafka Stream 的應(yīng)用業(yè)務(wù),該業(yè)務(wù)有三個(gè)并發(fā) task,每個(gè) task 有自己的 local state,每當(dāng) State 進(jìn)行更新時(shí),Kafka Stream 就會(huì)自動(dòng)將更新消息寫到更新日志內(nèi),更新日志也自動(dòng)生成。每更新一個(gè)狀態(tài)時(shí),消息日志就被更新該日志上。
比如過了一段時(shí)間,中間的 task 壞掉了,那么 Kafka Stream 會(huì)做什么呢?首先它會(huì)檢測(cè)異常,自動(dòng)地在已有的 instance 上重新啟動(dòng)原本壞掉的 task,重新構(gòu)建 State,那么 State 怎么 build 呢?通過更新 changelog,直到 restore 整個(gè)原本正在進(jìn)行的狀態(tài)的 restoration,只有新狀態(tài)被 restore 完整之后才能繼續(xù) task 同步計(jì)算。
消息回溯也是類似的原理。比方說,某應(yīng)用已被運(yùn)行了很多年,發(fā)現(xiàn) stream 流處理計(jì)算里面存在 Bug,我們不得不將已計(jì)算的結(jié)果舍棄,回溯到一個(gè)更早的歷史時(shí)間重新進(jìn)行計(jì)算,即計(jì)算回溯。Reprocessing 在 Kafka Stream 也是一種簡單的方式,當(dāng)我們達(dá)到某一個(gè)位移,比如位移 5,需要進(jìn)行消息回溯時(shí),用戶可以簡單地起一個(gè)新的狀態(tài) -New State,該 State 完全沒有任何內(nèi)容,然后從最早的時(shí)間開始重新進(jìn)行計(jì)算,直到計(jì)算到趕上現(xiàn)有 task 時(shí)候。只需要 switch over 就可以完成消息回溯,且該整個(gè)消息回溯過程不需要關(guān)閉整個(gè)流處理任務(wù)。于是很多人便問,那么 Kafka Stream 能不能支持 Streaming processing 呢?
舉個(gè)例子,我不希望 Kafka Stream 一直在運(yùn)行,希望它可以每 6 個(gè)小時(shí) run 一次,并且每 run 一次可將當(dāng)前所有已累計(jì)的 Kafka massage 全部處理掉。這個(gè)操作也很簡單,從 outsite A 開始,一直位移到 B 結(jié)束或者到 C 結(jié)束,表示已停止整個(gè)應(yīng)用;6 個(gè)小時(shí)之后當(dāng)它重啟的時(shí)候,再從新的位移開始進(jìn)行下一段的位移,這是批處理計(jì)算結(jié)果,即從一個(gè) outsite 到另外一個(gè) outsite,緊接著是另外一個(gè) outsite…Kafka Stream 通過位移的控制和管理進(jìn)行批處理結(jié)果,而不需要運(yùn)行整個(gè) Kafka Stream。
時(shí)間的管理
時(shí)間管理是流處理上非常重要的觀念,同時(shí)也是區(qū)別于流處理和批量式處理非常重要的概念。很多人都已熟悉 Event Time 和 Processing Time 的區(qū)別,Event Time 是每個(gè)日志、消息、狀態(tài)發(fā)生的時(shí)候所發(fā)生的時(shí)間,而 Processing Time 是日志被計(jì)算和處理的時(shí)候所發(fā)生的時(shí)間。這兩者可能并不是完全融合的,可能存在位移,這便是所謂的時(shí)間延遲。
如上圖,以《星球大戰(zhàn)》故事時(shí)間和拍攝時(shí)間為例。《星球大戰(zhàn)》有七步曲,Processing Time 是電影真正拍攝時(shí)間,是在現(xiàn)實(shí)生活中的時(shí)間——1999 年到 2015 年;但是拍攝時(shí)間和星球大戰(zhàn)所發(fā)生時(shí)間并不一一對(duì)應(yīng),存在延遲。對(duì)其做流處理時(shí)候可以發(fā)現(xiàn),類似 out of order 的現(xiàn)象很常見,比如因?yàn)閿?shù)據(jù)量太大而導(dǎo)致數(shù)據(jù)發(fā)生延遲,或者說數(shù)據(jù)處理發(fā)生了延遲等,都會(huì)發(fā)生延時(shí)情況。
那么 Kafka Stream 怎么解決該問題呢? Kafka Stream 允許給每個(gè)日志定義時(shí)間戳,該時(shí)間戳可以是當(dāng)前系統(tǒng)時(shí)間,也可以是提取時(shí)間戳,也可以從當(dāng)前 record 被生成的時(shí)候所提取的時(shí)間戳,這些即被定義成 Event Time。類似的,如果 record 是一個(gè) Jason format,將其時(shí)間戳提取出來也可被定義成 Event Time。
有如此時(shí)間戳,我們可以基于該時(shí)間戳進(jìn)行各式計(jì)算,比方說 Windowing 的計(jì)算。舉個(gè)例子,每隔 5 分鐘計(jì)算一個(gè)平均值、總和或者合計(jì),每一個(gè) Windowing 正如上圖顏色所示,不同顏色代表不同的時(shí)間戳和不同的 Windowing。當(dāng)你收到一個(gè) record,而該 record 時(shí)間戳指向非常未來的時(shí)間,你便得到一個(gè)非常未來的日志。Kafka 不會(huì)直接更新當(dāng)前的 Windowing,而是會(huì)生成該時(shí)間戳所對(duì)應(yīng)的 Windowing 更新 aggregate。
同理,倘若你繼續(xù)計(jì)算,你會(huì)發(fā)現(xiàn)有個(gè)古老日志的時(shí)間戳指向很早以前的 Windowing。Kafka Stream 可以通過更新原本的 aggregate 來達(dá)到這樣延時(shí)結(jié)果。用戶在現(xiàn)在時(shí)間進(jìn)行如下定義,比方說定義 Window aggregation,每一個(gè) Windowing 是 5 分鐘,但是我希望每個(gè) Windowing 可保持整整一天時(shí)間,只要該 Windowing 在當(dāng)前 24 小時(shí)之內(nèi)依然存在即可做到。
寫在***
上文分享了較多內(nèi)容,從 ordering 到狀態(tài)、一直到 partitioning & scalability ,但其實(shí)最重要的是所有的這些都是由 Kafka Stream 庫自動(dòng)完成的。我們希望用戶不要受到以上任何問題的影響,只需定義自己的業(yè)務(wù),所有如上的問題都由 Kafka Stream 解決,盡管它只是一個(gè)庫,但依然有足夠強(qiáng)大的能力去處理所有事物。
我們?cè)?Kafka 0.10 里面公布 Kafka Stream 之后,把 Streams 延展到 Java 以外的語言,比如支持 python,或者像 SQL 一樣的更高階編程模型來讓用戶更方便地定義自己的流處理應(yīng)用。在 7 月份的 release 里面,我們也會(huì)增加正好一次(exactly-once)計(jì)算方式的 aggregate。
很多人可能會(huì)好奇,Kafka Stream 很好,可是我的數(shù)據(jù)原本不在 Kafka 內(nèi),而 Kafka Stream 只能從 Kafka 內(nèi)部獲取,如何將數(shù)據(jù)導(dǎo)入 Kafka 呢? 答案是 Kafka Connect,一個(gè)簡單的數(shù)據(jù)導(dǎo)入導(dǎo)出框架。 時(shí)至去年年底,Kafka Connect 已經(jīng)有 40 個(gè)不同規(guī)模的 Connect,包括從 JDBC 到 HDFS、一直到 MYSQL,以及所有可以想到的第三方系統(tǒng),用戶可以簡單地把數(shù)據(jù)從第三方系統(tǒng)導(dǎo)入和導(dǎo)出 Kafka。
總之,回到本源,Kafka 到底是什么? Kafka 是一個(gè)中央式的流處理平臺(tái),他們支持消息的發(fā)布、消費(fèi)、傳輸和存儲(chǔ),以及消息的計(jì)算和消息的處理。
以上是本文分享的全部內(nèi)容。關(guān)注兩個(gè) Take-aways,***個(gè) Take-away,流處理只是不同的計(jì)算模型,它不會(huì)只給你近似的結(jié)果,只能用來做增量的結(jié)果;第二個(gè) Take- away,因?yàn)?Kafka Stream 的存在使得 Stream processing 存在更加簡單。