成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

你心心念念的RabbitMQ個人實踐來了來了它來了

開發 前端
MQ(Message Queue)就是消息隊列,其有點有很多:解耦、異步、削峰等等,本文來聊一下RabbitMQ的一些概念以及使用。

前言

MQ(Message Queue)就是消息隊列,其有點有很多:解耦、異步、削峰等等,本文來聊一下RabbitMQ的一些概念以及使用。

RabbitMq

案例

Springboot整合RabbitMQ簡單案例

基本概念

  • Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
  • Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  • Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  • Producer:消息生產者,就是投遞消息的程序。
  • Consumer:消息消費者,就是接受消息的程序。

發布消息到RabbitMQ需要經過兩步:

  1. producer → exchange
  2. exchange 根據 exchange 的類型和 routing key 確定將消息投遞到哪個隊列

工作流程

了解了RabbitMQ的一些概念,我們來捋捋使用RabbitMQ的流程:

  1. 創建Exchange
  2. 創建Queue
  3. 將Queue綁定進Exchange中(此處會設置routing key)
  4. 生產者發布消息
  5. 消費者訂閱消息

交換機(Exchange)

交換機可以綁定隊列,綁定時可以給隊列指定路由(Routing key)和參數(Arguments)

所有的消息發送都是經過交換機轉發到隊列的,而不是直接到隊列中

交換機類型:

  • direct
  • 根據確定的路由(routing key)轉發消息到隊列中(一條消息可以發到多個隊列,只要路由相同)
  • fanout
  • 路由無效,只要和該交換機綁定的隊列,都能接收到消息
  • topic
  • 允許路由使用*和#來進行模糊匹配
  • *表示一個單詞
  • 表示任意數量(零個或多個)單詞
  • 例如:如果隊列的路由為com.# 那么往交換機發消息是,路由填com.ccc 隊列就可以收到消息
  • headers
  • 忽略路由,由參數(Arguments)來確定轉發的隊列

消息過期時間TTL

有兩種方式設置TTL,創建隊列時設置整個隊列的TTL或者在發送消息時單獨設置每條消息的TTL,消息存活時間取兩者的最小值。

  1. 創建隊列時設置
  2. 是消息的存活時間,不是隊列的存活時間,別搞混了。
  3. @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 設置隊列中的消息5秒過期 return new Queue("queueName",true, false, false, args);}
  4. 發送消息時設置
  5. public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //給消息設置過期時間 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 設置消息5秒過期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);}

死信隊列

死信隊列也是一個正常隊列,只是當綁定了死信隊列的隊列滿足相應條件,就會將滿足條件的消息轉移到死信隊列中。

進入死信隊列的條件:

  1. 消息被拒絕
  2. 消息過期(超時)
  3. 隊列達到最大長度

死信隊列的配置:

  1. 按照正常步驟定義一個隊列(交換機、隊列、綁定)
  2. 給需要綁定死信隊列的隊列添加x-dead-letter-exchange(死信隊列的交換機)和x-dead-letter-routing-key(死信隊列的路由)參數
  3. @Beanpublic Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "死信隊列交換機名稱"); args.put("x-dead-letter-routing-key", "死信隊列路由"); return new Queue("queueName",true, false, false, args);}

如何保證MQ消息正確送達與消費

可靠性生產和推送

步驟:

  1. 發送消息前數據庫保存MQ消息發送日志
  2. MQ消息發送后使用回調更新日志狀態

實現:

上面我們講了,發布消息到RabbitMQ需要經過兩步:

producer → exchange
exchange 根據 exchange 的類型和 routing key 確定將消息投遞到哪個隊列

