成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go 語言微服務框架 Kratos 集成第三方庫 kafka-go 操作消息隊列 Kafka

開發(fā) 架構
Go 語言微服務框架 Kratos 不限制使用任何第三方庫,Go 語言操作消息隊列 Kafka 有很多優(yōu)秀的第三方庫,比如 sarama 和 kafka-go,我們在之前的文章中介紹過 Go 語言怎么使用 sarama 操作消息隊列 Kafka。

1.介紹

Go 語言微服務框架 Kratos 不限制使用任何第三方庫,Go 語言操作消息隊列 Kafka 有很多優(yōu)秀的第三方庫,比如 sarama 和 kafka-go,我們在之前的文章中介紹過 Go 語言怎么使用 sarama 操作消息隊列 Kafka。

本文我們介紹 Go 微服務框架 Kratos 怎么集成第三方庫 kafka-go[1] 操作消息隊列 Kafka。

2.Kratos 集成第三方庫 kafka-go

我們在本地搭建 Go 運行環(huán)境,并安裝 kratos 工具,使用 kratos 工具創(chuàng)建項目 blog。

在 blog 項目中,集成第三方庫 kafka-go。

創(chuàng)建項目

示例代碼:

kratos new blog

安裝 kafka-go

go get github.com/segmentio/kafka-go

集成 Kafka Producer(生產者)和 Kafka Consumer(消費者)

編寫文件 blog/internal/data/data.go

導入第三方庫:

import (
 "github.com/segmentio/kafka-go"
)

添加 Kafka Producer(生產者)和 Kafka Consumer(消費者):

// Data .
type Data struct {
 // TODO wrapped database client
 dbEngine *xorm.Engine
 kp       *kafkaProducer
 kc       *KafkaConsumer
}

// NewData .
func NewData(c *conf.Data, logger log.Logger, dbEngin *xorm.Engine, kp *kafkaProducer, kc *KafkaConsumer) (*Data, func(), error) {
 cleanup := func() {
  log.NewHelper(logger).Info("closing the data resources")
 }
 return &Data{
  dbEngine: dbEngin,
  kp:       kp,
  kc:       kc,
 }, cleanup, nil
}

Kafka Producer(生產者):

type kafkaProducer struct {
 writer *kafka.Writer
}

func NewKafkaProducer(c *conf.Data) *kafkaProducer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 writer := &kafka.Writer{
  Addr:     kafka.TCP(brokers...),
  Topic:    topic,
  Balancer: &kafka.LeastBytes{},
 }
 return &kafkaProducer{writer: writer}
}

func (p *kafkaProducer) SendMessage(ctx context.Context, key, value []byte) error {
 err := p.writer.WriteMessages(ctx, kafka.Message{
  Key:   key,
  Value: value,
 })
 if err != nil {
  return err
 }
 return nil
}

func (p *kafkaProducer) Close() error {
 return p.writer.Close()
}

Kafka Consumer(消費者):

type KafkaConsumer struct {
 reader *kafka.Reader
}

func NewKafkaConsumer(c *conf.Data) *KafkaConsumer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 groupId := c.Kafka.GroupId
 reader := kafka.NewReader(kafka.ReaderConfig{
  Brokers: brokers,
  Topic:   topic,
  GroupID: groupId,
 })
 return &KafkaConsumer{
  reader: reader,
 }
}

func (c *KafkaConsumer) Start(ctx context.Context) {
 for {
  msg, err := c.reader.ReadMessage(ctx)
  if err != nil {
   return
  }
  log.Debugf("key=%s || value=%s", string(msg.Key), string(msg.Value))
 }
}

func (c *KafkaConsumer) Close() error {
 return c.reader.Close()
}

生產 kafka 消息的方法:

創(chuàng)建文件 blog/internal/data/kafka.go。

示例代碼:

func (u *userRepository) KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error) {
 defer u.data.kp.Close()
 // 設置超時時間
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()
 err = u.data.kp.SendMessage(ctx, key, value)
 if err != nil {
  log.Errorf("KafkaSendMessage() || err=%v", err)
  return
 }
 return
}

閱讀上面這段代碼,我們可以發(fā)現 KafkaSendMessage 方法封裝了生產 kafka 消息的方法 u.data.kp.SendMessage。

需要注意的是,我們需要設置超時時間,否則,會返回錯誤消息 context deadline exceeded。

