RabbitMQ非常實用技巧,動態調整消息并發處理能力
環境:SpringBoot2.7.16 + RabbitMQ3.8.35
1. 簡介
RabbitMQ 是一個開源的消息代理和隊列服務器,用于通過輕量級和可靠的消息傳遞,在服務器之間進行通信。在Spring Boot項目中我們一般都是通過@RabbitListener進行消息監聽。可以通過配置消息監聽器并發數來提高系統的消息處理能力。
在實際應用中,根據業務場景的不同,我們可能需要動態調整 RabbitMQ 消息監聽的并發數。例如,當RabbitMQ消息積壓過多時,這時候我們就可以考慮通過動態調整并發數,以提高消息處理速度;而在系統自身負載過高時,這時候可以考慮通過減少并發數來減輕系統的整體壓力。本篇文章將通過具體的示例來展示如何調整運行中消息監聽處理器的并發數。
注意:動態調整并發監聽數還可以幫助我們更好地控制系統的穩定性和可靠性。通過實時監測系統的負載情況和消息處理速度,我們可以及時發現潛在的問題并進行調整,從而確保系統的正常運行。
2. 實戰案例
2.1依賴管理
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置管理
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtualHost: test
publisherConfirmType: correlated
publisherReturns: true
listener:
simple:
# 手動應答
acknowledgeMode: manual
concurrency: 2
max-concurrency: 2
2.3 創建交換機及隊列
通過管理界面創建交換機及隊列。
- 交換機名:test.exchange類型為topic
- 隊列名: test
- 將交換機與隊列進行綁定路由key:akf.#
2.4 消息隊列準備消息
通過如下接口,先往隊列中插入100條消息
@Resource
private RabbitTemplate rabbitTemplate ;
@GetMapping("/send")
public String send() {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("test.exchange", "akf.a", "message - " + i) ;
}
}).start() ;
return "success" ;
}
圖片
2.5 消息監聽器
@RabbitListener(queues = "test")
public void listener1(String message) {
System.out.printf("%s - 接收到消息:%s%n", Thread.currentThread().getName(), message) ;
try {
TimeUnit.SECONDS.sleep(2) ;
} catch (InterruptedException e) {}
}
2.6 測試
測試上面的消息監聽器是正常的
圖片
2.7 調整并發數
在上一步的測試中我們發現控制臺打印的始終是一個線程在執行消息處理。但是在一開始的配置文件中我們將concurrency屬性設置的為2,起碼這里應該是2個線程交替執行才對,這是為什么呢?
Spring監聽RabbitMQ的消息時默認并不是一條一條的從RabbitMQ中去,是一次預期一批數據,這一批消費完后才進行下一批的獲取,默認預期250條。而我們向隊列中存入的數據才100條,所以控制臺中你只能看到一個線程打印,因為你沒有足夠的消息供其它線程去獲取處理。我們可以通過如下配置進行預期數的設置:
spring:
rabbitmq:
listener:
simple:
prefetch: 5
重新啟動服務,測試如下
圖片
2個線程交替執行;接下來該如何實現動態調整并發數呢?
首先,修改消息監聽器配置
@RabbitListener(id = "test-queue", queues = "test", ackMode = "AUTO")
public void listener1(String message) {
// ...
}
id: 這里最好是設置唯一的id值,我們是要通過該id值來獲取當前隊列的消息監聽容器。ackMode: AUTO 這里設置的應答模式,用來覆蓋配置文件中的設置。
其次,通過RabbitListenerEndpointRegistry操作
@Resource
private RabbitListenerEndpointRegistry registry ;
@GetMapping("/modify/{count}")
public Object modify(@PathVariable("count") Integer count) {
// 這里通過id獲取對應的隊列監聽器;所以上面一定要定義唯一的id值
MessageListenerContainer listenerContainer = registry.getListenerContainer("test-queue") ;
if (listenerContainer instanceof SimpleMessageListenerContainer container) {
container.setConcurrentConsumers(count) ;
}
return String.format("并發接收消息:%d%n", count) ;
}
最后,測試。
首先將服務啟動,控制輸出如下(當前只有2個線程處理)
圖片
目前只有2個線程。
調用上面的接口修改并發數為3個后,控制臺輸出。
圖片
成功增加了一個消費者線程。
接下來再測試,如果修改的數量大于最大數(spring.rabbitmq.listener.simple.max-concurrency)
圖片
控制臺拋出如下異常。
圖片
不能超過最大數;再看看調小是否可以。
圖片
可以動態調小。
我們也可以對消息監聽器進行暫停消費和重新啟動消息監聽,這里就不在演示了,非常簡單調用相應start/stop即可。
總結:在 Spring Boot 中動態調整 RabbitMQ 消息監聽的并發數是一個重要的優化手段。通過合理設置并發數并根據系統負載情況進行動態調整,我們可以提高消息處理效率、節省系統資源、確保系統的穩定性和可靠性。在實際應用中,我們應該根據具體的業務場景和需求來選擇合適的并發數調整策略,以達到最佳的性能和效果。