成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用Kafka和MongoDB進行Go異步處理

開發 后端 其他數據庫 Kafka MongoDB
在我前面的博客文章 “我的第一個 Go 微服務:使用 MongoDB 和 Docker 多階段構建” 中,我創建了一個 Go 微服務示例,它發布一個 REST 式的 http 端點,并將從 HTTP POST 中接收到的數據保存到 MongoDB 數據庫。

[[240575]]

在我前面的博客文章 “我的***個 Go 微服務:使用 MongoDB 和 Docker 多階段構建” 中,我創建了一個 Go 微服務示例,它發布一個 REST 式的 http 端點,并將從 HTTP POST 中接收到的數據保存到 MongoDB 數據庫。

在這個示例中,我將數據的保存和 MongoDB 分離,并創建另一個微服務去處理它。我還添加了 Kafka 為消息層服務,這樣微服務就可以異步處理它自己關心的東西了。

如果你有時間去看,我將這個博客文章的整個過程錄制到 這個視頻中了 :)

下面是這個使用了兩個微服務的簡單的異步處理示例的上層架構圖。

rest-kafka-mongo-microservice-draw-io

rest-kafka-mongo-microservice-draw-io

微服務 1 —— 是一個 REST 式微服務,它從一個 /POST http 調用中接收數據。接收到請求之后,它從 http 請求中檢索數據,并將它保存到 Kafka。保存之后,它通過 /POST 發送相同的數據去響應調用者。

微服務 2 —— 是一個訂閱了 Kafka 中的一個主題的微服務,微服務 1 的數據保存在該主題。一旦消息被微服務消費之后,它接著保存數據到 MongoDB 中。

在你繼續之前,我們需要能夠去運行這些微服務的幾件東西:

  1. 下載 Kafka —— 我使用的版本是 kafka_2.11-1.1.0
  2. 安裝 librdkafka —— 不幸的是,這個庫應該在目標系統中
  3. 安裝 Kafka Go 客戶端
  4. 運行 MongoDB。你可以去看我的 以前的文章 中關于這一塊的內容,那篇文章中我使用了一個 MongoDB docker 鏡像。

我們開始吧!

首先,啟動 Kafka,在你運行 Kafka 服務器之前,你需要運行 Zookeeper。下面是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接著運行 Kafka —— 我使用 9092 端口連接到 Kafka。如果你需要改變端口,只需要在 config/server.properties 中配置即可。如果你像我一樣是個新手,我建議你現在還是使用默認端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起來之后,我們需要 MongoDB。它很簡單,只需要使用這個 docker-compose.yml 即可。

  1. version: '3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. - "27017:27017"
  7. volumes:
  8. - "mongodata:/data/db"
  9. networks:
  10. - network1
  11.  
  12. volumes:
  13. mongodata:
  14.  
  15. networks:
  16. network1:

使用 Docker Compose 去運行 MongoDB docker 容器。

  1. docker-compose up

這里是微服務 1 的相關代碼。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
  2.  
  3. //Retrieve body from http request
  4. b, err := ioutil.ReadAll(r.Body)
  5. defer r.Body.Close()
  6. if err != nil {
  7. panic(err)
  8. }
  9.  
  10. //Save data into Job struct
  11. var _job Job
  12. err = json.Unmarshal(b, &_job)
  13. if err != nil {
  14. http.Error(w, err.Error(), 500)
  15. return
  16. }
  17.  
  18. saveJobToKafka(_job)
  19.  
  20. //Convert job struct into json
  21. jsonString, err := json.Marshal(_job)
  22. if err != nil {
  23. http.Error(w, err.Error(), 500)
  24. return
  25. }
  26.  
  27. //Set content-type http header
  28. w.Header().Set("content-type", "application/json")
  29.  
  30. //Send back data as response
  31. w.Write(jsonString)
  32.  
  33. }
  34.  
  35. func saveJobToKafka(job Job) {
  36.  
  37. fmt.Println("save to kafka")
  38.  
  39. jsonString, err := json.Marshal(job)
  40.  
  41. jobString := string(jsonString)
  42. fmt.Print(jobString)
  43.  
  44. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  45. if err != nil {
  46. panic(err)
  47. }
  48.  
  49. // Produce messages to topic (asynchronously)
  50. topic := "jobs-topic1"
  51. for _, word := range []string{string(jobString)} {
  52. p.Produce(&kafka.Message{
  53. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  54. Value: []byte(word),
  55. }, nil)
  56. }
  57. }

這里是微服務 2 的代碼。在這個代碼中最重要的東西是從 Kafka 中消費數據,保存部分我已經在前面的博客文章中討論過了。這里代碼的重點部分是從 Kafka 中消費數據:

