事件流與事件溯源
事件流和事件溯源是事件驅動架構中兩個相關但不同的概念。
事件流是持續捕獲和存儲系統中發生的事件的過程。這些事件可以實時處理和分析,也可以存儲以供后續分析。事件流通常用于需要實時處理大量數據的系統,如金融交易系統或社交媒體平臺。
以下是使用流行的Kafka消息系統在Go中進行事件流處理的簡單示例:
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
// 設置Kafka生產者以將事件發送到主題
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
// 發送一些事件到主題
writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("key1"),
Value: []byte("value1"),
},
kafka.Message{
Key: []byte("key2"),
Value: []byte("value2"),
},
)
// 設置Kafka消費者以從主題讀取事件
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
// 從主題讀取事件
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("Received message: key=%s, value=%s\n", string(msg.Key), string(msg.Value))
}
}
而事件溯源是一種構建系統的模式,將應用程序狀態的所有變化存儲為事件序列。這些事件然后可以用于在任何時間點重建應用程序的狀態。事件溯源通常用于需要可審計性、可追溯性或合規性的系統,如金融系統或醫療系統。
以下是在Go中使用內存事件存儲進行事件溯源的簡單示例:
package main
import (
"fmt"
)
type Event struct {
Type string
Data interface{}
}
type EventStore struct {
events []Event
}
func (store *EventStore) Append(event Event) {
store.events = append(store.events, event)
}
func (store *EventStore) GetEvents() []Event {
return store.events
}
type Account struct {
id string
balance int
store *EventStore
}
func NewAccount(id string, store *EventStore) *Account {
return &Account{
id: id,
balance: 0,
store: store,
}
}
func (account *Account) Deposit(amount int) {
event := Event{
Type: "deposit",
Data: amount,
}
account.store.Append(event)
account.balance += amount
}
func (account *Account) Withdraw(amount int) {
if account.balance >= amount {
event := Event{
Type: "withdraw",
Data: amount,
}
account.store.Append(event)
account.balance -= amount
}
}
func (account *Account) GetBalance() int {
return account.balance
}
func main() {
store := &EventStore{}
account := NewAccount("123", store)
account.Deposit(100)
account.Withdraw(50)
account.Deposit(25)
events := store.GetEvents()
for _, event := range events {
switch event.Type {
case "deposit":
amount := event.Data.(int)
fmt.Printf("Deposited %d\n", amount)
case "withdraw":
amount := event.Data.(int)
fmt.Printf("Withdrew %d\n", amount)
}
}
fmt.Printf("Final balance: %d\n", account.GetBalance())
}
事件溯源是通過將每個對聚合的修改記錄為事件并將其追加到連續流中的一種方法。要重建聚合的最終狀態,需要按順序讀取這些事件,然后將其應用于聚合。這與在創建、讀取、更新和刪除(CRUD)系統中執行的即時修改形成對比。在CRUD系統中,對記錄狀態的任何更改都存儲在數據庫中,實質上覆蓋了同
一聚合的先前版本。
一旦價格變化已保存到Products表中,只更新了價格本身,而行的其余部分保持不變。然而,如圖5.1所示,這種方法導致了先前價格和更改背后的上下文的丟失。
為了保留不僅新價格還包括關鍵元數據(如調整原因)的信息,將更改記錄為Events表中的事件。先前的價格在先前事件中保持不變,以便在需要時檢索。
為了實現有效的事件溯源,建議使用提供強大一致性保證并使用樂觀并發控制的事件存儲。在實踐中,這意味著當多個修改同時發生時,只有初始修改才能附加到流中。隨后的修改可能需要重試或可能會失敗。