在 .NET 中實現發件箱模式的實戰
我們在分布式系統中經常面臨保持數據庫和外部系統同步的挑戰。想象一下,將訂單保存在數據庫中,然后將消息發布到消息代理。任何一個操作失敗,系統都將處于不一致的狀態。
發件箱模式通過將消息發布視為數據庫事務的一部分來解決此問題。我們并不直接發布消息,而是將消息保存到數據庫中的發件箱表中,以確保原子操作,然后通過單獨進程可靠的發布消息。
本文將深入探討如何在 .NET 中實現這種模式。
為什么需要發件箱模式?
事務性發件箱模式修復了分布式系統中的一個常見問題。當我們需要同時做兩件事時,就會出現這個問題:保存數據并與外部組件通信。
考慮這樣的場景:發送訂單確認郵件,通知其他系統有關新客戶注冊的信息,或在下訂單后更新庫存水平。每一種操作都涉及本地數據變更以及外部數據通信或更新。
例如,想象某個微服務需要:
- 在數據庫中保存新訂單
- 告訴其他系統這個新訂單
如果其中某個步驟失敗,系統可能最終處于不一致狀態。也許訂單被保存了,但是沒有其他人知道?;蛘呙總€人都認為有新訂單,但數據庫里卻沒有。
下面是一個沒有發件箱模式的 CreateOrderCommandHandler:
public class CreateOrderCommandHandler(
IOrderRepository orderRepository,
IProductInventoryChecker inventoryChecker,
IUnitOfWork unitOfWork,
IEventBus eventBus) : IRequestHandler<CreateOrderCommand, OrderDto>
{
public async Task<OrderDto> Handle(CreateOrderCommand request, CancellationToken cancellationToken)
{
var order = new Order(request.CustomerId, request.ProductId, request.Quantity, inventoryChecker);
await orderRepository.AddAsync(order);
await unitOfWork.CommitAsync(cancellationToken);
// 數據庫事務已完成
await eventBus.Send(new OrderCreatedIntegrationEvent(order.Id));
returnnew OrderDto { Id = order.Id, Total = order.Total };
}
}
這段代碼有潛在的一致性問題。在提交數據庫事務之后,有兩件事可能出錯:
- 應用程序可能在事務提交之后、事件發送之前崩潰。因此數據庫會創建訂單,但其他系統不會知道。
- 當我們嘗試發送事件時,事件總線可能已關閉或無法訪問。這將導致在沒有通知其他系統的情況下創建訂單。
事務性發件箱模式通過確保將數據庫更新和事件發布視為單個原子操作來幫助解決此問題。
流程圖說明了發件箱模式如何解決一致性挑戰。我們沒有嘗試分別保存數據和發送消息,而是將訂單和發件箱消息保存在單個數據庫事務中。這是一個全部成功或全部失敗的操作,該操作不能以不一致的狀態結束。
單獨的發件箱進程處理實際的消息發送。它持續檢查發件箱表中未發送的消息,并將其發布到消息隊列中。進程在成功發布消息后將消息標記為已發送,從而避免重復發送。
需要注意,發件箱模式提供了至少一次的交付。發件箱消息將至少發送一次,但也可以多次發送,用于重試。這意味著必須實現冪等的消息消費。
發件箱模式實現
首先,創建發件箱表,將在其中存儲消息:
CREATE TABLE outbox_messages (
idUUID PRIMARY KEY,
typeVARCHAR(255) NOTNULL,
content JSONB NOTNULL,
occurred_on_utc TIMESTAMPWITHTIME ZONE NOTNULL,
processed_on_utc TIMESTAMPWITHTIME ZONE NULL,
errorTEXTNULL
);
-- 因為將會經常查詢未處理的消息,因此可以考慮添加索引
-- 它將數據行按照我們查詢需要的正確順序排序。
CREATEINDEXIFNOTEXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc ISNULL;
我用 PostgreSQL 作為示例數據庫。注意 content 列類型為 jsonb。如果將來需要,可以對 JSON 數據進行索引和查詢。
現在,我們創建一個表示發件箱條目的類:
public sealed class OutboxMessage
{
public Guid Id { get; init; }
public string Type { get; init; }
public string Content { get; init; }
public DateTime OccurredOnUtc { get; init; }
public DateTime? ProcessedOnUtc { get; init; }
public string? Error { get; init; }
}
下面將消息添加到發件箱:
public async Task AddToOutbox<T>(T message, NpgsqlDataSource dataSource)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
OccurredOnUtc = DateTime.UtcNow,
Type = typeof(T).FullName, // 通過這種方式實現反序列化
Content = JsonSerializer.Serialize(message)
};
awaitusingvar connection = await dataSource.OpenConnectionAsync();
await connection.ExecuteAsync(
@"""
INSERT INTO outbox_messages (id, occurred_on_utc, type, content)
VALUES (@Id, @OccurredOnUtc, @Type, @Content::jsonb)
""",
outboxMessage);
}
一種優雅的實現方法是使用域事件來表示通知。當域中發生重大事件時,將觸發域事件。在完成事務之前,可以獲取所有事件并存儲為發件箱消息。我們可以通過工作單元或EF Core攔截器執行此操作。
發件箱進程
另一個組件是發件箱進程,可以是物理上獨立的進程,也可以是同一進程中的后臺工作線程。
我用 Quartz 來調度處理發件箱的后臺作業,這是一個健壯的庫,對調度循環作業提供了出色的支持。
我們來實現 OutboxProcessorJob:
[DisallowConcurrentExecution]
public class OutboxProcessorJob(
NpgsqlDataSource dataSource,
IPublishEndpoint publishEndpoint,
Assembly integrationEventsAssembly) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
awaitusingvar connection = await dataSource.OpenConnectionAsync();
awaitusingvar transaction = await connection.BeginTransactionAsync();
// You can make the limit a parameter, to control the batch size.
// We can also select just the id, type, and content columns.
var messages = await connection.QueryAsync<OutboxMessage>(
@"""
SELECT id AS Id, type AS Type, content AS Content
FROM outbox_messages
WHERE processed_on_utc IS NULL
ORDER BY occurred_on_utc LIMIT 100
""",
transaction: transaction);
foreach (var message in messages)
{
try
{
var messageType = integrationEventsAssembly.GetType(message.Type);
var deserializedMessage = JsonSerializer.Deserialize(message.Content, messageType);
// We should introduce retries here to improve reliability.
await publishEndpoint.Publish(deserializedMessage);
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, message.Id },
transaction: transaction);
}
catch (Exception ex)
{
// We can also introduce error logging here.
await connection.ExecuteAsync(
@"""
UPDATE outbox_messages
SET processed_on_utc = @ProcessedOnUtc, error = @Error
WHERE id = @Id
""",
new { ProcessedOnUtc = DateTime.UtcNow, Error = ex.ToString(), message.Id },
transaction: transaction);
}
}
await transaction.CommitAsync();
}
}
這種方法使用輪詢定期從數據庫獲取未處理的消息。因為需要頻繁查詢未處理消息,因此輪詢會增加數據庫負載。
處理發件箱消息的另一種方法是使用事務日志跟蹤,可以通過 Postgres邏輯復制來實現。數據庫把更改從預寫日志(Write-Ahead Log, WAL)流式傳輸到應用程序,然后處理這些消息并發布到消息代理。通過這種方式可以實現基于推送的發件箱處理進程。
權衡利弊
發件箱模式雖然有效,但引入了額外復雜性和數據庫寫入。在高吞吐量系統中,很重要的一點是需要監控性能以確保其不會成為瓶頸。
建議在發件箱處理進程中實現重試機制,以提高可靠性??紤]對瞬態故障使用指數回退,對持久性問題使用斷路器,以防止系統在中斷期間過載。
非常重要的一點是需要實現消息的冪等消費。網絡問題或處理器重啟可能導致多次傳遞同一消息,因此使用者必須安全的處理重復消息。
隨著時間推移,發件箱表可能會顯著增長,從而影響數據庫性能。盡早實現存檔策略是很重要的一點,可以考慮將處理過的消息移動到冷存儲或在一段時間后刪除。
擴展發件箱處理進程
隨著系統增長,可能單個發件箱處理進程無法跟上消息數量的增長,從而導致發生錯誤以及增加處理延遲。
一種直接的方法是增加發件箱處理作業的頻率,考慮每隔幾秒鐘運行一次,可以顯著減少消息處理中的延遲。
另一種有效的策略是在獲取未處理消息時增加批處理大小。通過在每次運行中處理更多消息,可以提高吞吐量。但是,要小心不要使批處理太大,以免導致長時間運行的事務。
對于大容量系統,發件箱的并行處理可能非常有效。實現鎖定機制以聲明消息批次,從而允許多個處理進程同時工作而不發生沖突。可以 SELECT…FOR UPDATE SKIP LOCKED 聲明一批消息。這種方法可以顯著提高處理能力。
總結
發件箱模式是維護分布式系統數據一致性的強大工具。通過將數據庫操作與消息發布分離,發件箱模式可確保系統即使在出現故障時也保持可靠。
記住保持消費者冪等,實現適當的擴容策略,并管理好發件箱表的增長。
雖然增加了一些復雜性,但保證消息傳遞的好處使其成為許多場景中有價值的模式。