成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

日常工作,MQ的八種常用使用場景

開發 前端
在用戶注冊場景中,當用戶信息保存成功后,系統需要發送一個短信、或者郵箱消息,通知用戶注冊成功。如果這個短信或者郵箱消息發送比較耗時,則可能拖垮注冊接口。又或者如果調用第三方短信、郵件發送接口失敗,也會影響注冊接口。

前言

大家好,我是田螺。

我們日常開發中,經常跟MQ(消息隊列)打交道。本文田螺哥梳理了MQ的8種使用場景。

圖片圖片

1. 異步處理

面試官在問我們MQ作用時,很多伙伴馬上想到異步處理、解耦、流量削鋒等等。

MQ 最常見的應用場景之一就是異步處理。

比如,在用戶注冊場景中,當用戶信息保存成功后,系統需要發送一個短信、或者郵箱消息,通知用戶注冊成功。如果這個短信或者郵箱消息發送比較耗時,則可能拖垮注冊接口。又或者如果調用第三方短信、郵件發送接口失敗,也會影響注冊接口。一般我們不希望一個通知類的功能,去影響注冊主流程,這時候,則可以使用MQ來實現異步處理。

簡要代碼如下:先保存用戶信息,然后發送注冊成功的MQ消息

// 用戶注冊方法
  public void registerUser(String username, String email, String phoneNumber) {
      // 保存用戶信息(簡化版)
      userService.add(buildUser(username,email,phoneNumber))
      // 發送消息
      String registrationMessage = "User " + username + " has registered successfully.";
      // 發送消息到隊列
      rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
  }

消費者從隊列中讀取消息并發送短信或郵件:

@Service
public class NotificationService {

    // 監聽消息隊列中的消息并發送短信/郵件
    @RabbitListener(queues = "registrationQueue")
    public void handleRegistrationNotification(String message) {
        // 這里可以進行短信或郵件的發送操作
        System.out.println("Sending registration notification: " + message);

        // 假設這里是發送短信的操作
        sendSms(message);

        // 也可以做其他通知(比如發郵件等)
        sendEmail(message);
    }
  }

2. 解耦

在微服務架構中,各個服務通常需要進行相互通信。使用 MQ 可以幫助解耦服務,避免直接調用導致的強耦合。

圖片圖片


一個電商平臺的庫存服務和支付服務。支付服務在處理支付后,需要向庫存服務發送扣庫存的請求,但不直接調用 API,而是通過 MQ 發送消息,讓庫存服務異步處理。

支付服務在支付成功后將消息發送到 RocketMQ:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class PaymentService {

    private DefaultMQProducer producer;

    public PaymentService() throws Exception {
        producer = new DefaultMQProducer("PaymentProducerGroup");
        producer.setNamesrvAddr("localhost:9876");  // RocketMQ NameServer 地址
        producer.start();
    }

    public void processPayment(String orderId, int quantity) throws Exception {
        // 1. 模擬調用支付接口(例如:支付寶、微信支付等)
        boolean paymentSuccessful = callPayment(orderId, quantity);

        if (paymentSuccessful) {
            // 2. 支付成功后,創建支付消息并發送到 RocketMQ
            String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
            Message message = new Message("paymentTopic", "paymentTag", messageBody.getBytes());
            producer.send(message);    
        }
    }
}

庫存服務從 RocketMQ 中消費支付消息,并處理扣庫存的邏輯:

public class InventoryService {

    private DefaultMQPushConsumer consumer;

    public InventoryService() throws Exception {
        consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("paymentTopic", "paymentTag");

        // 消息監聽器
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                String messageBody = new String(msg.getBody());
                // 執行扣庫存操作
                reduceStock(messageBody);
            }
            return null; // 返回消費成功
        });

        consumer.start();
        System.out.println("InventoryService started...");
    }
}

3.流量削鋒

在高并發的情況下,有些請求可能會產生瞬時流量峰值,直接處理可能會導致服務過載。比如:

  • 春運快到了,12306的搶票就是這種案例。
  • 又或者雙12這種大促,訂單壓力會比較大。
  • 秒殺的時候,也需要避免流量暴漲,打垮應用系統的風險

這些場景,我們都可以使用MQ來進行流量的削峰填谷,確保系統平穩運行。

圖片圖片

假設秒殺系統每秒最多可以處理2k個請求,每秒卻有5k的請求過來,可以引入消息隊列,秒殺系統每秒從消息隊列拉2k請求處理得了。

4.延時任務

