成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot 集成 RocketMQ:異步消息隊(duì)列實(shí)戰(zhàn),讓系統(tǒng)飛起來(lái)!

開(kāi)發(fā) 前端
本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實(shí)現(xiàn)完整的異步消息處理流程!

引言:為什么需要異步消息隊(duì)列?

在現(xiàn)代高并發(fā)系統(tǒng)中,解耦服務(wù)、削峰填谷、異步處理已成為架構(gòu)設(shè)計(jì)的核心需求。RocketMQ 作為阿里巴巴開(kāi)源的分布式消息中間件,憑借其高吞吐、低延遲、高可用的特性,成為企業(yè)級(jí)應(yīng)用的首選解決方案。

本文將手把手教你如何在 SpringBoot 中集成 RocketMQ,實(shí)現(xiàn)完整的異步消息處理流程!

一、環(huán)境準(zhǔn)備與項(xiàng)目搭建

1. 技術(shù)棧

  • JDK 1.8+
  • SpringBoot 2.7.x
  • RocketMQ 4.9.3
  • RocketMQ-Spring-Boot-Starter 2.2.2

2. 添加依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

二、核心實(shí)現(xiàn):生產(chǎn)者與消費(fèi)者

1. 配置文件 (application.yml)

rocketmq:
  name-server: 127.0.0.1:9876  # RocketMQ NameServer地址
  producer:
    group: order_producer_group # 生產(chǎn)者組名
    send-message-timeout: 3000  # 發(fā)送超時(shí)時(shí)間(ms)

2. 消息生產(chǎn)者服務(wù)

@Service
@RequiredArgsConstructor
public class OrderProducer {
    private final RocketMQTemplate rocketMQTemplate;


    // 發(fā)送普通消息
    public void sendOrderMessage(Order order) {
        Message<Order> message = MessageBuilder.withPayload(order)
                .setHeader(RocketMQHeaders.KEYS, order.getOrderId())
                .build();


        rocketMQTemplate.send("order_topic", message);
        log.info("訂單消息已發(fā)送: {}", order);
    }


    // 發(fā)送延遲消息(30秒后消費(fèi))
    public void sendDelayMessage(Order order) {
        rocketMQTemplate.syncSend("delay_order_topic", 
            MessageBuilder.withPayload(order).build(),
            2000,  // 發(fā)送超時(shí)
            3      // 延遲級(jí)別(對(duì)應(yīng)30秒)
        );
        log.info("延遲訂單消息已發(fā)送: {}", order);
    }
}

3. 消息消費(fèi)者服務(wù)

@Slf4j
@Service
@RocketMQMessageListener(
    topic = "order_topic",
    consumerGroup = "order_consumer_group",
    selectorType = SelectorType.TAG,
    selectorExpression = "normal || vip"  // 過(guò)濾標(biāo)簽
)
public class OrderConsumer implements RocketMQListener<Order> {


    @Override
    public void onMessage(Order order) {
        log.info("收到訂單消息,開(kāi)始處理: {}", order);
        // 業(yè)務(wù)處理邏輯...
        processOrder(order);
    }


    private void processOrder(Order order) {
        // 模擬業(yè)務(wù)處理
        log.info("訂單處理完成: {}", order.getOrderId());
    }
}

4. 訂單實(shí)體類

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {
    private String orderId;
    private BigDecimal amount;
    private LocalDateTime createTime;
    private String userId;
    private String orderType; // 用于消息過(guò)濾
}

三、高級(jí)特性實(shí)現(xiàn)

1. 事務(wù)消息處理(解決分布式事務(wù))

@Slf4j
@Service
@RocketMQTransactionListener
public class OrderTransactionListenerImpl implements RocketMQLocalTransactionListener {


    @Autowired
    private OrderService orderService;


    // 執(zhí)行本地事務(wù)
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            Order order = (Order) msg.getPayload();
            orderService.createOrder(order); // 本地?cái)?shù)據(jù)庫(kù)操作
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事務(wù)執(zhí)行失敗", e);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }


    // 本地事務(wù)狀態(tài)回查
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String orderId = msg.getHeaders().get("orderId").toString();
        return orderService.checkOrderExists(orderId) ? 
            RocketMQLocalTransactionState.COMMIT : 
            RocketMQLocalTransactionState.ROLLBACK;
    }
}

2. 消息重試與死信隊(duì)列

// 消費(fèi)者配置重試策略
@RocketMQMessageListener(
    topic = "important_order_topic",
    consumerGroup = "important_order_group",
    maxReconsumeTimes = 3  // 最大重試次數(shù)
)
public class ImportantOrderConsumer implements RocketMQListener<Order> {


