成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot整合RocketMQ實現事務/廣播/順序消息詳解

開發 前端
TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息,TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。

環境:springboot2.4.12 + RocketMQ4.8.0

依賴

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

配置文件

server:
  port: 8080
---
rocketmq:
  nameServer: localhost:9876
  producer:
    group: demo-mq

普通消息

發送

@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public void send(String message) {
  rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build());
}

接收

@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2")
@Component
public class ConsumerListener implements RocketMQListener<String> {


  @Override
  public void onMessage(String message) {
    System.out.println("接收到消息:" + message) ;
  }


}

順序消息

發送

@Resource
private RocketMQTemplate rocketMQTemplate ;


public void sendOrder(String topic, String message, String tags, int id) {
  rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), 
    "order-" + id, new SendCallback() {
      @Override
      public void onSuccess(SendResult sendResult) {
        System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ;
      }
      @Override
      public void onException(Throwable e) {
        e.printStackTrace() ;
      }
    });
}

這里是根據hashkey將消息發送到不同的隊列中

@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", 
  selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerOrderListener implements RocketMQListener<String> {


  @Override
  public void onMessage(String message) {
    System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ;
  }


}

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。

結果

圖片圖片

當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:

圖片圖片

集群/廣播消息模式

發送端

@Resource
private RocketMQTemplate rocketMQTemplate ;
  
public void send(String topic, String message, String tags) {
  rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ;
}

集群消息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", 
  selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {


  @Override
  public void onMessage(String message) {
    System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
  }


}

messageModel = MessageModel.CLUSTERING

測試

啟動兩個服務分別端口是8080,8081

8080服務

圖片圖片

8081服務

圖片圖片

集群消息模式下,每個服務分別接收一部分消息,實現了負載均衡

廣播消息模式

消費端

@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", 
  selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING)
@Component
public class ConsumerBroadListener implements RocketMQListener<String> {


  @Override
  public void onMessage(String message) {
    System.out.println("ConsumerBroadListener1接收到消息:" + message) ;
  }


}

messageModel = MessageModel.BROADCASTING

測試

啟動兩個服務分別端口是8080,8081

8080服務

圖片圖片

8081服務

圖片圖片

集群消息模式下,每個服務分別都接受了同樣的消息。

事務消息

RocketMQ事務的3個狀態

TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。TransactionStatus.Unknown :中間狀態,它代表需要檢查消息隊列來確定狀態。

RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程 整體流程為:

正常事務發送與提交階段

1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)2、服務端響應消息寫入結果,半消息發送成功3、開始執行本地事務4、根據本地事務的執行狀態執行Commit或者Rollback操作

事務信息的補償流程1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求2、生產者收到確認回查請求后,檢查本地事務的執行狀態3、根據檢查后的結果執行Commit或者Rollback操作補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。

發送端

@Resource
private RocketMQTemplate rocketMQTemplate ; 
public void sendTx(String topic, Long id, String tags) {
  rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(
    new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))).
    setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), 
    UUID.randomUUID().toString().replaceAll("-", "")) ;
}

生產者對應的監聽器

@RocketMQTransactionListener
public class ProducerTxListener implements RocketMQLocalTransactionListener {
  
  @Resource
  private BusinessService bs ;


  @Override
  public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    // 這里執行本地的事務操作,比如保存數據。
    try {
      // 創建一個日志記錄表,將這唯一的ID存入數據庫中,在下面的check方法中可以根據這個id查詢是否有數據
      String id = (String) msg.getHeaders().get("BID") ;
      Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ;
      System.out.println("消息內容:" + users + "\t參與數據:" + arg + "\t本次事務的唯一編號:" + id) ;
      bs.save(users, new UsersLog(users.getId(), id)) ;
    } catch (Exception e) {
      e.printStackTrace() ;
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


  @Override
  public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    // 這里檢查本地事務是否執行成功
    String id = (String) msg.getHeaders().get("BID") ;
    System.out.println("執行查詢ID為:" + id + " 的數據是否存在") ;
    UsersLog usersLog = bs.queryUsersLog(id) ;
    if (usersLog == null) {
      return RocketMQLocalTransactionState.ROLLBACK ;
    }
    return RocketMQLocalTransactionState.COMMIT ;
  }


}

