成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Springboot實現Rabbitmq死信隊列以及延遲隊列的優化

開發 前端
由于特定原因導致隊列中的消息不能被消費,這樣的消息如果沒有后續處理就可以放入死信隊列中,例如一個訂單如果超時未被支付從而自動失效,就將這個訂單放到死信隊列中。

導入依賴:

后續延遲隊列優化用Springboot整合,先理解死信隊列

<!--RabbitMQ依賴-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.12.0</version>
        </dependency>

死信隊列

由于特定原因導致隊列中的消息不能被消費,這樣的消息如果沒有后續處理就可以放入死信隊列中,例如一個訂單如果超時未被支付從而自動失效,就將這個訂單放到死信隊列中。(死信隊列中的消息是可以被消費的)

死信隊列產生的原因

消息TTL過期

就是在規定的時間內消息沒有被消費,(和延遲隊列不同,延遲隊列時表示到達時間消息才可以被消費)

在生產者代碼中設置消息過期時間:

//生產者發送消息,將消息設置為TTL消息
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();

修改隊列參數argument的特殊屬性:

arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機
arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey
arguments.put("x-message-TTL", 10000);//設置過期時間(單位毫秒)  
//將死信交換機與死信隊列綁定

模擬代碼:

消費者1

public class Consumer01 {
    public static final String EXCHANGE_DIRECT = "exchange_direct";//普通交換機的名稱
    public static final String EXCHANGE_DIRECT_DEAD = "exchange_direct_dead";//死信交換機的名稱
    public static final String QUEUE_PLAIN = "queue_plain";//普通隊列的名稱
    public static final String QUEUE_PLAIN_DEAD = "queue_plain_dead";//死信隊列的名稱
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMqUtils.createChannel();
        //聲明死信交換機和普通交換機
        channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(EXCHANGE_DIRECT_DEAD, BuiltinExchangeType.DIRECT);
        //聲明普通隊列(綁定普通隊列與死信交換機的關系,在通過rotingkey綁定死信隊列
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", EXCHANGE_DIRECT_DEAD);//死信交換機
        arguments.put("x-dead-letter-routing-key", "routingkey_direct-dead");//死信rotingkey
        //設置過期時間(單位毫秒)
        arguments.put("x-message-TTL", 10000);
        channel.queueDeclare(QUEUE_PLAIN, false, false, false, arguments);
        //聲明死信隊列
        channel.queueDeclare(QUEUE_PLAIN_DEAD, false, false, false, null);
        //普通交換機和隊列的綁定
        channel.queueBind(QUEUE_PLAIN, EXCHANGE_DIRECT, "routingkey_direct");
        //死信交換機和死信隊列的綁定
        channel.queueBind(QUEUE_PLAIN_DEAD, EXCHANGE_DIRECT_DEAD, "routingkey_direct-dead");
        //模擬超時時間消息未被消費
        Thread.sleep(1000000);
        channel.basicConsume(QUEUE_PLAIN, true, (consumerTag, message) -> {
            System.out.println("Consumer01.main接受到消息:" + new String(message.getBody()));
        }, (consumerTag, sig) -> {
        });
    }
}

生產者

public class Produce {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.createChannel();
        //生產者發送消息,將消息設置為TTL消息
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i < 10; i++) {
            String message = i + "";
            channel.basicPublish(Consumer01.EXCHANGE_DIRECT,"routingkey_direct",properties,message.getBytes(StandardCharsets.UTF_8));

        }

    }
}

消費者2

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.createChannel();
        channel.basicConsume(Consumer01.QUEUE_PLAIN_DEAD, true, (consumerTag, message) -> {
            System.out.println("Consumer2.main接受死信隊列的消息:" + new String(message.getBody()));
        }, (consumerTag, sig) -> {
        });

    }
}
/**輸出結果:
Consumer2.main接受死信隊列的消息:0
Consumer2.main接受死信隊列的消息:1
Consumer2.main接受死信隊列的消息:2
Consumer2.main接受死信隊列的消息:3
Consumer2.main接受死信隊列的消息:4
Consumer2.main接受死信隊列的消息:5
Consumer2.main接受死信隊列的消息:6
Consumer2.main接受死信隊列的消息:7
Consumer2.main接受死信隊列的消息:8
Consumer2.main接受死信隊列的消息:9
    */

隊列達到了最大長度

將RabbiMQ的隊列的argument屬性的鍵設置為 x-max-length 表示隊列可以容納的最大條數

消息被拒絕

將自動應答設為false

在消費者調一個Channel.basicReject,設置參數requeue為false,表示不重新排隊,將消息丟到死信隊列

延遲隊列優化

延遲隊列就是講一個消息延遲發送,例如消息在隊列中10s后才能被取出,可以通過RabbitMQ的插件或者死信隊列來實現

用死信隊列實現延遲隊列的思路:

在于死信隊列綁定的普通隊列不設置消費者,利用TTL延遲消息,當TTL時間過期后,到達死信隊列被消費這樣就形成一個延遲隊列。

延遲隊列的使用場景:①典型的就是流量削峰,對于不重要的消息,可以延遲消費,有助于減輕數據庫的壓力,強化分布式系統的高可用和并發性能。②還可以實現一個消息提醒,例如用戶三天未登錄發送一個消息提醒。

在實際生產中可能存在很多不同的延遲時間要求,不可能每一個延遲要求就創造一個隊列,我們可以用生產者實現延遲信息,而隊列不設置TTL就可以根據生產的延遲消息進行延遲發送。