所以,發布消息的確認也分兩個部分,以下是確認步驟:

  1. 修改MQ應答機制(yml)
  2. spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 發送消息確認,producer -> exchange publisher-confirm-type: correlated # 發送消息確認,exchange -> queue publisher-returns: true
  3. 新增mq的回調方法
  4. /** * PostConstruct注解好多人以為是Spring提供的。其實是Java自己的注解。 * Java中該注解的說明:@PostConstruct該注解被用來修飾一個非靜態的void()方法。 * 被@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,并且只會被服務器執行一次。 * PostConstruct在構造函數之后執行,init()方法之前執行。 * Constructor(構造方法) -> @Autowired(依賴注入) -> @PostConstruct(注釋的方法) */@PostConstructprivate void regCallBack() { // producer -> exchange 成功或失敗都會觸發此回調 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 這個id是在消息發送的時候傳入的 String id = correlationData.getId(); // 如果ack為true代表消息被mq成功接收 if (!ack) { // 應答失敗,修改日志狀態 System.out.println("exchange 應答失敗,做失敗處理!"); } else { // 應答成功,修改日志狀態 System.out.println("exchange 成功處理"); } } }); // 這個回調只有exchange -> queue 失敗時才會觸發 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 發送失敗"); } });}
  5. 修改MQ發送消息的方法,增加日志id的傳遞
  6. String correlationId = "這是日志id";rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消費者需要correlationId才做這個處理 message.getMessageProperties().setCorrelationId(correlationId); return message; }}, new CorrelationData(correlationId));// 如果消費者不需要獲取correlationId,則用下面這種即可rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));

可靠性消費

步驟:

  • 開啟手動應答
  • 監聽器增加手動應答邏輯

實現:

  1. 開啟手動應答
  2. spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 將自動應答ack模式改成手動應答
  3. acknowledge-mode有三種類型:
  4. nome:不進行ack,rabbitmq默認消費者正確處理所有請求
  5. munual:手動確認
  6. auto:自動確認消息(默認類型)。如果消費者拋出異常,則消息重回隊列。
  7. 監聽器增加手動應答邏輯
  8. @RabbitListener(queues = {"隊列名字"})public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相應處理,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("消息為:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消費成功,進行確認 channel.basicAck(tag, false); } catch (IOException e) { // 消費失敗,重發 // requeue代表是否重發,為false則直接將消息丟棄,有死信就進入死信隊列 channel.basicNack(tag, false, true); }}

總結

本文介紹了RabbitMQ的一些概念和簡單使用,有不少東西其實是沒有講清楚的,比如publisher-confirm-type和acknowledge-mode的幾種類型的區別等等。主要是在官方文檔找不到相關的細致描述,查文檔的能力還是有待提高。。。

責任編輯:武曉燕 來源: Java碼農之路
相關推薦

2021-04-16 16:21:02

鴻蒙HarmonyOS應用開發

2025-01-15 10:02:09

APIVueDOM

2017-08-14 16:11:28

戴爾電腦

2024-01-15 16:34:13

模型訓練

2023-03-15 08:03:31

2022-07-19 08:04:04

HTTP應用層協議

2021-10-28 18:58:57

動態規劃數據結構算法

2021-09-28 14:25:48

iPhone 13蘋果手機

2013-07-12 09:59:58

Android 5.0

2010-02-03 13:25:34

云計算

2024-05-08 08:50:39

React19模式UI

2015-03-05 09:26:28

網絡中立命名數據網絡FDD

2016-09-02 08:20:33

OpsDevWWDCDevOps

2024-07-23 08:59:17

Set開發前端

2023-04-23 18:56:53

LinuxKubuntu

2014-06-23 09:04:56

Docker

2014-12-08 09:55:33

Android 5.0Google

2015-12-29 13:32:41

2020-10-09 08:38:14

Python 3.9Python開發

2018-04-25 15:53:12

霧計算
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久久久久久综合 | 国产精品成人免费 | 老司机精品福利视频 | 午夜av电影院 | 国产欧美一区二区三区日本久久久 | 99tv| 羞羞涩涩在线观看 | 成人三级影院 | 国产精品自在线 | 久久爱黑人激情av摘花 | 亚洲 中文 欧美 日韩 在线观看 | 国产9 9在线 | 中文 | 日韩一区二区在线观看视频 | 中文字幕高清免费日韩视频在线 | 日韩电影一区 | 久草色播 | 亚洲国产一区二区在线 | 一区二区不卡 | 成年人免费在线视频 | 日韩成人高清在线 | 99久久久久| 亚洲国产精品一区二区三区 | 综合天天久久 | 中文字幕视频一区 | 国产在线视频三区 | 欧美激情在线观看一区二区三区 | 天堂久 | 亚洲在线视频 | 欧美日韩国产在线观看 | 国产精品久久久久久亚洲调教 | 7777在线 | 亚洲人成人一区二区在线观看 | 国产亚洲精品精品国产亚洲综合 | 精品不卡 | 国产成人精品久久二区二区91 | 免费国产成人av | 毛片一级网站 | 日韩综合网 | h视频在线播放 | 亚洲国产一区在线 | 欧美综合一区二区三区 |