成本低誤差小,攜程基于 Kafka 的 Serverless 延遲隊列的實踐
作者簡介
Pin,關注 RPC、Service Mesh、Serverless 等云原生技術。
一、背景
隨著上云項目的不斷推進,大量的應用需要部署到 aws 上,其中有很多應用都依賴延遲隊列的功能。而在 aws 上,我們選擇以 Kafka 作為消息隊列,但是 Kafka 本身不支持延遲隊列,這就需要思考如何基于 Kafka 來實現延遲隊列。
二、需求
統計了一下所有需要使用到延遲隊列的場景,有以下幾大特點:
- 延遲時間不固定。有的 topic 需要支持 5 分鐘的延遲,有的卻要求支持 7 天的延遲。
- 延遲消息數量小。所有的場景中涉及到的每天延遲消息的數量不超過 1 億條,每條消息的大小不超過 1MB。
- 延遲消息不能丟失,可以不保證有序。
- 延遲誤差小。延遲誤差是指實際消費消息的時間和希望消費消息之間的時間差值。根據統計的業務場景來看,要求延遲誤差在 2s 以內。
- 生產延遲消息的峰值比較高。很多情況下,業務會一次性創建 1000 萬條延遲消息,并且這些延遲消息的延遲時長都是一致的。
三、目標
由于實現延遲隊列的方式有很多,我們在滿足需求的前提下,制定了幾個目標:云上成本低、運維成本低、開發成本低、穩定性高和延遲誤差小。
四、產品選型
在 aws 上支持消息隊列的產品有 RabbitMQ、Apache ActiveMQ 和 SQS。其中 RabbitMQ 和 Apache ActiveMQ aws 主要是托管其安裝部署,并非是以 Serverless 的方式對外提供服務。另外,我們當前已經選擇使用 Kafka 作為消息隊列,若僅僅為了滿足延遲隊列的功能而去更換消息隊列,成本顯然是巨大的。
除此之外,aws 還提供了 SQS 來支持延遲隊列,雖然 SQS 是 Serverless 的,但是 SQS 有他自身的局限性:SQS 最多支持 15 分鐘以內的延遲,明顯無法滿足我們的需求。
可見,僅僅基于云上已有的產品已無法滿足我們的需求,基于這個原因,我們開始調研延時消息的實現方案,看看能否通過少量的開發來實現我們的需求。
五、方案調研
業界實現延時隊列功能的方案比較多,我們對其進行了簡單的分析,具體如下:
5.1 RabbitMQ
RabbitMQ 是基于 TTL+ 死信隊列的方式來實現的。具體來說,通過設置消息的 TTL,當達到 TTL 時消息還沒有被消費,此時會投遞到死信隊列。TTL 分兩種:
- Queue 級別的 TTL:所有消息統一的 TTL
- Message 級別的 TTL:每條消息可以是不同的 TTL,但是存在隊頭阻塞問題
該方案的優點是實現簡單,但是延遲誤差不確定。
5.2 Apache ActiveMQ
Apache ActiveMQ 是基于定時調度的方式來實現的。具體來說,配置延遲時間或者 cron 表達式表示消息的投遞策略,基于 Java 的 Timer 實現,將消息分級存儲在文件和內存中。
該方案的優點是實現簡單,延遲誤差可控,但是可能會占用大量內存。
5.3 RocketMQ
RocketMQ 是基于定時調度+延遲等級的方式來實現的。具體來說,將延時消息發送到指定的延時等級隊列(一共有 18 個等級),然后通過一個定時器進行輪詢這些 ConsumeQueue 實現延時的效果。具體實現如下:
- 修改消息 topic 名稱和隊列信息投遞到對應等級的延時消息的 ConsumeQueue 中
- ScheduleMessageService消費ConsumeQueue中的消息再重新投遞到 CommitLog 中
- 將 CommitLog 中的消息投遞到目標 topic 中,消費者消費目標 topic 中的消息
該方案的優點是延遲誤差可控,但是實現復雜。
5.4 Redis
基于 Redis 實現延遲隊列的方式有很多,在這里簡單描述兩種:
1)定時輪詢
該方案的大致步驟如下:
- 將消息的延時時間戳作為 zset 的 key,消息的 ID 作為 zset 的 value
- 消息 ID 作為 key,消息體序列化成 String 作為 value 存儲在 Redis 中
- 定時輪詢 zset,大于當前時間則投遞到 Redis 的 List 中供消費者消費
2)Key 過期監聽器
每條消息設置一個過期時間,監聽過期事件然后將消息投遞到 target topic。
基于 Redis 實現延時隊列的優點是實現簡單,但是都可能存在丟消息的情況,并且存儲成本高。
六、實現方案
既然使用單一的云上產品不能滿足我們的需求,那就只能考慮通過少量的開發并結合云上產品的特性來實現基于 Kafka 的延遲隊列的功能。具體的實現方案有如下幾種:
6.1 RabbitMQ 或 Apache ActiveMQ
RabbitMQ 或者 Apache ActiveMQ 都是 aws 上支持的產品,從功能層面來看是可以滿足需求。當前的消息隊列是基于 Kafka 實現的,如果再結合 RabbitMQ 或者 Apache ActiveMQ 來實現延遲隊列的功能,主要面臨的問題是:缺少對 RabbitMQ 或者 Apache ActiveMQ 相關的技術儲備,由于 aws 上對 RabbitMQ 或者 Apache ActiveMQ 僅僅只是部署層面的托管,當出現問題時,是需要有研發人員自己去 troubleshooting 的。所以,該方案就不考慮了。
6.2 基于 SQS 的多級隊列
既然 SQS 已經支持 15 分鐘內的延時隊列,那么如果要實現更長時間的延遲隊列是不是可以考慮通過多級延遲隊列來實現?具體實現方案如下:
- 在延遲消息中增加一個字段 times 用來表示當前是第幾輪(借鑒時間輪算法的思路)。
- 如果延遲消息的延遲時間小于 15 分鐘,將延遲消息的 times 設置為 0,直接投遞到 SQS 中。
- 如果延遲消息的延遲時間大于 15 分鐘,計算一下 times 的值(延遲時間/15 分鐘),然后直接投遞到 SQS 中。
- 如果 Consumer 從 SQS 中消費到了一個延遲消息且 times 大于 0,則將 times 的值減去 1,再次投遞到 SQS 中。如此反復,直到 times 為 0。
- 如果 Consumer 從 SQS 中消費到了一個延遲消息且 times 為 0,則表示該消息已經達到了延遲時間,則 Consumer 會直接將該消息投遞到對應的目標 topic。
這種方案雖然能夠實現延遲隊列的功能,且 SQS 本身也是 Serverless 的,維護成本也比較低。
但是我們調研了一下 SQS 的計費標準發現,SQS 主要是根據消息數量來收費的。這樣一來,如果延遲時間越長,消息數量會被放大的越嚴重。而我們實際業務中延遲時間在 15 分鐘以內的沒有,一般是 1 小時到 7 天,所以這種方案不可行。
6.3 基于 SQS 和定時調度策略
使用基于 SQS 的多級隊列的方式最大的問題是云上的成本問題,更具體一點是云上的存儲成本問題。因為該方案將所有的延遲消息都存儲在 SQS 中,這是導致費用增加的最主要原因。既然如此,那我們是不是可以考慮將大于 15 分鐘延遲時間的消息寫入到一個成本低的存儲上,然后在時間延遲時間小于 15 分鐘的時候將其查詢出來投遞到 SQS 中即可。這樣一來,延遲時間的長短不會對 SQS 的費用有影響,僅僅只需要考慮如何選擇一個存儲成本低、讀寫方便的 Serverless 產品作為延遲消息的存儲即可。
基于這一思路,設計了一個基于 SQS 和定時調度策略的實現方案:
具體流程如下:
- 生產者 Producers 生產的正常消息直接投遞到 Kafka 的目標 topic,如果是延遲消息投遞到 Kafka 的一個延遲消息的 Delay Message Topic 中。
- Consumer 消費 Delay Message Topic 中的消息,如果該消息的延遲時間小于 15 分鐘,直接投遞到 SQS(Delay Queue)中。如果消息的延遲時間大于 15 分鐘,直接將消息寫入到 Message Store 中。
- Scheduler 會定時掃描 Message Store 中的消息,如果發現延遲時間小于 15 分鐘,則直接投遞到 SQS(Delay Queue)中,Scheculer 是通過 Event Bridge 來觸發的。
- Emitter 會消費 SQS(Delay Queue)中的消息,并將該消息投遞到目標 topic 中。
整個流程不算復雜,里面涉及到的 aws 服務都是 Serverless 的,但是涉及的服務太多以后 troubleshooting 就會比較復雜。
基于以上問題,我們對該方案的實進行了改進和簡化,具體如下:
具體流程如下:
- 生產者 Producers 生產的正常消息直接投遞到 Kafka 的目標 topic,如果是延遲消息投遞到 Kafka 的一個延遲消息的 Delay Message Topic 中。
- Service 消費 Delay Message Topic 中的消息,如果該消息的延遲時間小于 15 分鐘,直接投遞到SQS(Delay Queue)中。如果消息的延遲時間大于 15 分鐘,直接將消息寫入到 Message Store 中。
- Service 會定時掃描 Message Store 中的消息,如果發現延遲時間小于 15 分鐘,則直接投遞到 SQS(Delay Queue)中。
- Service 會消費 SQS(Delay Queue)中的消息,并將該消息投遞到目標 topic 中。
簡化后的方案將 Consumer、Emitter 和 Scheduler 的邏輯都集中在 Service 這個服務中,Service 服務是集群部署的,這種方案所有的邏輯都在 Service 這個服務中,在 troubleshooting 時相對來說要方便一些。整體實現方案的大方向確定好以后,還需要細化以下幾個問題:
1)消息如何存儲
我們可以看到 Message Store的主要功能是存儲延遲時間大于 15 分鐘的延遲消息, 并供 Scheduler 進行查詢,查詢的時候是根據時間來查詢的。支持 Serverless 方式存儲的服務也比較多,經過調研最后選擇 DynamoDB。
DynamoDB 中的 partition key 是延遲時間,sorted key 選擇 message id,這樣可以保證通過 partition key 和 sorted key 能夠唯一定位到一條消息,不會出現沖突。同時,在查詢的時候只需要根據 partition key 就可以查詢出該時間片段內的所有消息,也不會出現熱點或者 partition 不均勻的問題。
假設 partition key 為 1677400776(是 2023-02-26 16:39:35 的時間戳,精確到秒),則該 partition key 中對應的所有消息都是延遲時間從 2023-02-26 16:39:35 到 2023-02-26 16:39:36 之間的消息。因為每個消息都有唯一的 message id,所以將 sorted key 設置為 message id 就不會導致消息沖突的問題。Scheduler 在查詢的時候只需要傳入需要查詢的時間戳就可以拉取該時間段內所有的消息,如果沒有查詢到,則表示該時間段內沒有延遲消息。
同時,對于 DynamoDB 中的消息也設置了 TTL 用來自動刪除數據的,設置的 TTL 時間比延遲時間大 24 小時,主要是方便 troubleshooting 的。當 DynamoDB 中的延遲消息被投遞到 SQS 以后,會調用 API 去刪除該消息。DynamoDB 中消息的數據結構還包括 topic、消息體等信息。
2)單點問題
單點問題主要是因為對于存儲在 DynomaDB 中大于 15 分鐘的延遲消息進行掃描的時候,接收到掃描通知的 Scheduler 出現了問題,則該時間段的消息沒有被投遞到 SQS中,從而導致消息丟失。現在 Scheduler 的功能都集成在 Service 服務中,而 Service 服務是集群部署,所以 Scheduler 不存在單點的問題。
但是需要解決另外一個問題:如何保證集群中只有一個 Scheduler 掃描 DynamoDB 中的數據,并且當 Scheduler 出現了問題以后,集群中其他 Scheduler 也可以繼續接著執行?
為了解決這個問題:我們使用了 SQS 的 FIFO 隊列。SQS 支持兩種隊列,一種是 Standard 對列,一種是 FIFO 隊列。FIFO 隊列可以嚴格保證消息的有序,同時支持消息的可見性,也就是說在一段時間內該消息只能有一個消費者可見,其他消費者無法訪問。同時,SQS 的 FIFO 隊列還支持去重的功能。基于 SQS 的 FIFI 隊列的這些特性,解決單點問題就比較容易了。具體實現方案如下:
- 在 Service 服務中啟動一個 Timer 定時向 SQS 的 FIFO 隊列投遞通知消息,一分鐘投遞一次。通知消息的消息體是當前時間的時間戳,精度到分鐘。這樣即使有 n 個 Timer 在同一分鐘內向 SQS 的 FIFO 隊列投遞 n 次消息,也只會有一條消息被成功投遞到 SQS 的 FIFO 隊列中,n-1 條消息被 SQS 的 FIFO 隊列的去重功能過濾掉了。
- 投遞到 SQS 的 FIFO 隊列中的可見性設置為 5分鐘(可以配置)。可以保證在 5 分鐘內只有一個 Scheduler 可以消費到通知消息,如果該 Scheduler 出現了故障,后續的其他 Scheduler 也可以接著繼續消費。當 Scheduler 消費到通知消息時,會根據消息內容轉換成時間戳,并在 DynamoDB 中查詢這一時間戳范圍內的所有消息,修改消息的延遲時間,投遞到 SQS 的 Standard 隊列中,最后刪除 SQS 的 FIFO 隊列中的這一條通知消息。
基于上面的方案,能夠很好的解決單點問題。
3)消息丟失問題
因為 Timer 和 Schduler 都在 Service 服務中,都是集群問題,不存在單點問題。并且,SQS 的 FIFO 隊列能夠保證消息嚴格有序,所以不存在消息丟失的問題。唯一可能存在的問題是,因為消息量大積壓導致的消息延遲過長。
4)如何查詢延遲消息
Scheduler 查詢的消息要滿足該消息的延遲時間小于 15 分鐘,所以在接收到通知消息并轉換成對應的時間戳以后,查詢當前時間戳 +14 分鐘(延遲消息不能超過 15 分鐘)的消息即可。
5)如何部署 Service 服務
對于 Service 服務,我們采用了 ECS+Fargate 的方式來部署。整個代碼的部署都是通過 Terraform 腳本來創建 Code Pipeline、DynamoDB、SQS 和 ECS 等資源實現的,所有的資源都是通過代碼來實現的,整個部署方案的設計全部都是基于 gitOps 的思想。
經過多以上方案的綜合評估,最后我們選擇基于 SQS 和定時調度策略的方案來實現延遲消息。
6.4 性能優化
以上方案在實踐的過程中,做了很多優化,大致可以歸納成以下幾點:
1)消息積壓
由于需要處理的延遲消息會因為消費能力不足的情況導致消息積壓的問題。優化這一問題主要從以下幾個方面入手:
- Delay Message Topic 的 partition 設置成 64 個。提高 Kafka 消費的消費能力可以通過增加 consumer 來實現,但是前提是要保證 partition 的數量大于等于 consumer 的數量。
- 降低 Service 的服務配置,增加 Service 服務的副本數。Service 集群消費 Delay Message Topic 中的消息,副本數越多,消費能力越強。
2)DynamoDB 中 WCU 和 RCU
DynamoDB 的費用有很大一部分是通過 WCU 和 RCU 來統計的。WCU 是指單位時間內消息寫入的數量,RCU 是指單位時間內消息讀取的數量。如果單位時間內寫入消息的數量超過了 WCU 的限制會導致消息寫入失敗,同理也會導致讀取消息失敗。
如果將 WCU 和 RCU 都設置成峰值肯定不會導致讀寫失敗的問題,但是會產生巨大的成本浪費。為此,我們將 WCU 和 RCU 設置成動態擴縮容的方式。在擴容期間如果產生失敗,則進行重試。經過相關參數的優化,現在已經可以達到一個最佳現狀。
3)ECS 擴縮容設置
ECS 中最小的運行單元是 task,對于每一個 task 要求擴容要快,縮容要緩慢。task 快速擴容遇到的最大的問題是,拉起 Service 的耗時比較長。對于 Service 服務我們采用 golang 來實現,擴一個 task 能夠基本上可以在 8s 內完成。擴縮容是基于 CPU 的使用峰值來設置的,每次擴容會擴 4 個 task,每次所容會縮 1 個 task。
4)消息平滑處理
由于寫入 Delay Message Topic 中的消息峰值可能會比較大,如果快速消費這些消息,會導致后續對 DynamoDB 的讀寫壓力比較大。因此,在消費 Kafka 的 Delay Message Topic 中的消息時,會將控制每個 Service 消費消息的數量。盡管有多個 Service 會同時消費,但是對于單個 Service 來說,寫入消息的數量較少,對 DynamoDB 來說,每一次的寫入比較平穩,并非一次性寫入大量的數據,從而寫入失敗的概率會小很多。
6.5 實踐效果
目前已經在生產環境穩定運行了 6 個月,各項指標都比較健康,拉取了最近 4 周的數據。
1)延遲消息成功率
如上圖所示,延遲誤差在 2 秒以內的延遲消息成功率基本上是 100%。
2)延遲消息的數量
如果上圖所示,延遲消息在 5 分鐘內的峰值達到 15 萬,也就是峰值每秒處理 500 個延遲消息。
3)DynamoDB 性能指標
從 PutItem ThrottledRequests 這個指標可以看出,通過 DynamoDB 寫入消息沒有發生寫入失敗的情況。從 QueryThrottledRequests 這個指標可以看出,通過 DynamoDB 查詢消息也沒有發生查詢失敗的情況。從 QueryReturnedItemCount 指標可以看出,延遲消息的峰值是 5 分鐘內 3350 條,每秒低于 60 條。這是因為我們在 Service 中對寫入消息進行了緩沖,從而降低了并發讀寫壓力。
4)Kafka 消息積壓
如上圖所示,Kafka 在 5 分鐘內消息積壓的峰值是 6 萬,積壓的消息都能很快被消費掉。
5)Timer 性能指標
Timer 會每分鐘向 SQS 的 FIFO 隊列中投遞一個消息,消息的數量與 Service 的副本數相同。從上圖可以看出,5 分鐘內最多投遞了 300 個消息(因為 Service 的副本數最大為 64)。但是最后接收的消息是5分鐘內僅僅接收了 5 個消息,也就是 1 分鐘接收 1 條消息。
七、總結
由于該實現方案完全是基于 Serverless 的方式實現的,所以維護成本非常低。盡管開發起來有些復雜,但這是一次性的成本投入。從近幾個月的數據來看,云上的使用成本大約每個月不超過 200 美元,誤差延遲比較小,到目前為止整體運行起來比較穩定。