添加 wire 提供者:

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewGreeterRepo, NewDbEngine, NewUserRepository, NewKafkaProducer, NewKafkaConsumer)

生成 wire 代碼:

cd blog/cmd/blog
wire

3.操作 Kafka

在 Kratos 項目中,一般在項目的 biz 或 service 層使用 Kafka 的生產邏輯;在 service 層使用 Kafka 的消費邏輯。

限于篇幅,我們以 Kafka 的生產邏輯為例,介紹怎么在 biz 層生產 Kafka 消息。

編寫文件 blog/internal/biz/user.go,在 CreateUser 方法中添加生產 Kafka 消息的代碼。

type UserRepository interface {
 Create(ctx context.Context, user *User) (int64, error)
 KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error)
}

func (u *UserUsecase) CreateUser(ctx context.Context, user *User) (id int64, err error) {
 id, err = u.userRepo.Create(ctx, user)
 if err != nil {
  return
 }
 if id > 0 {
  var b []byte
  b, err = json.Marshal(user)
  if err != nil {
   return
  }
  err = u.userRepo.KafkaSendMessage(ctx, []byte(user.Name), b)
  if err != nil {
   return
  }
 }
 return
}

閱讀上面這段代碼,我們可以發(fā)現 UserRepository 接口中的方法 KafkaSendMessage,就是我們在 blog/internal/data/kafka.go 文件中實現的方法。

項目運行和測試:

Kratos 運行:

kratos run

curl 請求示例:

curl -H "Content-Type: application/json" -X POST -d '{"name":"mac", "email":"mac@gmail.com", "password":"123456"}' http://192.168.110.209:8000/user/create

kafka 消費者:

kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
{"Id":10,"Name":"mac","Email":"mac@gmail.com","Password":"123456","Created":1735972949,"Updated":1735972949}

4.總結

本文我們通過示例代碼,介紹 Kratos 微服務框架怎么集成第三方庫 kafka-go,操作 Kafka。

參考資料

[1]kafka-go: https://github.com/segmentio/kafka-go

責任編輯:武曉燕 來源: Golang語言開發(fā)棧
相關推薦

2024-12-30 00:38:23

Go語言微服務

2025-01-13 00:00:07

Go語言微服務

2024-12-23 00:22:55

2025-01-20 00:10:00

Go語言Kratos

2021-10-11 06:38:52

Go開源庫語言

2015-04-27 19:32:16

Moxtra

2019-07-30 11:35:54

AndroidRetrofit

2015-11-05 16:44:37

第三方登陸android源碼

2021-09-13 07:23:53

KafkaGo語言

2021-09-26 10:43:08

注冊Istio集成

2020-06-04 07:48:08

Istio服務注冊API Server

2014-07-22 10:56:45

Android Stu第三方類庫

2021-08-03 10:07:41

鴻蒙HarmonyOS應用

2022-08-15 23:09:53

jsonGo語言

2010-11-08 09:51:34

jQueryJavaScript

2014-07-23 08:55:42

iOSFMDB

2022-01-14 09:57:14

鴻蒙HarmonyOS應用

2011-07-25 14:14:49

iPhone SQLITE Pldatabase

2015-10-22 10:36:09

OracleRimini StreOracle訴訟

2011-05-07 14:20:25

加密方案Transcoder BlackBerry
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品a免费一区久久电影 | 成年人的视频免费观看 | 一区二区三区免费 | 欧美激情精品久久久久久变态 | 午夜网址| 久久综合久久久 | 久久久精 | 国产视频精品在线观看 | 精品免费国产视频 | 天天色天天 | 欧美一区久久 | 一级h片| 国产精品欧美一区二区三区 | 中文字幕 国产精品 | 成人福利电影 | 欧美一区二区免费 | 欧美成人精品一区 | 99国产精品99久久久久久 | 999免费视频 | 一级在线视频 | 久久久精品一区二区三区 | 欧美区精品 | 91av免费看 | 亚洲国产一区二区在线 | 在线色 | 亚洲精品一区二区三区中文字幕 | 日韩精品一区二区三区 | 亚洲中午字幕 | 久久免费精品 | 福利视频三区 | 91毛片在线看 | 免费午夜视频 | 日韩 欧美 综合 | 久久av网 | 亚洲国产精品一区二区三区 | 亚洲欧美中文日韩在线v日本 | www.日韩高清 | 一区二区成人 | 精品综合| 欧美日韩一区二区在线观看 | 欧美精品1区2区3区 免费黄篇 |