成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何使用Python、Apache Kafka和云平臺構(gòu)建健壯的實(shí)時數(shù)據(jù)管道

譯文
開發(fā)
Apache Kafka的綜合指南涵蓋了架構(gòu)、在云中的部署、使用 Python構(gòu)建數(shù)據(jù)管道、PySpark擴(kuò)展以及實(shí)際示例。

譯者 | 李睿

審校 | 重樓

在當(dāng)今競爭激烈的市場環(huán)境中,為了生存和發(fā)展,企業(yè)必須能夠?qū)崟r收集、處理和響應(yīng)數(shù)據(jù)。無論是檢測欺詐、個性化用戶體驗(yàn)還是監(jiān)控系統(tǒng),現(xiàn)在都需要接近即時的數(shù)據(jù)。

然而,構(gòu)建和運(yùn)行任務(wù)關(guān)鍵型實(shí)時數(shù)據(jù)管道具有挑戰(zhàn)性。基礎(chǔ)設(shè)施必須具有容錯性、無限可擴(kuò)展性,并與各種數(shù)據(jù)源和應(yīng)用程序集成。這就是ApacheKafka、Python和云平臺的用武之地。

這個綜合指南中將介紹:

  • 概述Apache Kafka架構(gòu)
  • 在云中運(yùn)行Kafka集群
  • 使用Python構(gòu)建實(shí)時數(shù)據(jù)管道
  • 使用PySpark進(jìn)行擴(kuò)展處理
  • 實(shí)際示例,例如用戶活動跟蹤、物聯(lián)網(wǎng)數(shù)據(jù)管道,并支持聊天分析

這里將包括大量的代碼片段、配置示例和文檔鏈接,以便獲得這些非常有用的技術(shù)的實(shí)踐經(jīng)驗(yàn)。

Apache Kafka架構(gòu)介紹

Apache Kafka是一個分布式、分區(qū)、復(fù)制的提交日志,用于可靠且大規(guī)模地存儲數(shù)據(jù)流。Apache Kafka的核心是提供以下功能:

  • 發(fā)布-訂閱消息:Kafka允許廣播來自生產(chǎn)者的數(shù)據(jù)流,例如頁面瀏覽量、交易、用戶事件等,并支持消費(fèi)者實(shí)時消費(fèi)。
  • 消息存儲:Kafka在消息到達(dá)時將其持久保存在磁盤上,并在指定的時間內(nèi)保留它們。消息通過指示日志中位置的偏移量來存儲和索引。
  • 容錯:數(shù)據(jù)在可配置數(shù)量的服務(wù)器上復(fù)制。如果一臺服務(wù)器宕機(jī),另一臺服務(wù)器可以保證持續(xù)運(yùn)行。
  • 橫向可擴(kuò)展性:Kafka集群可以通過簡單地添加更多的服務(wù)器來彈性擴(kuò)展。這允許無限的存儲和處理能力。

Kafka架構(gòu)由以下主要組件組成:

(1)主題

消息被發(fā)布到名為“主題”的類別中。每個主題都充當(dāng)消息提要或消息隊(duì)列。常見的場景是每個消息類型或數(shù)據(jù)流的一個主題。Kafka主題中的每條消息都有一個唯一的標(biāo)識符,稱為偏移量,它代表了在主題中的位置。一個主題可以分為多個分區(qū),這些分區(qū)是可以存儲在不同代理上的主題片段。分區(qū)允許Kafka通過在多個消費(fèi)者之間分配負(fù)載來擴(kuò)展和并行化數(shù)據(jù)處理。

(2)生產(chǎn)者

生產(chǎn)者是向Kafka主題發(fā)布消息的應(yīng)用程序。它們連接到Kafka集群,序列化數(shù)據(jù)(例如JSON或Avro),分配一個密鑰,并將其發(fā)送到適當(dāng)?shù)闹黝}。

例如,一個Web應(yīng)用程序可以產(chǎn)生點(diǎn)擊流事件,或者一個移動應(yīng)用程序可以產(chǎn)生使用統(tǒng)計(jì)。

(3)消費(fèi)者

消費(fèi)者從Kafka主題中讀取消息并進(jìn)行處理。處理可能涉及解析數(shù)據(jù)、驗(yàn)證、聚合、過濾、存儲到數(shù)據(jù)庫等。

消費(fèi)者連接到Kafka集群,并訂閱一個或多個主題來獲取消息提要,然后根據(jù)用例需求進(jìn)行處理。

(4)代理

這是一個Kafka服務(wù)器,它接收來自生產(chǎn)者的消息,分配偏移量,將消息提交到存儲中,并將數(shù)據(jù)提供給消費(fèi)者。Kafka集群由多個代理組成,以實(shí)現(xiàn)可擴(kuò)展性和容錯性。

(5)ZooKeeper

