工作中這樣用MQ,很香!
前言
消息隊列(MQ)是分布式系統中不可或缺的技術之一。
對很多小伙伴來說,剛接觸MQ時,可能覺得它只是個“傳話工具”,但用著用著,你會發現它簡直是系統的“潤滑劑”。
無論是解耦、削峰,還是異步任務處理,都離不開MQ的身影。
下面我結合實際場景,從簡單到復雜,逐一拆解MQ的10種經典使用方式,希望對你會有所幫助。
圖片
1. 異步處理:讓系統輕松一點
場景
小伙伴們是不是經常遇到這樣的情況:用戶提交一個操作,比如下單,然后要發送短信通知。
如果直接在主流程里調用短信接口,一旦短信服務響應慢,就會拖累整個操作。
用戶等得不耐煩,心態直接崩了。
解決方案
用MQ,把非關鍵流程抽出來異步處理。下單時,直接把“發短信”這件事丟給MQ,訂單服務就能立刻響應用戶,而短信的事情讓MQ和消費者去搞定。
示例代碼
// 訂單服務:生產者
Order order = createOrder(); // 訂單生成邏輯
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單已生成,發短信任務交給MQ");
// 短信服務:消費者
@RabbitListener(queues = "sms_queue")
public void sendSms(Order order) {
System.out.println("發送短信,訂單ID:" + order.getId());
// 調用短信服務接口
}
深度解析
這種方式的好處是:主流程解耦,不受慢服務的拖累。訂單服務只管自己的事,短信服務掛了也沒關系,MQ會把消息暫存,等短信服務恢復后繼續處理。
2. 流量削峰:穩住系統別崩
場景
每年的“雙十一”電商大促,用戶秒殺商品時一窩蜂沖進來。
突然涌入的高并發請求,不僅會壓垮應用服務,還會直接讓數據庫“趴窩”。
解決方案
秒殺請求先寫入MQ,后端服務以穩定的速度從MQ中消費消息,處理訂單。
這樣既能避免系統被瞬時流量壓垮,還能提升處理的平穩性。
示例代碼
// 用戶提交秒殺請求:生產者
rabbitTemplate.convertAndSend("seckill_exchange", "seckill_key", userRequest);
System.out.println("用戶秒殺請求已進入隊列");
// 秒殺服務:消費者
@RabbitListener(queues = "seckill_queue")
public void processSeckill(UserRequest request) {
System.out.println("處理秒殺請求,用戶ID:" + request.getUserId());
// 執行秒殺邏輯
}
深度解析
MQ在這里相當于一個緩沖池,把瞬時流量均勻分布到一段時間內處理。系統穩定性提升,用戶體驗更好。
3. 服務解耦:減少相互牽制
場景
比如一個訂單系統需要通知庫存系統扣減庫存,還要通知支付系統完成扣款。
如果直接用同步接口調用,服務間的依賴性很強,一個服務掛了,整個鏈條都會被拖垮。
解決方案
訂單服務只負責把消息丟到MQ里,庫存服務和支付服務各自從MQ中消費消息。
這樣訂單服務不需要直接依賴它們。
示例代碼
// 訂單服務:生產者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單生成消息已發送");
// 庫存服務:消費者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("扣減庫存,訂單ID:" + order.getId());
}
// 支付服務:消費者
@RabbitListener(queues = "payment_queue")
public void processPayment(Order order) {
System.out.println("處理支付,訂單ID:" + order.getId());
}
深度解析
通過MQ,各個服務之間可以實現松耦合。
即使庫存服務掛了,也不會影響訂單生成的流程,大幅提升系統的容錯能力。
4. 分布式事務:保證數據一致性
場景
訂單服務需要同時生成訂單和扣減庫存,這涉及兩個不同的數據庫操作。
如果一個成功一個失敗,就會導致數據不一致。
解決方案
通過MQ實現分布式事務。
訂單服務生成訂單后,將扣減庫存的任務交給MQ,最終實現數據的一致性。
示例代碼
// 訂單服務:生產者
rabbitTemplate.convertAndSend("order_exchange", "order_key", order);
System.out.println("訂單創建消息已發送");
// 庫存服務:消費者
@RabbitListener(queues = "stock_queue")
public void updateStock(Order order) {
System.out.println("更新庫存,訂單ID:" + order.getId());
// 執行扣減庫存邏輯
}
深度解析
通過“最終一致性”解決了分布式事務的難題,雖然短時間內可能有數據不一致,但最終狀態一定是正確的。
5. 廣播通知:一條消息,通知多個服務
場景
比如商品價格調整,庫存、搜索、推薦服務都需要同步更新。
如果每個服務都要單獨通知,工作量會很大。
解決方案
MQ的廣播模式(Fanout)可以讓多個消費者訂閱同一條消息,實現消息的“一發多收”。
示例代碼
// 生產者:廣播消息
rabbitTemplate.convertAndSend("price_update_exchange", "", priceUpdate);
System.out.println("商品價格更新消息已廣播");
// 消費者1:庫存服務
@RabbitListener(queues = "stock_queue")
public void updateStockPrice(PriceUpdate priceUpdate) {
System.out.println("庫存價格更新:" + priceUpdate.getProductId());
}
// 消費者2:搜索服務
@RabbitListener(queues = "search_queue")
public void updateSearchPrice(PriceUpdate priceUpdate) {
System.out.println("搜索價格更新:" + priceUpdate.getProductId());
}
深度解析
這種模式讓多個服務都能接收到同一條消息,擴展性非常強。
6. 日志收集:分布式日志集中化
場景
多個服務產生的日志需要統一存儲和分析。
如果直接寫數據庫,可能導致性能瓶頸。
解決方案
各服務將日志寫入MQ,日志分析系統從MQ中消費消息并統一處理。
示例代碼
// 服務端:生產者
rabbitTemplate.convertAndSend("log_exchange", "log_key", logEntry);
System.out.println("日志已發送");
// 日志分析服務:消費者
@RabbitListener(queues = "log_queue")
public void processLog(LogEntry log) {
System.out.println("日志處理:" + log.getMessage());
// 存儲或分析邏輯
}
7. 延遲任務:定時觸發操作
場景
用戶下單后,如果30分鐘內未支付,需要自動取消訂單。
解決方案
使用MQ的延遲隊列功能,設置消息延遲消費的時間。
示例代碼
// 生產者:發送延遲消息
rabbitTemplate.convertAndSend("delay_exchange", "delay_key", order, message -> {
message.getMessageProperties().setDelay(30 * 60 * 1000); // 延遲30分鐘
return message;
});
System.out.println("訂單取消任務已設置");
// 消費者:處理延遲消息
@RabbitListener(queues = "delay_queue")
public void cancelOrder(Order order) {
System.out.println("取消訂單:" + order.getId());
// 取消訂單邏輯
}
8. 數據同步:跨系統保持數據一致
場景
在一個分布式系統中,多個服務依賴同一份數據源。
例如,電商平臺的訂單狀態更新后,需要同步到緩存系統和推薦系統。
如果讓每個服務直接從數據庫拉取數據,會增加數據庫壓力,還可能出現延遲或不一致的問題。
解決方案
利用MQ進行數據同步。訂單服務更新訂單狀態后,將更新信息發送到MQ,緩存服務和推薦服務從MQ中消費消息并同步數據。
示例代碼
訂單服務:生產者
// 更新訂單狀態后,將消息發送到MQ
Order order = updateOrderStatus(orderId, "PAID"); // 更新訂單狀態為已支付
rabbitTemplate.convertAndSend("order_exchange", "order_status_key", order);
System.out.println("訂單狀態更新消息已發送:" + order.getId());
緩存服務:消費者
@RabbitListener(queues = "cache_update_queue")
public void updateCache(Order order) {
System.out.println("更新緩存,訂單ID:" + order.getId() + " 狀態:" + order.getStatus());
// 更新緩存邏輯
cacheService.update(order.getId(), order.getStatus());
}
推薦服務:消費者
@RabbitListener(queues = "recommendation_queue")
public void updateRecommendation(Order order) {
System.out.println("更新推薦系統,訂單ID:" + order.getId() + " 狀態:" + order.getStatus());
// 更新推薦服務邏輯
recommendationService.updateOrderStatus(order);
}
深度解析
通過MQ實現數據同步的好處是:
- 減輕數據庫壓力:避免多個服務同時查詢數據庫。
- 最終一致性:即使某個服務處理延遲,MQ也能保障消息不丟失,最終所有服務的數據狀態一致。
9. 分布式任務調度
場景
有些任務需要定時執行,比如每天凌晨清理過期訂單。
這些訂單可能分布在多個服務中,如果每個服務獨立運行定時任務,可能會出現重復處理或任務遺漏的問題。
解決方案
使用MQ統一分發調度任務,每個服務根據自身的業務需求,從MQ中消費任務并執行。
示例代碼
任務調度服務:生產者
// 定時任務生成器
@Scheduled(cron = "0 0 0 * * ?") // 每天凌晨觸發
public void generateTasks() {
List<Task> expiredTasks = taskService.getExpiredTasks();
for (Task task : expiredTasks) {
rabbitTemplate.convertAndSend("task_exchange", "task_routing_key", task);
System.out.println("任務已發送:" + task.getId());
}
}
訂單服務:消費者
@RabbitListener(queues = "order_task_queue")
public void processOrderTask(Task task) {
System.out.println("處理訂單任務:" + task.getId());
// 執行訂單清理邏輯
orderService.cleanExpiredOrder(task);
}
庫存服務:消費者
@RabbitListener(queues = "stock_task_queue")
public void processStockTask(Task task) {
System.out.println("處理庫存任務:" + task.getId());
// 執行庫存釋放邏輯
stockService.releaseStock(task);
}
深度解析
分布式任務調度可以解決:
- 重復執行:每個服務只處理自己隊列中的任務。
- 任務遺漏:MQ確保任務可靠傳遞,防止任務丟失。
10. 文件處理:異步執行大文件任務
場景
用戶上傳一個大文件后,需要對文件進行處理(如格式轉換、壓縮等)并存儲。
如果同步執行這些任務,前端頁面可能會一直加載,導致用戶體驗差。
解決方案
用戶上傳文件后,立即將任務寫入MQ,后臺異步處理文件,處理完成后通知用戶或更新狀態。
示例代碼
上傳服務:生產者
// 上傳文件后,將任務寫入MQ
FileTask fileTask = new FileTask();
fileTask.setFileId(fileId);
fileTask.setOperation("COMPRESS");
rabbitTemplate.convertAndSend("file_task_exchange", "file_task_key", fileTask);
System.out.println("文件處理任務已發送,文件ID:" + fileId);
文件處理服務:消費者
@RabbitListener(queues = "file_task_queue")
public void processFileTask(FileTask fileTask) {
System.out.println("處理文件任務:" + fileTask.getFileId() + " 操作:" + fileTask.getOperation());
// 模擬文件處理邏輯
if ("COMPRESS".equals(fileTask.getOperation())) {
fileService.compressFile(fileTask.getFileId());
} else if ("CONVERT".equals(fileTask.getOperation())) {
fileService.convertFileFormat(fileTask.getFileId());
}
// 更新任務狀態
taskService.updateTaskStatus(fileTask.getFileId(), "COMPLETED");
}
前端輪詢或回調通知
// 前端輪詢文件處理狀態
setInterval(() => {
fetch(`/file/status?fileId=${fileId}`)
.then(response => response.json())
.then(status => {
if (status === "COMPLETED") {
alert("文件處理完成!");
}
});
}, 5000);
深度解析
異步文件處理的優勢:
- 提升用戶體驗:主線程迅速返回,減少用戶等待時間。
- 后臺任務靈活擴展:支持多種操作邏輯,適應復雜文件處理需求。
總結
消息隊列不只是傳遞消息的工具,更是系統解耦、提升穩定性和擴展性的利器。
在這10種經典場景中,每一種都能解決特定的業務痛點。