在電商平臺的訂單處理中,如果用戶下單后一定時間內未支付,需要自動取消訂單。通過MQ的延時隊列功能,可以設置消息延遲消費的時間,當消息到達延遲時間后,由消費者處理取消訂單的邏輯。

當用戶下單時,生成一個訂單并發送一條延遲消息到RocketMQ。延遲時間可以根據訂單的超時時間設置:

@Service
public class OrderService {
 
 @Autowired
 private RocketMQTemplate rocketMQTemplate;
 
 public void createOrder(Order order) {
  // 保存訂單邏輯(省略)
 
  // 計算延遲時間(單位:毫秒)
  long delay = order.getTimeout();
 
  // 發送延遲消息
  rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
    MessageBuilder.withPayload(order).build(),
    10000, // 消息發送超時時間(單位:毫秒)
    (int) (delay / 1000) // RocketMQ的延遲級別是以秒為單位的,因此需要轉換為秒
  );
 }
}

注意:RocketMQ的延遲級別是固定的,如1s、5s、10s等。如果訂單的延遲時間不是RocketMQ支持的延遲級別的整數倍,那么消息將不會精確地在預期的延遲時間后被消費。為了解決這個問題,你可以選擇最接近的延遲級別,或者根據業務需求進行適當的調整。

創建一個用來消費延遲消息的消費者,處理取消訂單的邏輯。例如:

@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order> {
 
 @Override
 public void onMessage(Order order) {
  // 取消訂單邏輯
  // 檢查訂單狀態,如果訂單仍處于未支付狀態則進行取消
  System.out.println("Cancelling order: " + order.getOrderId());
  // (省略實際的取消訂單邏輯)
 }
}

5. 日志收集

消息隊列常用于日志系統中,將應用生成的日志異步地發送到日志處理系統,進行統一存儲和分析。

假設你有一個微服務架構,每個微服務都會生成日志。你可以將這些日志通過消息隊列(如Kafka)發送到一個集中式的日志收集系統(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),從而實現日志的統一管理。

生產者(發送日志到 Kafka)

// 配置和發送日志到 Kafka 主題 "app-logs"
KafkaProducer<String, String> producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs", "log-key", logMessage));

消費者(收集日志信息)

@Service
public class LogConsumer {
    // 使用 @KafkaListener 注解來消費 Kafka 中的日志
    @KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
    public void consumeLog(String logMessage) {
        // 打印或處理收到的日志
        System.out.println("Received log: " + logMessage);
    }
}

6.分布式事務

業界經常使用MQ來實現分布式事務。

我舉個下訂單的場景,使用MQ實現分布式事務的例子吧。

我們先來看,一條普通的MQ消息,從產生到被消費,大概流程如下:

圖片圖片

  • 生產者產生消息,發送帶MQ服務器
  • MQ收到消息后,將消息持久化到存儲系統。
  • MQ服務器返回ACk到生產者。
  • MQ服務器把消息push給消費者
  • 消費者消費完消息,響應ACK
  • MQ服務器收到ACK,認為消息消費成功,即在存儲中刪除消息。

回到下訂單這個例子,訂單系統創建完訂單后,再發送消息給下游系統。如果訂單創建成功,然后消息沒有成功發送出去,下游系統就無法感知這個事情,出導致數據不一致。

這時候就可以使用MQ實現分布式事務消息。大家看下這個流程:

圖片圖片

  • 生產者產生消息,發送一條半事務消息到MQ服務器
  • MQ收到消息后,將消息持久化到存儲系統,這條消息的狀態是待發送狀態。
  • MQ服務器返回ACK確認到生產者,此時MQ不會觸發消息推送事件
  • 生產者執行本地事務
  • 如果本地事務執行成功,即commit執行結果到MQ服務器;如果執行失敗,發送rollback。
  • 如果是正常的commit,MQ服務器更新消息狀態為可發送;如果是rollback,即刪除消息。
  • 如果消息狀態更新為可發送,則MQ服務器會push消息給消費者。消費者消費完就回ACK。
  • 如果MQ服務器長時間沒有收到生產者的commit或者rollback,它會反查生產者,然后根據查詢到的結果執行最終狀態。

7. 遠程調用

我以前公司(微眾)基于MQ(RocketMQ),自研了遠程調用框架。

RocketMQ 作為遠程調用框架,主要就是金融場景的適配性。

  • 消息查詢功能:RocketMQ提供了消息查詢功能,方便微眾銀行在需要時進行消息對賬或問題排查。
  • 金融級穩定性:RocketMQ在金融領域的應用非常廣泛,得到了眾多金融機構的認可。其穩定性和可靠性能夠滿足微眾銀行對金融級消息服務的需求。