ZooKeeper處理代理之間的協(xié)調(diào)和共識,例如控制器選舉和主題配置。它維護(hù)Kafka操作所需的集群狀態(tài)和配置信息。

這涵蓋了Kafka的基礎(chǔ)知識。要深入了解,可以參考一些Kafka文檔。

以下了解如何通過在云中運(yùn)行Kafka來簡化管理。

在云中運(yùn)行Kafka

雖然Kafka具有高度可擴(kuò)展性和可靠性,但它的運(yùn)行涉及部署、基礎(chǔ)設(shè)施管理、監(jiān)控、安全、故障處理、升級等方面的大量工作。

值得慶幸的是,Kafka現(xiàn)在是所有主要云計(jì)算提供商提供的完全托管服務(wù):

服務(wù)

描述

定價

AWS MSK

在AWS上完全托管、高可用的Apache Kafka集群。處理基礎(chǔ)設(shè)施,擴(kuò)展,安全,故障處理等。

基于代理的數(shù)量

Google Cloud Pub/Sub

基于Kafka的無服務(wù)器實(shí)時消息服務(wù)。自動擴(kuò)展,至少一次交付保證。

基于使用指標(biāo)

Confluent Cloud

完全管理的事件流平臺,由Apache Kafka提供支持。提供免費(fèi)層。

基于功能的分層定價

Azure Event Hubs

Apache Kafka的高吞吐量事件攝取服務(wù)。與Azure數(shù)據(jù)服務(wù)的集成。

基于吞吐量單位

托管服務(wù)抽象了Kafka操作的復(fù)雜性,可以讓用戶專注數(shù)據(jù)管道。

接下來,將使用Python、Kafka和云平臺構(gòu)建一個實(shí)時管道。也可以參考以下的指南作為另一個示例。

構(gòu)建實(shí)時數(shù)據(jù)管道

Kafka的基本實(shí)時管道有兩個主要組件:向Kafka發(fā)布消息的生產(chǎn)者和訂閱主題并處理消息的消費(fèi)者。

其架構(gòu)遵循以下流程:

為了進(jìn)行簡化,將使用Confluent Kafka Python客戶端庫。

1. Python生產(chǎn)者

生產(chǎn)者應(yīng)用程序從數(shù)據(jù)源收集數(shù)據(jù)并將其發(fā)布到Kafka主題。作為一個例子,假設(shè)有一個Python服務(wù)從一個Web應(yīng)用程序收集用戶點(diǎn)擊流事件。

Web應(yīng)用程序中,當(dāng)用戶的行為像是頁面瀏覽或產(chǎn)品評級時,可以捕獲這些事件并將它們發(fā)送給Kafka。

可以抽象出Web應(yīng)用程序如何收集數(shù)據(jù)的實(shí)現(xiàn)細(xì)節(jié)。

Python 
 from confluent_kafka import Producer
 import json

 # User event data
 event = {
 "timestamp": "2022-01-01T12:22:25", 
 "userid": "user123",
 "page": "/product123", 
 "action": "view"
 }

 # Convert to JSON
 event_json = json.dumps(event)

 # Kafka producer configuration 
 conf = {
 'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
 'client.id': 'clickstream-producer' 
 }

 # Create producer instance
 producer = Producer(conf)

 # Publish event 
 producer.produce(topic='clickstream', value=event_json)

 # Flush and close producer
 producer.flush()
 producer.close()

這將事件發(fā)布到云托管Kafka集群上的clickstream主題。

Confluent_Kafka Python客戶端在將消息發(fā)送到Kafka之前使用內(nèi)部緩沖區(qū)來批處理消息。與單獨(dú)發(fā)送每條消息相比,這提高了效率。

在默認(rèn)情況下,消息會在緩沖區(qū)中累積,直到:

(1)已達(dá)到緩沖區(qū)大小限制(默認(rèn)為32MB)。

(2)調(diào)用flush()方法。

當(dāng)調(diào)用flush()時,緩沖區(qū)中的任何消息都會立即發(fā)送到Kafka代理。

如果不調(diào)用flush(),而是依賴于緩沖區(qū)大小限制,那么在下一次自動刷新之前,如果發(fā)生故障,就有丟失事件的風(fēng)險(xiǎn)。調(diào)用flush()能夠更好地控制最小化潛在的消息丟失。

但是,在每次生產(chǎn)后調(diào)用flush()會帶來額外的開銷。找到合適的緩沖配置取決于特定的可靠性需求和吞吐量需求。

可以在事件發(fā)生時不斷添加事件來構(gòu)建實(shí)時流。這為下游數(shù)據(jù)消費(fèi)者提供了連續(xù)的事件提要。

2.Python消費(fèi)者

接下來,有一個消費(fèi)者應(yīng)用程序來從Kafka攝取事件并處理它們。

