記一次 Golang 踩坑 RabbitMQ
大家好,我是Z哥。
最近在項目中遇到了一個使用 RabbitMQ 時的問題,這個問題我覺得還是有一定普適性的,和大家分享一下,避免大家后續在同一個問題上犯錯。
消息隊列(MQ)是在軟件開發中很常用的中間件,如果一個程序需要協調另一個程序進行數據的“write”操作,并且不關心“write”的結果,則便會選擇它。它是一個保存消息(數據)的容器,由它來確保消息一定被送達到目標程序。
打個比喻來說,消息隊列就是一個郵差,它負責將信件(消息)從源頭送往目的地,并且根據信件重要性的不同,提供當面簽收確認或者直接投放兩種服務。
RabbitMQ 就是一個典型的消息隊列,以 AMQP 為標準。歷史也比較悠久,大概是從 2007 年研發出來的,用的編程語言Erlang也同樣具有年代感。
需要簡單介紹一下 Erlang 的特點,它對我們理解 RabbitMQ 有很大的幫助。
Erlang 是一種運行于“虛擬機”(類似 JVM)的解釋性語言。是一個結構化,動態類型編程語言,內建并行計算支持。使用 Erlang 編寫出的應用運行時通常由成千上萬個輕量級“進程”(并非傳統意義上的進程)組成,并通過消息傳遞相互通訊。進程間上下文切換對于 Erlang 來說僅僅 只是一兩個環節,比起 C 程序的線程切換要高效得多得多了。
基于百度百科資料進行整理
不管是什么 MQ 中間件,作為消息的生產方和消費方都需要和 MQ 的服務端建立連接進行通訊。
一般這個連接都會使用 TCP 協議,在 RabbitMQ 里也不例外。大多數 RabbitMQ 的 SDK 都會將連接封裝為一個「Connection」對象。
還沒完,大多數的 MQ 中間件還會在「Connection」的基礎上增加一個「Channel」的概念,以通過復用的方式提高 TCP 連接的利用率,因為建立和銷毀 TCP 連接是非常昂貴的開銷。在 RabbitMQ 中的復用 TCP 連接方式是「Non-blocking I/O」的模式。
關于NIO,「Non-blocking I/O」的概念,有感興趣的話可以跳轉去看之前寫的這篇文章。(分布式系統關注點——阻塞與非阻塞有什么區別?)
多說一句,任何方案都不是“銀彈”。當每個 Channel 的流量不是很大時,復用單一的 Connection 可以在產生性能瓶頸的情況下有效地節省 TCP 連接資源。但是 Channel 本身的流量很大時,這時候多個 Channel 復用一個 Connection 就會產生性能瓶頸,進而使整體的流量被限制了。此時就需要開辟多個 Connection,將這些 Channel 均攤到這些 Connection 中,至于哪些 Channel 使用那個 Connection 以及Connection 與 Channel 之間的數量關系是多少,需要根據業務自身的實際情況進行調節。
Channel 在 AMQP 中是一個很重要的概念,大多數操作都是在信道這個層面展開的。比如, channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相關的 API 與 AMQP 緊密相連,比如 channel.basicPublish 對應 AMQP 的 Basic.Publish 命令。
可能你要問了,Channel 是不是也能像 Connection 一樣被復用?這是個好問題,也是我們這次遇到問題的關鍵點。
結論是:可以,但是需要自己保證客戶端對 Channel 訪問的線程安全問題,因為在 Channel 的另一端,在 RabbitMQ 的服務端,每個 Channel 由一個單獨的“進程”所管理,如果由于多線程復用Channel 導致數據幀亂序了,RabbitMQ 的服務端會主動關閉整個 Connection 。
因此,我們這次犯的錯誤就是多線程復用了同一個 Channel 導致的問題。所以,如果你也用到 streadway/amqp 這個庫的話,需要特別注意這點。
不過,不同語言的SDK內部實現不同,我們分別使用 Golang 的 AMQP 庫 streadway/amqp,和 RabbitMQ 官方提供的 C# 版本的庫分別模擬過同樣的場景,前者出現問題,后者卻沒有問題。
受限于時間原因,沒有具體去核實 C# 庫的源碼,主觀猜測是 C# 庫內部多做了一些對于單個 Channel 的線程安全處理。
最后,我整理了三點使用 streadway/amqp 庫的最佳實踐,你可以看看:
01
golang 中使用 streadway/amqp 時,需要保證每一個線程單獨一個 Channel。
streadway/amqp 庫中的獲取一個 Channel 的方法「Connection.channel()」是線程安全的。但是內部有一個 defaultChannelMax 的參數對 Channel 的數量進行了限制,默認是 (2 << 10) - 1,2047。這個需要注意:
02
我們可以通過調用 amqp.DialConfig(url string, config Config) 來調整個限制。
但是,并不是你調整了多少就是多少,還需要和 RabbitMQ 服務端的配置進行 min() 函數的處理,最終為兩者的最小值。
Tips:特別是用云廠商的 MQ 產品,因為階梯收費的原因會對很多性能參數做限制,需要格外關注這點,比如某版本的阿里云 RabbitMQ 實例限制是單個 Connection 最多 64 個 Channel)
03
正如前面對 Erlang 的簡單介紹,Erlang 是一個天然支持多“進程”設計的語言,所以在 RabbitMQ 的服務端設計中,每一個 Queue,每一個 Connection 都是單獨的一個“進程”。因此如果你想盡可能地壓榨 RabbitMQ 性能,可以通過建立更多的 Connection 或者創建更多的 Queue 來實現,當然需要注意到 Connection 的創建和銷毀的性能開銷問題。