如何在 .NET 中使用 Kafka
本文轉載自微信公眾號「碼農讀書」,作者 碼農讀書。轉載本文請聯系碼農讀書公眾號。
Kafka 是一個開源的,分布式的,可擴展的,高性能的發布訂閱模式的消息中間件,如果你要構建一個處理海量數據的系統,那么 Kafka 將會是一個非常好的選擇,這篇文章我們將會討論如何基于 Kakfa 構建一個發布訂閱模式的程序。
Kafka 架構
這一節中,先來看看 Kafka 的基礎架構以及相關術語,大體來說 Kafka 由下面幾個組件組成。
- Kafka Cluster 一個或者多個服務器組成的集群
- Producer 一個用于發布消息的組件。
- Consumer 一個用于獲取并處理消息的組件。
- ZooKeeper 一個中心化的協調組件,常用于保存分布式環境下各個節點的配置信息。
在 Kafka 中,數據的基本單元是 message,它是一個 key-value 鍵值對,kafka 會將所有的 message 轉換為 byte[],值得注意的是:生產者 和 消費者 以及 cluster 集群之間都是采用 tcp 協議通訊的,kafka 集群中的每一臺機器都被稱為代理(broker),你可以非常容易的向集群添加機器實現容量的橫向擴展。
下面的圖展示了 kafka 的基礎架構。
kafka 中的 topic 表示 message 的邏輯集合,如果不明白的話,你可以認為 topic 就是 category (分類),category 下自然就是歸類的 message,這些 message 是由 生產者 產生。
kafka server 中會包含一個或者多個 topics,每一個 topics 又可以包含一個或者多個 partitions(分區),partition 被定義為一個有序的消息序列,值得注意的是 partitions 是 kafka 能夠動態擴展的關鍵,換句話說 partition 可以分布在多個 kafka server 上,具體操作流程為:kafka 中的 生產者 將 message 推送到指定的 topic,訂閱該 topic 的 消費者 就可以拿到該消息。
Kafka 和 RabbitMQ 比較
Kafka 和 RabbitMQ 都是非常流行的,開源的 消息中間件,那什么時候應該選擇 Kakfa 而不是 RabbitMQ 呢?主要考慮如下幾點。
- RabbitMQ 是由高性能語言 Erlang 編寫的,它擁有豐富的 路由機制 和強大的 消息確認機制, 同時 RabbitMQ 還提供了一個可視化的 WebUI 界面,可以通過它監視 RabbitMQ 的運行狀態,但如果你有大規模部署的需求,RabbitMQ 就沒有 Kafka 好使了,因為后者的擴容只需要增加 partitions 就可以了。
- RabbitMQ Cluster 會存在經典的 腦裂問題,需要使用單獨的插件支持(federations)。
- Kafka 在性能上遠超 RabbitMQ,單節點的 Kafka 能夠處理 10w/s 條記錄,而 RabbitMQ 大概只能處理 2w/s 條記錄。
構建 生產者 和 消費者
這一節我們來討論如何為 Kafka 構建生產者和消費者,這就需要構建兩個 Console 程序分別充當各自角色,大家可以用 nuget 安裝一下 kafka-net,命令如下:
- Install-Package kafka-net
構建 生產者 Console
- static void Main(string[] args)
- {
- string payload ="Welcome to Kafka!";
- string topic ="IDGTestTopic";
- Message msg = new Message(payload);
- Uri uri = new Uri("http://localhost:9092");
- var options = new KafkaOptions(uri);
- var router = new BrokerRouter(options);
- var client = new Producer(router);
- client.SendMessageAsync(topic, new List<Message> { msg }).Wait();
- Console.ReadLine();
- }
構建 消費者 Console
- static void Main(string[] args)
- {
- string topic ="IDGTestTopic";
- Uri uri = new Uri("http://localhost:9092");
- var options = new KafkaOptions(uri);
- var router = new BrokerRouter(options);
- var consumer = new Consumer(new ConsumerOptions(topic, router));
- foreach (var message in consumer.Consume())
- {
- Console.WriteLine(Encoding.UTF8.GetString(message.Value));
- }
- Console.ReadLine();
- }
最后可以依次將 生產者 和 消費者 程序啟動起來,然后你就會看到 消費者 Console 上顯示:Welcome to Kafka! 。
其實在開源世界中有太多的消息中間件,比如:RabbitMQ, MSMQ, IBM MQ Series 等等,現在的 Kafka 不僅僅是 消息中間件 了,而是用于大數據的 流式處理平臺,Kafka 也常常用于 IOT 程序,日志聚合 和 其他低延遲,強消息保證 等場景,如果你的應用程序需要一個快速并可擴展的消息中間件,kafka 將會是一個非常好的選擇,后續我會分享更多的關于 kafka 的文章。
譯文鏈接:https://www.infoworld.com/article/3215165/how-to-use-apache-kafka-messaging-in-net.html