例如,可能想要解析事件,篩選特定的子類型,并驗(yàn)證模式。

Python 
 from confluent_kafka import Consumer
 import json

 # Kafka consumer configuration
 conf = {'bootstrap.servers': 'my_kafka_cluster-xyz.cloud.provider.com:9092',
  'group.id': 'clickstream-processor',
 'auto.offset.reset': 'earliest'}

 # Create consumer instance 
 consumer = Consumer(conf)

 # Subscribe to 'clickstream' topic
 consumer.subscribe(['clickstream'])

 # Poll Kafka for messages infinitely 
 while True:
 msg = consumer.poll(1.0)
 if msg is None:
 continue
 
 # Parse JSON from message value
 event = json.loads(msg.value())
 
 # Process event based on business logic
 if event['action'] == 'view':
 print('User viewed product page')
 
 elif event['action'] == 'rating':
 # Validate rating, insert to DB etc
  pass
 
 print(event) # Print event 
 
 # Close consumer
 consumer.close()

這個輪詢clickstream主題以獲取新消息,使用它們,并根據(jù)事件類型采取行動——打印、更新數(shù)據(jù)庫等。

對于一個簡單的管道來說,這很有效。但如果每秒事件數(shù)增加100倍呢?消費(fèi)者將無法跟上其增長。這就是像PySpark這樣的工具可以幫助擴(kuò)展處理的地方。

3.使用PySpark進(jìn)行擴(kuò)展

PySpark為Apache Spark提供了一個Python API,Apache Spark是一個為大規(guī)模數(shù)據(jù)處理優(yōu)化的分布式計(jì)算框架。

使用PySpark,可以利用Spark的內(nèi)存計(jì)算和并行執(zhí)行來更快地使用Kafka流。

首先,將Kafka數(shù)據(jù)加載到DataFrame中,DataFrame可以使用Spark SQL或Python進(jìn)行操作。

Python 
 from pyspark.sql import SparkSession

 # Initialize Spark session
 spark = SparkSession.builder \
 .appName('clickstream-consumer') \
 .getOrCreate()

 # Read stream from Kafka 'clickstream' 
 df = spark.readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
 .option("subscribe", "clickstream") \
 .load()

 # Parse JSON from value
 df = df.selectExpr("CAST(value AS STRING)")
 df = df.select(from_json(col("value"), schema).alias("data"))
 Next, we can express whatever processing logic we need using DataFrame transformations:
 from pyspark.sql.functions import *

 # Filter for 'page view' events 
 views = df.filter(col("data.action") == "view")

 # Count views per page URL 
 counts = views.groupBy(col("data.page"))
 .count()
 .orderBy("count")

 # Print the stream 
 query = counts.writeStream \
 .outputMode("complete") \ 
 .format("console") \
 .start() 
 
 query.awaitTermination()

它利用Spark的分布式運(yùn)行時,在數(shù)據(jù)流上實(shí)時應(yīng)用過濾、聚合和排序等操作。

還可以使用多個消費(fèi)者組并行化消費(fèi),并將輸出接收器寫入數(shù)據(jù)庫、云存儲等。

這允許在Kafka的數(shù)據(jù)上構(gòu)建可擴(kuò)展的流處理。

現(xiàn)在已經(jīng)介紹了端到端管道,以下了解應(yīng)用它的一些實(shí)際例。

實(shí)際用例

以下探索一些實(shí)際用例,在這些用例中,這些技術(shù)可以幫助大規(guī)模地處理大量實(shí)時數(shù)據(jù)。

1.用戶活動跟蹤

許多現(xiàn)代網(wǎng)絡(luò)和移動應(yīng)用程序跟蹤用戶的行為,例如頁面瀏覽量、按鈕點(diǎn)擊、交易等,以收集使用情況分析。

(1)問題

  • 數(shù)據(jù)量可以隨著數(shù)百萬活躍用戶而大規(guī)模擴(kuò)展。
  • 需要實(shí)時洞察以檢測問題并個性化內(nèi)容。
  • 希望為歷史報(bào)表存儲匯總數(shù)據(jù)。

(2)解決方案

  • 使用Python或任何語言將點(diǎn)擊流事件攝取到Kafka主題中。
  • 使用PySpark進(jìn)行清理、聚合和分析。
  • 將輸出保存到數(shù)據(jù)庫,例如Cassandra的儀表板。
  • 使用Spark ML實(shí)時警報(bào)檢測異常。

2.物聯(lián)網(wǎng)數(shù)據(jù)管道

物聯(lián)網(wǎng)傳感器產(chǎn)生大量的實(shí)時遙測數(shù)據(jù),例如溫度、壓力、位置等。

(1)問題

  • 每秒產(chǎn)生數(shù)百萬個傳感器事件。
  • 需要清洗、改造、豐富。
  • 需要實(shí)時監(jiān)控和歷史存儲。