但是此方法雖然實現了一個隊列就可以轉發不同延時時間的消息,但是有缺陷,隊列中的消息是排隊發送的,也就是說如果我第一條消息發送20s延時,接著第二條消息發送2s延時。最后卻是20s消息先消費,而2s消息后消費,因為RabbitMQ在檢測一條消息時發生了20s的阻塞。如下:

###
GET http://localhost:8080/ttl/sendExpirationMessage/aaaaa/20000
###
GET http://localhost:8080/ttl/sendExpirationMessage/bbbbb/2000
最后輸出結果是先消費aaaa后消費bbbb

可以通過RabbitMQ的插件實現延時隊列,此方法沒有這缺陷

從官網上下載對應版本的延遲插件,下載后如圖:交換機類型會多出一個 x-delayed-message


在我們自定義的交換機中,這是一種新的交換機類型,該類型消息支持延遲投遞機制,消息傳遞后并不會立即投遞到目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,當達到投遞時間時,才會投遞到目標隊列中。

代碼實例:

配置類:

@Configuration
public class RabbitDelayedConfig {
    //延遲交換機
    public static final String DELAYED_EXCHANGE = "delayed.exchange";
    //延遲隊列b
    public static final String DELAYED_QUEUE = "delayed.queue";
    //延遲交換機和隊列的routingkey
    public static final String DELAYED_ROTINGKEY = "delayed.routingkey";

    //public CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
    //		super(name, durable, autoDelete, arguments);
    //		this.type = type;
    //	}
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        //定義延遲消息類型由那種交換機規則處置
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments);
    }

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder
                .nonDurable(DELAYED_QUEUE)
                .build();
    }

    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROTINGKEY).noargs();

    }
}

生產者:

/*延遲交換機發送消息*/
    @GetMapping("/sendDelayedMessage/{message}/{delayedTTL}")
    public void sendDelayedMessage(@PathVariable String message, @PathVariable Integer delayedTTL) {
        log.info("當前時間:{},發送一條延遲時間為{}的延遲消息給延遲隊列:{}", new Date().toString(), delayedTTL, message);
        rabbitTemplate.convertAndSend(RabbitDelayedConfig.DELAYED_EXCHANGE,
                RabbitDelayedConfig.DELAYED_ROTINGKEY,
                message,
                msg -> {
                    msg.getMessageProperties().setDelay(delayedTTL);//設置消息的延遲消息時間
                    return msg;
                });
    }

消費者:

@Slf4j
@Component
public class DelayedQueueConsumer {

    @RabbitListener(queues = RabbitDelayedConfig.DELAYED_QUEUE)
    public void queue(Message message) {
        log.info("接受到延遲隊列的消息,當前時間:{},消息:{}",new Date().toString(),new String(message.getBody()));
    }
}
責任編輯:武曉燕 來源: 今日頭條
相關推薦

2023-04-27 07:43:22

RabbitMQ重試隊列死信隊列

2023-09-05 15:48:14

RabbitMQ延遲隊列

2024-04-15 00:00:00

RabbitMQ死信隊列消息

2024-03-18 00:00:03

RabbitMQ代碼延遲隊列

2023-08-08 08:28:03

消息消費端Spring

2024-04-19 00:47:07

RabbitMQ消息機制

2024-12-25 09:32:06

2023-10-23 10:02:58

RabbitMQ延遲隊列

2021-12-08 10:47:35

RabbitMQ 實現延遲

2024-01-26 13:16:00

RabbitMQ延遲隊列docker

2024-04-28 08:52:33

RabbitMQ延遲隊列延遲插件

2023-11-03 10:33:26

2021-10-15 10:39:43

RabbitMQ隊列延遲

2024-11-05 16:58:21

RabbitMQ訂單超時取消延遲隊列

2024-10-16 09:29:30

RabbitMQ延遲隊列

2024-05-08 14:49:22

Redis延遲隊列業務

2024-07-16 18:05:19

延遲隊列MQRabbitMQ

2025-02-19 00:00:00

RabbitMQTTL插件

2018-07-20 09:16:04

鏈式存儲結構

2020-07-30 08:03:36

MQ死信隊列
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美国产日韩精品 | 欧美一区二区在线播放 | 久久精品国产亚洲 | 一区二区三区欧美在线观看 | 草久久 | www.久久国产精品 | 国产精品久久 | 国产精品视频999 | 91原创视频在线观看 | 日韩视频在线一区二区 | 成人高清网站 | 久久精品综合 | av黄色片在线观看 | 黄色国产大片 | 国产草草视频 | 日韩免费视频一区二区 | 波多野结衣在线观看一区二区三区 | 免费看国产片在线观看 | 精品一区二区三区四区五区 | 一区二区国产在线观看 | 中文字幕精品一区二区三区精品 | 一区二区在线 | 欧美在线a| 天天影视色综合 | 欧美阿v| 综合网在线 | 国产一区二区 | 国产在线精品一区二区三区 | 欧美日韩中 | 免费在线视频精品 | 欧洲一级毛片 | 欧美精品久久久久 | 亚洲一区二区三区在线 | 日本午夜精品一区二区三区 | 视频在线一区二区 | 久久精品亚洲欧美日韩久久 | 亚洲午夜一区二区 | 一区二区三区四区在线视频 | 久久香蕉精品视频 | 久久午夜精品福利一区二区 | 国产日韩欧美在线 |