Apache Kafka與Spark Streaming的兩種整合方法及其優缺點
譯文【51CTO.com快譯】Kafka與Spark Streaming的整合
我們在將Apache Kafka與Spark Streaming整合的實戰過程中,一般可以選用兩種方面來配置Spark Streaming,并接收來自Kafka的數據。***種是利用接收器和Kafka的高級API;而第二種新的方法則并不使用接收器。這兩種方法在性能特征和語義保持上,有著不同的編程模式。
下面讓我們來詳細探究一下這兩種方法。
一、基于接收器的方法
此法運用接收器(Receiver)來接收數據。而接收器是利用Kafka的高級消費者(consumer)API來實現的。此外,接收到的數據會被存儲在Spark的各個執行器(executor)中。然后由Spark Streaming所啟動的作業來處理數據。
但是在出現失敗時,這種方法的默認配置可能會丟失數據。因此,我們必須在Spark Streaming中額外地啟用預寫日志(write-ahead log),以確保數據的零丟失。它將所有接收到的Kafka數據,同步地保存到某個分布式文件系統的預寫日志中,以便在出現失敗時恢復所有的數據。
下面,我們將討論如何在Kafka-Spark Streaming應用中,使用該基于接收器的方法。
1.鏈接
現在,先將您的Kafka Streaming應用與如下的artifact相鏈接,對于Scala和Java類型的應用,我們會用到SBT(Simple Build Tool)和Maven(一種構建工具)的各種項目定義。
- groupId = org.apache.spark
- artifactId = spark-streaming-kafka-0-8_2.11
- version = 2.2.0
而對于Python類型的應用,我們必須在部署自己的應用時,額外添加上述庫、及其各種依賴項。
2.編程
隨后,我們在streaming應用的代碼中,通過導入KafkaUtils,來創建一項DStream輸入:
- import org.apache.spark.streaming.kafka._
- val kafkaStream = KafkaUtils.createStream(streamingContext,
- [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
同樣,通過使用createStream的各種變形方式,我們可以制定出不同的鍵/值類,及其對應的解碼類。
3.部署
通常情況下,對于任何Spark應用而言,您都可以使用spark-submit來發布自己的應用。當然,就具體的Scala、Java和Python應用來說,它們在細節上會略有不同。
其中,由于Python應用缺少SBT和Maven的項目管理,我們可以使用–packages spark-streaming-kafka-0-8_2.11、及其各個依賴項,直接添加到spark-submit處。
- ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...
此外,我們還可以從Maven的存儲庫中下載Maven artifact的spark-streaming-Kafka-0-8-assembly所對應的JAR包,然后使用-jars,將其添加到spark-submit處。
二、直接方法(無接收器)
在基于接收器的方法之后,新的一種無接收器式“直接”方法誕生了。此法提供了更強大的端到端保證。它定期查詢Kafka在每個topic+分區(partition)中的***偏移量,而不再使用接收器去接收數據。同時,它也定義了要在每個批次中處理的不同偏移范圍。特別是在那些處理數據的作業被啟動時,其簡單消費者(consumer)API就會被用于讀取Kafka中預定義的偏移范圍。可見,此過程類似于從某個文件系統中讀取各種文件。
注:針對Scala和Java API,Spark在其1.3版本中引入了此功能;而針對Python API,它在其1.4版本中同樣引入了該功能。
下面,我們將討論如何在Streaming應用中使用該方法,并深入了解更多有關消費者API的鏈接:
1.鏈接
當然,這種方法僅被Scala和Java應用所支持,并且通過如下artifact來鏈接STB和Maven項目。
- groupId = org.apache.spark
- artifactId = spark-streaming-kafka-0-8_2.11
- version = 2.2.0
2.編程
隨后,我們在Streaming應用的代碼中,通過導入KafkaUtils,來創建一項DStream輸入:
- import org.apache.spark.streaming.kafka._
- val directKafkaStream = KafkaUtils.createDirectStream[
- [key class], [value class], [key decoder class], [value decoder class] ](
- streamingContext, [map of Kafka parameters], [set of topics to consume])
我們必須在Kafka的參數中,指定metadata.broker.list或bootstrap.servers,以便它能夠在默認情況下,從各個Kafka分區的***偏移量開始消費。當然,如果您在Kafka的參數中將auto.offset.reset配置為最小,那么它就會從最小的偏移開始消費。
此外,通過使用KafkaUtils.createDirectStream的各種變形方式,我們能夠從任意偏移量開始消費。當然,我們也可以在每一個批次中,按照如下的方式去消費Kafka的偏移量。
- // Hold a reference to the current offset ranges, so downstream can use it
- var offsetRanges = Array.empty[OffsetRange]
- directKafkaStream.transform { rdd =>
- offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- rdd
- }.map {
- ...
- }.foreachRDD { rdd =>
- for (o <- offsetRanges) {
- println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
- }
- ...
- }
如果您想使用基于Zookeeper的Kafka監控工具(https://data-flair.training/blogs/zookeeper-in-kafka/),來顯示Streaming應用的進度,那么您也可以自行將其更新到Zookeeper中。
3.部署
該方面的部署過程與基于接收器的方法類似,此處就不贅述了。
三、直接方法的優點
就Spark Streaming與Kafka整合的角度而言,第二種方法較***種方法有著如下的優點:
1.簡化并行
無需創建與合并多個輸入的Kafka Streams(https://data-flair.training/blogs/kafka-streams/)。但是,Sparking Streaming會創建同樣多的RDD(Resilient Distributed Datasets,彈性分布式數據集)分區,以供多個Kafka分區使用直接的方法進行消費。這些分區也會并行地從Kafka中讀取數據。因此我們可以說:在Kafka和RDD分區之間存在更容易被理解和調整的、一對一的映射關系。
2.效率
為了實現數據的零丟失,***種方法需要將數據存儲在預寫日志中,以供進一步復制數據。此方法的效率實際上是比較低的,因為數據被Kafka和預寫日志實際復制了兩次。而在直接的方法中,由于沒有了接收器,因此不需要預先寫入日志,此問題也就迎刃而解了。只要您擁有足夠多的Kafka數據保留,各種消息就能夠從Kafka中被恢復回來。
3.準確到位的語義
在***種方法中,我們使用Kafka的高級API,在Zookeeper中存儲被消費的偏移量。然而,這種傳統的、從Kafka中消費數據的方式,雖然能夠確保數據的零丟失,但是在某些失敗情況下,數據可能會被小概率地消費兩次。實際上,這種情況源自那些被Spark Streaming可靠地接收到的數據,與Zookeeper跟蹤到的偏移量之間所產生的不一致。因此在第二種方法中,我們不再使用Zookeeper,而只是使用一個簡單的Kafka API。Spark Streaming通過其各個檢查點(checkpoints),來跟蹤不同的偏移量,籍此消除了Spark Streaming和Zookeeper之間的不一致性。
可見,就算出現了失敗的情況,那些記錄也都會被Spark Streaming有效地、準確地一次性接收。它能夠確保我們的輸出操作,即:將數據保存到外部數據存儲庫時,各種保存結果和偏移量的冪等性、和原子事務性,這同時也有助于實現準確到位的語義。
不過,這種方法也有一個缺點:由于它不會在Zookeeper中更新各種偏移量,因此那些基于Zookeeper的Kafka監控工具將無法顯示進度。當然,您也可以自行訪問每個批次中由此方法處理的偏移量,并更新到Zookeeper之中。
結論
通過上述討論,我們學到了Kafka與Spark Streaming整合的全體概念。同時,我們也討論了Kafka-Spark Streaming的兩種不同配置方法:接收器方法和直接方法,以及直接方法的幾項優點。
原文標題:Apache Kafka + Spark Streaming Integration,作者:Rinu Gour
【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】