選Redis做MQ的人,是腦子里缺根弦兒嗎?
一、前情提示
上一篇文章:??《RocketMQ消息中間件用起來真的可靠嗎?》??,我們分析了ack機(jī)制的底層實(shí)現(xiàn)原理(delivery tag機(jī)制),還有消除處理失敗時(shí)的nack機(jī)制如何觸發(fā)消息重發(fā)。
通過這個(gè),已經(jīng)讓大家進(jìn)一步對消費(fèi)端保證數(shù)據(jù)不丟失的方案的理解更進(jìn)一層了。
這篇文章,我們將會(huì)對ack底層的delivery tag機(jī)制進(jìn)行更加深入的分析,讓大家理解的更加透徹一些。
面試時(shí),如果被問到消息中間件數(shù)據(jù)不丟失問題的時(shí)候,可以更深入到底層,給面試官進(jìn)行分析。
二、unack消息的積壓問題
首先,我們要給大家介紹一下RabbitMQ的prefetch count這個(gè)概念。
大家看過上篇文章之后應(yīng)該都知道了,對每個(gè)channel(其實(shí)對應(yīng)了一個(gè)消費(fèi)者服務(wù)實(shí)例,你大體可以這么來認(rèn)為),RabbitMQ投遞消息的時(shí)候,都是會(huì)帶上本次消息投遞的一個(gè)delivery tag的,唯一標(biāo)識(shí)一次消息投遞。
然后,我們進(jìn)行ack時(shí),也會(huì)帶上這個(gè)delivery tag,基于同一個(gè)channel進(jìn)行ack,ack消息里會(huì)帶上delivery tag讓RabbitMQ知道是對哪一次消息投遞進(jìn)行了ack,此時(shí)就可以對那條消息進(jìn)行刪除了。
大家先來看一張圖,幫助大家回憶一下這個(gè)delivery tag的概念。
所以大家可以考慮一下,對于每個(gè)channel而言(你就認(rèn)為是針對每個(gè)消費(fèi)者服務(wù)實(shí)例吧,比如一個(gè)倉儲(chǔ)服務(wù)實(shí)例),其實(shí)都有一些處于unack狀態(tài)的消息。
比如RabbitMQ正在投遞一條消息到channel,此時(shí)消息肯定是unack狀態(tài)吧?
然后倉儲(chǔ)服務(wù)接收到一條消息以后,要處理這條消息需要耗費(fèi)時(shí)間,此時(shí)消息肯定是unack狀態(tài)吧?
同時(shí),即使你執(zhí)行了ack之后,你要知道這個(gè)ack他默認(rèn)是異步執(zhí)行的,尤其如果你開啟了批量ack的話,更是有一個(gè)延遲時(shí)間才會(huì)ack的,此時(shí)消息也是unack吧?
那么大家考慮一下,RabbitMQ他能夠無限制的不停給你的消費(fèi)者服務(wù)實(shí)例推送消息嗎?
明顯是不能的,如果RabbitMQ給你的消費(fèi)者服務(wù)實(shí)例推送的消息過多過快,比如都有幾千條消息積壓在某個(gè)消費(fèi)者服務(wù)實(shí)例的內(nèi)存中。
那么此時(shí)這幾千條消息都是unack的狀態(tài),一直積壓著,是不是有可能會(huì)導(dǎo)致消費(fèi)者服務(wù)實(shí)例的內(nèi)存溢出?內(nèi)存消耗過大?甚至內(nèi)存泄露之類的問題產(chǎn)生?
所以說,RabbitMQ是必須要考慮一下消費(fèi)者服務(wù)的處理能力的。
大家看看下面的圖,感受一下如果消費(fèi)者服務(wù)實(shí)例的內(nèi)存中積壓消息過多,都是unack的狀態(tài),此時(shí)會(huì)怎么樣。
三、如何解決unack消息的積壓問題
正是因?yàn)檫@個(gè)原因,RabbitMQ基于一個(gè)prefetch count來控制這個(gè)unack message的數(shù)量。
你可以通過 “channel.basicQos(10)” 這個(gè)方法來設(shè)置當(dāng)前channel的prefetch count。
舉個(gè)例子,比如你要是設(shè)置為10的話,那么意味著當(dāng)前這個(gè)channel里,unack message的數(shù)量不能超過10個(gè),以此來避免消費(fèi)者服務(wù)實(shí)例積壓unack message過多。
這樣的話,就意味著RabbitMQ正在投遞到channel過程中的unack message,以及消費(fèi)者服務(wù)在處理中的unack message,以及異步ack之后還沒完成ack的unack message,所有這些message加起來,一個(gè)channel也不能超過10個(gè)。
如果你要簡單粗淺的理解的話,也大致可以理解為這個(gè)prefetch count就代表了一個(gè)消費(fèi)者服務(wù)同時(shí)最多可以獲取多少個(gè)message來處理。所以這里也點(diǎn)出了prefetch這個(gè)單詞的意思。
prefetch就是預(yù)抓取的意思,就意味著你的消費(fèi)者服務(wù)實(shí)例預(yù)抓取多少條message過來處理,但是最多只能同時(shí)處理這么多消息。
如果一個(gè)channel里的unack message超過了prefetch count指定的數(shù)量,此時(shí)RabbitMQ就會(huì)停止給這個(gè)channel投遞消息了,必須要等待已經(jīng)投遞過去的消息被ack了,此時(shí)才能繼續(xù)投遞下一個(gè)消息。
老規(guī)矩,給大家上一張圖,我們一起來看看這個(gè)東西是啥意思。
四、高并發(fā)場景下的內(nèi)存溢出問題
好!現(xiàn)在大家對ack機(jī)制底層的另外一個(gè)核心機(jī)制:prefetch機(jī)制也有了一個(gè)深刻的理解了。
此時(shí),咱們就應(yīng)該來考慮一個(gè)問題了。就是如何來設(shè)置這個(gè)prefetch count呢?這個(gè)東西設(shè)置的過大或者過小有什么影響呢?
其實(shí)大家理解了上面的圖就很好理解這個(gè)問題了。
假如說我們把prefetch count設(shè)置的很大,比如說3000,5000,甚至100000,就這樣特別大的值,那么此時(shí)會(huì)如何呢?
這個(gè)時(shí)候,在高并發(fā)大流量的場景下,可能就會(huì)導(dǎo)致消費(fèi)者服務(wù)的內(nèi)存被快速的消耗掉。
因?yàn)榧偃缯f現(xiàn)在MQ接收到的流量特別的大,每秒都上千條消息,而且此時(shí)你的消費(fèi)者服務(wù)的prefetch count還設(shè)置的特別大,就會(huì)導(dǎo)致可能一瞬間你的消費(fèi)者服務(wù)接收到了達(dá)到prefetch count指定數(shù)量的消息。
打個(gè)比方,比如一下子你的消費(fèi)者服務(wù)內(nèi)存里積壓了10萬條消息,都是unack的狀態(tài),反正你的prefetch count設(shè)置的是10萬。
那么對一個(gè)channel,RabbitMQ就會(huì)最多容忍10萬個(gè)unack狀態(tài)的消息,在高并發(fā)下也就最多可能積壓10萬條消息在消費(fèi)者服務(wù)的內(nèi)存里。
那么此時(shí)導(dǎo)致的結(jié)果,就是消費(fèi)者服務(wù)直接被擊垮了,內(nèi)存溢出,OOM,服務(wù)宕機(jī),然后大量unack的消息會(huì)被重新投遞給其他的消費(fèi)者服務(wù),此時(shí)其他消費(fèi)者服務(wù)一樣的情況,直接宕機(jī),最后造成雪崩效應(yīng)。
所有的消費(fèi)者服務(wù)因?yàn)榭覆蛔∵@么大的數(shù)據(jù)量,全部宕機(jī)。
大家來看看下面的圖,自己感受一下現(xiàn)場的氛圍。
五、低吞吐量問題
那么如果反過來呢,我們要是把prefetch count設(shè)置的很小會(huì)如何呢?
比如說我們把prefetch count設(shè)置為1?此時(shí)就必然會(huì)導(dǎo)致消費(fèi)者服務(wù)的吞吐量極低。因?yàn)槟慵词固幚硗暌粭l消息,執(zhí)行ack了也是異步的。
給你舉個(gè)例子,假如說你的prefetch count = 1,RabbitMQ最多投遞給你1條消息處于unack狀態(tài)。
此時(shí)比如你剛處理完這條消息,然后執(zhí)行了ack的那行代碼,結(jié)果不幸的是,ack需要異步執(zhí)行,也就是需要100ms之后才會(huì)讓RabbitMQ感知到。
那么100ms之后RabbitMQ感知到消息被ack了,此時(shí)才會(huì)投遞給你下一條消息!
這就尷尬了,在這100ms期間,你的消費(fèi)者服務(wù)是不是啥都沒干啊?
這不就直接導(dǎo)致了你的消費(fèi)者服務(wù)處理消息的吞吐量可能下降10倍,甚至百倍,千倍,都有這種可能!
大家看看下面的圖,感受一下低吞吐量的現(xiàn)場。
六、合理的設(shè)置prefetch count
所以鑒于上面兩種極端情況,RabbitMQ官方給出的建議是prefetch count一般設(shè)置在100~300之間。
也就是一個(gè)消費(fèi)者服務(wù)最多接收到100~300個(gè)message來處理,允許處于unack狀態(tài)。
這個(gè)狀態(tài)下可以兼顧吞吐量也很高,同時(shí)也不容易造成內(nèi)存溢出的問題。
但是其實(shí)在我們的實(shí)踐中,這個(gè)prefetch count大家完全是可以自己去壓測一下的。
比如說慢慢調(diào)節(jié)這個(gè)值,不斷加大,觀察高并發(fā)大流量之下,吞吐量是否越來越大,而且觀察消費(fèi)者服務(wù)的內(nèi)存消耗,會(huì)不會(huì)OOM、頻繁FullGC等問題。
七、階段性總結(jié)
其實(shí)通過最近幾篇文章,基本上已經(jīng)把消息中間件的消費(fèi)端如何保證數(shù)據(jù)不丟失這個(gè)問題剖析的較為深入和透徹了。
如果你是基于RabbitMQ來做消息中間件的話,消費(fèi)端的代碼里,必須考慮三個(gè)問題:手動(dòng)ack、處理失敗的nack、prefetch count的合理設(shè)置。
這三個(gè)問題背后涉及到了各種機(jī)制:
- 自動(dòng)ack機(jī)制
- delivery tag機(jī)制
- ack批量與異步提交機(jī)制
- 消息重發(fā)機(jī)制
- 手動(dòng)nack觸發(fā)消息重發(fā)機(jī)制
- prefetch count過大導(dǎo)致內(nèi)存溢出問題
- prefetch count過小導(dǎo)致吞吐量過低
這些底層機(jī)制和問題,咱們都一步步分析清楚了。
所以到現(xiàn)在,單論消費(fèi)端這塊的數(shù)據(jù)不丟失技術(shù)方案,相信大家在面試的時(shí)候就可以有一整套自己的理解和方案可以闡述了。