kafka-to-mongo/kafka-mongo-sample.go

  1. func main() {
  2.  
  3. //Create MongoDB session
  4. session := initialiseMongo()
  5. mongoStore.session = session
  6.  
  7. receiveFromKafka()
  8.  
  9. }
  10.  
  11. func receiveFromKafka() {
  12.  
  13. fmt.Println("Start receiving from Kafka")
  14. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  15. "bootstrap.servers": "localhost:9092",
  16. "group.id": "group-id-1",
  17. "auto.offset.reset": "earliest",
  18. })
  19.  
  20. if err != nil {
  21. panic(err)
  22. }
  23.  
  24. c.SubscribeTopics([]string{"jobs-topic1"}, nil)
  25.  
  26. for {
  27. msg, err := c.ReadMessage(-1)
  28.  
  29. if err == nil {
  30. fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
  31. job := string(msg.Value)
  32. saveJobToMongo(job)
  33. } else {
  34. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  35. break
  36. }
  37. }
  38.  
  39. c.Close()
  40.  
  41. }
  42.  
  43. func saveJobToMongo(jobString string) {
  44.  
  45. fmt.Println("Save to MongoDB")
  46. col := mongoStore.session.DB(database).C(collection)
  47.  
  48. //Save data into Job struct
  49. var _job Job
  50. b := []byte(jobString)
  51. err := json.Unmarshal(b, &_job)
  52. if err != nil {
  53. panic(err)
  54. }
  55.  
  56. //Insert job into MongoDB
  57. errMongo := col.Insert(_job)
  58. if errMongo != nil {
  59. panic(errMongo)
  60. }
  61.  
  62. fmt.Printf("Saved to MongoDB : %s", jobString)
  63.  
  64. }

我們來演示一下,運行微服務 1。確保 Kafka 已經運行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服務 1 發送數據。

Screenshot-2018-04-29-22.20.33

Screenshot-2018-04-29-22.20.33

這里是日志,你可以在微服務 1 中看到。當你看到這些的時候,說明已經接收到了來自 Postman 發送的數據,并且已經保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

Screenshot-2018-04-29-22.22.00

因為我們尚未運行微服務 2,數據被微服務 1 只保存在了 Kafka。我們來消費它并通過運行的微服務 2 來將它保存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

現在,你將在微服務 2 上看到消費的數據,并將它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

Screenshot-2018-04-29-22.24.15

檢查一下數據是否保存到了 MongoDB。如果有數據,我們成功了!

Screenshot-2018-04-29-22.26.39

Screenshot-2018-04-29-22.26.39

完整的源代碼可以在這里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice 

責任編輯:龐桂玉 來源: Linux中國
相關推薦

2023-10-11 14:37:21

工具開發

2015-12-11 13:39:56

GoiOSAndroid

2021-06-15 15:03:21

MongoDBNode.jsCRUD

2023-11-08 15:04:55

事務GORM

2023-10-30 23:25:48

FuturesGo語言

2015-06-16 11:06:42

JavaCompletable

2021-04-26 05:33:54

Python異步編程

2024-02-07 11:44:20

NestJSRxJS異步編程

2023-09-27 15:34:48

數據編程

2023-11-06 08:01:09

Go同步異步

2023-10-28 16:22:21

Go接口

2021-11-29 22:59:34

Go Dockertest集成

2023-06-15 13:01:07

JavaPythonJavaScript

2012-04-19 10:04:20

ibmdw

2018-09-11 09:41:19

2019-07-02 14:05:23

Go語言高并發

2022-05-05 08:13:16

Go數組類型

2024-01-15 06:05:05

DockerGol ang應用程序

2024-05-06 13:34:28

WireGoogleGo

2022-08-12 08:38:52

FFmpegLinux命令
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 超碰97av | 欧美精品a∨在线观看不卡 欧美日韩中文字幕在线播放 | 岛国精品 | 欧美日韩免费视频 | 国产精品国产三级国产aⅴ中文 | 日韩精品一区二区三区 | 日本a∨精品中文字幕在线 亚洲91视频 | 精精精精xxxx免费视频 | 国产精品一区二区无线 | 一区二区三区国产精品 | 久久国内精品 | 日日夜夜免费精品视频 | 日韩一区二区三区四区五区 | 天天综合网7799精品 | 91综合网| 伊人伊人网 | 亚洲国产精品网站 | 在线婷婷| 欧美精品在线播放 | 久久精品亚洲一区二区三区浴池 | 精品国产乱码一区二区三区 | 成人av免费 | 日本精品一区二区三区在线观看视频 | 久久午夜视频 | 日韩一区二区在线播放 | 人人九九精 | 国产一区二区不卡 | 九九热精品在线 | 精品欧美一区二区在线观看欧美熟 | 99热精品在线 | 北条麻妃一区二区三区在线视频 | 日韩播放 | 欧美激情一区 | 日韩精品一区二区三区在线播放 | 五月天激情综合网 | 中文字幕日本一区二区 | 天天操天天射天天 | 久久久久亚洲国产| 91精品久久久久久久99 | 国产一区二区三区在线免费 | 九九综合 |