(2)解決方案

  • 使用語言SDK收集Kafka主題中的傳感器數(shù)據(jù)。
  • 使用PySpark進(jìn)行數(shù)據(jù)整理和連接外部數(shù)據(jù)。
  • 將數(shù)據(jù)流輸入機(jī)器學(xué)習(xí)模型進(jìn)行實(shí)時預(yù)測。
  • 將聚合數(shù)據(jù)存儲在時間序列數(shù)據(jù)庫中以實(shí)現(xiàn)可視化。

3.客戶支持聊天分析

像Zendesk這樣的聊天平臺捕獲了大量的客戶支持對話。

(1)問題

  • 每月產(chǎn)生數(shù)百萬條聊天信息。
  • 需要了解客戶痛點(diǎn)和代理表現(xiàn)。
  • 必須發(fā)現(xiàn)負(fù)面情緒和緊急問題。

(2)解決方案

  • 使用連接器將聊天記錄導(dǎo)入Kafka主題。
  • 使用PySpark SQL和DataFrames進(jìn)行聚合和處理。
  • 將數(shù)據(jù)輸入NLP模型,對情緒和意圖進(jìn)行分類。
  • 存儲洞察到數(shù)據(jù)庫的歷史報(bào)告。
  • 為聯(lián)絡(luò)中心操作提供實(shí)時儀表板。

這個例演示了如何將這些技術(shù)應(yīng)用于涉及大量快速移動數(shù)據(jù)的實(shí)際業(yè)務(wù)問題。

結(jié)論

綜上所述, Python、Kafka和云平臺為構(gòu)建健壯的、可擴(kuò)展的實(shí)時數(shù)據(jù)管道提供了一個很好的組合。

原文標(biāo)題:Building Robust Real-Time Data Pipelines With Python, Apache Kafka, and the Cloud,作者:Dmitrii Mitiaev

責(zé)任編輯:華軒 來源: 51CTO
相關(guān)推薦

2023-12-11 08:00:00

架構(gòu)FlinkDruid

2021-07-29 08:00:00

開源數(shù)據(jù)技術(shù)

2022-03-07 07:18:18

Netflix機(jī)器學(xué)習(xí)架構(gòu)

2021-09-13 13:46:29

Apache HudiB 站數(shù)據(jù)湖

2022-08-01 15:58:48

數(shù)據(jù)倉庫架構(gòu)數(shù)據(jù)

2023-10-11 14:37:21

工具開發(fā)

2023-07-20 08:00:00

可視化數(shù)據(jù)Python

2023-12-13 09:00:00

2023-05-25 08:24:46

Kafka大數(shù)據(jù)

2019-08-19 14:24:39

數(shù)據(jù)分析Spark操作

2022-09-22 10:53:38

實(shí)時數(shù)據(jù)ML 模型

2024-07-25 08:12:11

2022-06-28 09:47:05

數(shù)據(jù)倉庫

2017-08-09 13:30:21

大數(shù)據(jù)Apache Kafk實(shí)時處理

2016-11-29 09:27:22

Apache SparDashboard構(gòu)建

2012-08-24 08:51:27

IBMdW

2012-08-28 10:52:58

IBMdW

2023-10-23 10:06:53

數(shù)據(jù)性能

2020-12-01 15:06:46

KafkaFlink數(shù)據(jù)倉庫

2018-08-03 15:28:51

數(shù)據(jù)平臺數(shù)據(jù)倉庫OLTP
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 亚洲精品美女视频 | 玩丰满女领导对白露脸hd | 国产精品国产精品国产专区不片 | 在线视频中文字幕 | 日日操夜夜操天天操 | 亚洲第一成人影院 | 欧美色性| 欧美国产视频 | 亚洲成人午夜电影 | 国产成人免费 | 自拍视频一区二区三区 | 96国产精品久久久久aⅴ四区 | 亚洲中字在线 | 91免费观看 | 96国产精品久久久久aⅴ四区 | 国产重口老太伦 | 欧美a在线看 | 99热.com| 亚洲精品18 | 欧美婷婷 | 亚洲第一av | 9191成人精品久久 | 久草在线青青草 | 国产精品久久久久久久一区二区 | 在线精品一区二区 | 国产在线一区二区三区 | 精精国产xxxx视频在线播放 | 日韩欧美在线免费观看 | 欧美一区二区三区久久精品 | 日本不卡免费新一二三区 | 亚洲精品区 | 欧美亚洲第一区 | 亚洲精品自拍 | 日本亚洲精品成人欧美一区 | 欧美一区视频 | 久久久精品一区二区三区 | 中文字幕在线免费视频 | 国际精品鲁一鲁一区二区小说 | 国产精品入口 | 爱草在线| 国产精品区一区二区三 |