使用 Go 與 Redis Streams 構建可靠的事件驅動系統
事件驅動架構在現代軟件系統中十分常見,它讓各組件能夠異步通信。傳統實現通常借助 Kafka、Google Pub/Sub 或 Amazon SQS 等消息中間件,但在某些場景下,我們可能想用更輕量又足夠可靠的方案進行學習或滿足定制需求。
本文演示如何基于 Golang + Redis Streams 搭建一個高可靠性的事件驅動系統。
為什么事件驅動系統需要“可靠性”
在很多業務里,丟失事件是不可接受的。以告警系統為例,若漏掉一次關鍵告警,可能導致宕機、數據泄露或交易失敗。因而系統必須滿足:
- 持久化(Durability):事件在被處理前必須保存下來;
- 確認與重試(Ack & Retry):消費失敗不能導致事件丟失;
- 可擴展(Scalability):支持多生產者、多消費者并發處理。
為什么選 Redis Streams 而非 Pub/Sub?
Redis 原生 Pub/Sub 只做即時推送,訂閱者離線時消息直接丟棄;而 Redis Streams 提供:
- 消息持久化;
- Consumer Group,便于水平擴展;
- 消息確認及重放;
- 高效處理大規模事件。
系統架構
- 事件生產者:產生事件并寫入 Redis Stream;
- Redis Streams:中央事件存儲與消息分發;
- 事件消費者:讀取、處理并確認事件。
Golang 實現
(1) 啟動 Redis
redis-server
(2) 安裝 Go 客戶端
go get github.com/redis/go-redis/v9
(3) 事件生產者
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
event := map[string]interface{}{"message": "Critical alert! Server down."}
_, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "alerts",
Values: event,
}).Result()
if err != nil {
log.Fatalf("發布事件失敗: %v", err)
}
fmt.Println("事件發布成功")
}
(4) 事件消費者
package main
import (
"context"
"fmt"
"log"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
for {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"alerts", "$"}, // "$" 表示從最新位置開始
Count: 1,
Block: 0, // 阻塞等待
}).Result()
if err != nil {
log.Fatalf("讀取事件失敗: %v", err)
}
for _, stream := range res {
for _, msg := range stream.Messages {
fmt.Printf("處理事件: %v\n", msg.Values)
}
}
}
}
走向生產的強化點
盡管這只是一個簡單的演示,真正用于生產的版本還應包含以下功能:
- 錯誤處理與重試:在失敗時實現指數退避重試機制;
- 消費者組:將負載分配給多個消費者以實現并行處理;
- 監控與日志:持續追蹤事件處理的各項指標;
- 持久化與備份:啟用磁盤持久化,防止數據丟失并支持備份。
結語
借助 Redis Streams + Golang,我們可以構建一個具備持久化、確認機制和水平擴展能力的輕量事件驅動系統,非常適合學習及小型高可用場景。