SpringBoot 集成 RocketMQ:異步消息隊(duì)列實(shí)戰(zhàn),讓系統(tǒng)飛起來(lái)!
作者:fareboy
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實(shí)現(xiàn)完整的異步消息處理流程!
引言:為什么需要異步消息隊(duì)列?
在現(xiàn)代高并發(fā)系統(tǒng)中,解耦服務(wù)、削峰填谷、異步處理已成為架構(gòu)設(shè)計(jì)的核心需求。RocketMQ 作為阿里巴巴開(kāi)源的分布式消息中間件,憑借其高吞吐、低延遲、高可用的特性,成為企業(yè)級(jí)應(yīng)用的首選解決方案。
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實(shí)現(xiàn)完整的異步消息處理流程!
一、環(huán)境準(zhǔn)備與項(xiàng)目搭建
1. 技術(shù)棧
- JDK 1.8+
- SpringBoot 2.7.x
- RocketMQ 4.9.3
- RocketMQ-Spring-Boot-Starter 2.2.2
2. 添加依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
二、核心實(shí)現(xiàn):生產(chǎn)者與消費(fèi)者
1. 配置文件 (application.yml)
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ NameServer地址
producer:
group: order_producer_group # 生產(chǎn)者組名
send-message-timeout: 3000 # 發(fā)送超時(shí)時(shí)間(ms)
2. 消息生產(chǎn)者服務(wù)
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final RocketMQTemplate rocketMQTemplate;
// 發(fā)送普通消息
public void sendOrderMessage(Order order) {
Message<Order> message = MessageBuilder.withPayload(order)
.setHeader(RocketMQHeaders.KEYS, order.getOrderId())
.build();
rocketMQTemplate.send("order_topic", message);
log.info("訂單消息已發(fā)送: {}", order);
}
// 發(fā)送延遲消息(30秒后消費(fèi))
public void sendDelayMessage(Order order) {
rocketMQTemplate.syncSend("delay_order_topic",
MessageBuilder.withPayload(order).build(),
2000, // 發(fā)送超時(shí)
3 // 延遲級(jí)別(對(duì)應(yīng)30秒)
);
log.info("延遲訂單消息已發(fā)送: {}", order);
}
}
3. 消息消費(fèi)者服務(wù)
@Slf4j
@Service
@RocketMQMessageListener(
topic = "order_topic",
consumerGroup = "order_consumer_group",
selectorType = SelectorType.TAG,
selectorExpression = "normal || vip" // 過(guò)濾標(biāo)簽
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到訂單消息,開(kāi)始處理: {}", order);
// 業(yè)務(wù)處理邏輯...
processOrder(order);
}
private void processOrder(Order order) {
// 模擬業(yè)務(wù)處理
log.info("訂單處理完成: {}", order.getOrderId());
}
}
4. 訂單實(shí)體類
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {
private String orderId;
private BigDecimal amount;
private LocalDateTime createTime;
private String userId;
private String orderType; // 用于消息過(guò)濾
}
三、高級(jí)特性實(shí)現(xiàn)
1. 事務(wù)消息處理(解決分布式事務(wù))
@Slf4j
@Service
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
// 執(zhí)行本地事務(wù)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) msg.getPayload();
orderService.createOrder(order); // 本地?cái)?shù)據(jù)庫(kù)操作
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事務(wù)執(zhí)行失敗", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
// 本地事務(wù)狀態(tài)回查
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = msg.getHeaders().get("orderId").toString();
return orderService.checkOrderExists(orderId) ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
2. 消息重試與死信隊(duì)列
// 消費(fèi)者配置重試策略
@RocketMQMessageListener(
topic = "important_order_topic",
consumerGroup = "important_order_group",
maxReconsumeTimes = 3 // 最大重試次數(shù)
)
public class ImportantOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
processImportantOrder(order);
} catch (Exception e) {
throw new RuntimeException("處理失敗,觸發(fā)重試");
}
}
// 達(dá)到最大重試次數(shù)后,消息進(jìn)入死信隊(duì)列
// 死信隊(duì)列命名: %DLQ%consumerGroup
}
四、性能優(yōu)化技巧
1. 批量消息發(fā)送 - 提升吞吐量
List<Message<Order>> messages = orders.stream()
.map(order -> new Message<>("batch_order_topic", order))
.collect(Collectors.toList());
SendResult result = rocketMQTemplate.syncSend("batch_order_topic", messages, 3000);
2. 消費(fèi)端并發(fā)配置
@RocketMQMessageListener(
topic = "high_concurrency_topic",
consumerGroup = "high_concurrency_group",
consumeThreadNumber = 32, // 消費(fèi)線程數(shù)
consumeTimeout = 15L // 消費(fèi)超時(shí)(分鐘)
)
3. 消息過(guò)濾優(yōu)化 - 使用SQL表達(dá)式
@RocketMQMessageListener(
topic = "filtered_order_topic",
consumerGroup = "filtered_order_group",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 100 AND userId LIKE 'VIP%'"
)
五、部署與監(jiān)控
1. RocketMQ集群部署建議
圖片
2. 監(jiān)控方案
- RocketMQ控制臺(tái):實(shí)時(shí)查看隊(duì)列情況
- Prometheus + Grafana:監(jiān)控關(guān)鍵指標(biāo)
消息堆積量
發(fā)送/消費(fèi)TPS
消費(fèi)延遲
- 日志監(jiān)控:ELK收集分析日志
結(jié)語(yǔ)
責(zé)任編輯:武曉燕
來(lái)源:
小林聊編程