消費端

@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10")
@Component
public class ConsumerTxListener implements RocketMQListener<Users> {


  @Override
  public void onMessage(Users users) {
    System.out.println("TX接收到消息:" + users) ;
  }


}

Service

@Transactional
public boolean save(Users users, UsersLog usersLog) {
  usersRepository.save(users) ;
  usersLogRepository.save(usersLog) ;
  if (users.getId() == 1) {
    throw new RuntimeException("數據錯誤") ;
  }
  return true ;
}
  
public UsersLog queryUsersLog(String bid) {
  return usersLogRepository.findByBid(bid) ;
}

Controller

@GetMapping("/tx/{id}")
public Object sendTx(@PathVariable("id")Long id) {
  ps.sendTx("tx-topic", id, "tag10") ;
  return "send transaction success" ;
}

測試

調用接口后,控制臺輸出:

圖片圖片

從打印日志看出來都保存完畢了后 消費端才接受到消息。

圖片圖片

圖片圖片

刪除數據,再測試ID為1會報錯的。

圖片圖片

數據庫中沒有數據。。。

是不是也不是很復雜,2個階段來處理。

完畢?。。?/p>

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2021-04-15 09:17:01

SpringBootRocketMQ

2024-11-11 13:28:11

RocketMQ消息類型FIFO

2024-10-29 08:34:27

RocketMQ消息類型事務消息

2024-06-13 09:25:14

2022-06-02 08:21:07

RocketMQ消息中間件

2023-07-17 08:34:03

RocketMQ消息初體驗

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-04-07 08:43:09

SpringBootRocketMQ開發技術

2022-06-27 11:04:24

RocketMQ順序消息

2021-10-03 21:41:13

RocketMQKafkaPulsar

2023-12-15 13:08:00

RocketMQ中間件消費順序

2023-08-09 08:01:00

WebSockett服務器web

2024-02-04 09:02:29

RocketMQ項目處理器

2024-04-25 14:27:32

順序消息事務消息

2022-08-09 08:31:29

RocketMQ消息中間件

2024-11-11 00:00:10

2021-03-26 08:16:32

SpringbootWebsocket前端

2020-09-08 07:37:44

springBoot MQ rabbitMQ

2022-07-04 11:06:02

RocketMQ事務消息實現

2023-04-12 08:56:37

RocketMQSpring核心業務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日本精品一区二区三区视频 | 久久久欧洲| 国产欧美综合在线 | 欧美精品久久久久 | 国产视频2021 | 国产精品免费在线 | 亚洲综合大片69999 | 国产精品不卡一区 | 五月婷婷激情网 | 欧美久久久久 | 99精品电影| 精品美女久久久久久免费 | 亚洲视频在线观看免费 | 国产精品二区三区在线观看 | 久久久久久国产精品 | 欧美一级做a爰片免费视频 国产美女特级嫩嫩嫩bbb片 | 亚洲日韩中文字幕 | 国产日韩欧美激情 | 午夜日韩精品 | 性天堂网 | 国产成人精品网站 | 亚洲一区 中文字幕 | 日韩久草| 九九久久这里只有精品 | 国产综合av | 九九久久精品 | 青青久草 | 99久久婷婷国产综合精品首页 | 亚洲精品一区二三区不卡 | 国产精品久久久久久二区 | 国产精品久久久久久久久久久久冷 | 久久久久99 | 欧美综合一区二区三区 | 国产美女在线播放 | 国产成人免费 | 午夜激情视频在线 | 成年人网站免费视频 | 日韩精品免费看 | 国产视频精品在线观看 | 国产欧美精品 | 97国产精品视频人人做人人爱 |