    @Override
    public void onMessage(Order order) {
        try {
            processImportantOrder(order);
        } catch (Exception e) {
            throw new RuntimeException("處理失敗,觸發(fā)重試");
        }
    }


    // 達(dá)到最大重試次數(shù)后,消息進(jìn)入死信隊(duì)列
    // 死信隊(duì)列命名: %DLQ%consumerGroup
}

四、性能優(yōu)化技巧

1. 批量消息發(fā)送 - 提升吞吐量

List<Message<Order>> messages = orders.stream()
    .map(order -> new Message<>("batch_order_topic", order))
    .collect(Collectors.toList());


SendResult result = rocketMQTemplate.syncSend("batch_order_topic", messages, 3000);

2. 消費(fèi)端并發(fā)配置

@RocketMQMessageListener(
    topic = "high_concurrency_topic",
    consumerGroup = "high_concurrency_group",
    consumeThreadNumber = 32,  // 消費(fèi)線程數(shù)
    consumeTimeout = 15L       // 消費(fèi)超時(shí)(分鐘)
)

3. 消息過(guò)濾優(yōu)化 - 使用SQL表達(dá)式

@RocketMQMessageListener(
    topic = "filtered_order_topic",
    consumerGroup = "filtered_order_group",
    selectorType = SelectorType.SQL92,
    selectorExpression = "amount > 100 AND userId LIKE 'VIP%'"
)

五、部署與監(jiān)控

1. RocketMQ集群部署建議

圖片圖片

2. 監(jiān)控方案

  • RocketMQ控制臺(tái):實(shí)時(shí)查看隊(duì)列情況
  • Prometheus + Grafana:監(jiān)控關(guān)鍵指標(biāo)

消息堆積量

發(fā)送/消費(fèi)TPS

消費(fèi)延遲

  • 日志監(jiān)控:ELK收集分析日志

結(jié)語(yǔ)

異步消息隊(duì)列是構(gòu)建高并發(fā)系統(tǒng)的基石,合理使用RocketMQ能讓你的系統(tǒng)在流量洪峰面前游刃有余!
責(zé)任編輯:武曉燕 來(lái)源: 小林聊編程
相關(guān)推薦

2019-03-25 08:05:35

Elasticsear優(yōu)化集群

2020-09-29 07:54:05

Express 飛起

2011-04-13 10:51:58

MATLAB

2025-03-28 03:20:00

MySQL數(shù)據(jù)庫(kù)搜索

2019-11-05 10:35:57

SpringBoot調(diào)優(yōu)Java

2022-10-09 18:14:31

訂單系統(tǒng)分庫(kù)分表

2021-07-13 07:52:03

SQL面試COUNT(*)

2025-01-17 09:23:31

2013-01-07 09:34:43

CodeLoveBAT

2011-02-25 08:39:11

QFabric數(shù)據(jù)中心Juniper

2016-01-19 17:03:59

數(shù)據(jù)中心網(wǎng)絡(luò)華為

2025-04-15 00:00:00

2024-11-27 09:46:34

2023-03-01 23:59:23

Java開(kāi)發(fā)

2011-09-27 13:25:05

Web

2024-06-12 12:28:23

2009-03-20 14:18:38

機(jī)房數(shù)據(jù)傳輸安全

2016-05-11 09:18:21

AWS云數(shù)據(jù)倉(cāng)庫(kù)Redshift

2023-11-10 18:03:04

業(yè)務(wù)場(chǎng)景SQL

2024-11-25 18:00:00

C#代碼編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 在线观看av中文字幕 | av网站在线免费观看 | 91色在线视频 | 欧美亚洲网站 | 日本91av视频 | 国产91在线 | 中日 | 一区欧美 | 99热.com| 久久91 | 国产亚洲二区 | 日韩成人影院 | 成人免费大片黄在线播放 | 一区二区影视 | 中文字幕日韩在线观看 | 黄色一级片视频 | 黄色大片视频 | 久久久久久久久久久久久久久久久久久久 | 亚洲三区在线 | 在线免费国产 | 日本精品视频 | 国产91在线播放 | 美女在线观看av | 成人欧美一区二区 | 婷婷在线免费 | 草久久| 久久av综合 | 亚洲a视 | 嫩草视频在线免费观看 | 黄色毛片在线看 | 免费亚洲视频 | 天堂av中文 | 成人免费视频7777777 | 午夜视频精品 | 欧美2区 | 精品欧美一区二区在线观看欧美熟 | 国产九九九九 | 自拍视频在线观看 | 91社影院在线观看 | av网站免费观看 | 日韩精品在线看 | 国产一区二区在线视频 |