還有可以基于RocketMQ的定制開發:多中心多活、灰度發布、流量權重與消息去重、背壓模式(能夠根據后續服務的治理能力決定拉取的消息數量,確保系統的穩定運行。)

8. 廣播通知:事件驅動的消息通知

消息隊列(MQ) 可以非常適合用于 廣播通知。在廣播通知場景中,消息隊列可以將消息推送到多個訂閱者,讓不同的服務或者應用接收到通知。

  • 系統通知:向所有用戶廣播應用更新、系統維護、公告通知等。
  • 事件驅動的消息通知:如庫存更新、用戶狀態變化、訂單支付成功等事件通知,多個系統可以訂閱這個事件。

針對事件驅動的消息通知,我們以 訂單支付成功 事件為例,假設多個系統(如庫存管理系統、用戶積分系統、財務系統等)都需要監聽這個事件來進行相應處理。

圖片

當訂單支付成功 事件發生時,系統會通過消息隊列廣播一個事件通知(比如消息內容是訂單ID、支付金額等),其他系統可以根據這個事件來執行相應的操作,如:

  • 庫存系統:根據訂單信息減少庫存。
  • 用戶積分系統:增加用戶積分。
  • 財務系統:記錄支付流水。

發送訂單支付成功事件:

// 創建訂單支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic", "order_payment_success", orderEventData.getBytes());

// 發送消息
producer.send(msg);

事件消費者(接收并處理訂單支付成功事件):

  • 庫存系統:
// 注冊消息監聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Inventory system received: " + eventData);
                
                // 處理庫存減少邏輯
                // 解析消息(假設是 JSON 格式)
                // updateInventory(eventData);  // 假設調用庫存更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 積分系統:
// 注冊消息監聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Points system received: " + eventData);

                // 處理用戶積分增加邏輯
                // updateUserPoints(eventData);  // 假設調用積分更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 財務系統:
// 注冊消息監聽器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Finance system received: " + eventData);

                // 處理財務記錄邏輯
                // recordPaymentTransaction(eventData);  // 假設調用財務記錄方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });


責任編輯:武曉燕 來源: 撿田螺的小男孩
相關推薦

2020-07-15 07:53:41

VSCode Task腳本命令

2023-11-20 13:52:00

Redis數據庫

2024-03-13 14:57:37

2009-03-27 10:25:24

OracleDBA職責

2022-05-31 08:21:07

MQ使用場景消費消息

2023-02-02 09:37:59

消息隊列MQ

2024-11-27 08:15:50

2011-07-30 13:01:23

2023-01-05 13:36:41

Script優化任務

2024-12-11 08:20:57

設計模式源碼

2022-07-29 07:48:15

HTTP常用狀態碼

2025-02-11 09:49:12

2023-05-16 07:47:18

RabbitMQ消息隊列系統

2020-02-14 13:50:32

JavaScript前端技術

2019-12-23 08:48:24

Java技術全局變量

2017-11-24 12:35:14

數據科學統計學習機器學習

2018-04-09 12:44:45

Docker使用場景開發

2015-01-06 09:48:34

Docker多租戶docker應用

2024-10-29 09:42:50

2018-12-03 12:20:52

Systemd定時器Linux
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久精品成人 | 欧美一a| 日韩福利| 欧美在线精品一区 | 欧美精品在线一区 | 亚洲精品日韩精品 | 免费av直接看 | 久久久久久久久毛片 | 中文字幕一区在线 | 免费观看一级特黄欧美大片 | 久久久久久毛片免费观看 | 欧美色影院 | 久久精品国产一区二区三区 | 国产成人短视频在线观看 | 国外成人在线视频网站 | 日韩欧美一区二区三区免费看 | 欧美va大片| 视频一区 亚洲 | 国产精品亚洲精品 | 91精品国产乱码久久久久久久久 | 中文字幕亚洲视频 | 国产高清视频一区 | 国产精品日韩欧美一区二区三区 | www成人免费视频 | 午夜精品久久久久久久久久久久 | 欧美精品在线免费观看 | 91在线电影 | 久久综合av | 国产美女视频黄a视频免费 国产精品福利视频 | 久久午夜精品 | 一区二区三区日韩 | 亚洲精品欧美一区二区三区 | 久久精品国产久精国产 | 天天操天天干天天曰 | 欧美在线日韩 | 亚洲黄色国产 | 日韩精品一区二区三区视频播放 | 国产午夜三级一区二区三 | 国产精品jizz在线观看老狼 | 一区二区三区在线免费观看 | 在线激情视频 |