成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

SpringBoot結合RabbitMQ實現分布式事務之最大努力通知

開發 前端
如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息信息來滿足需求。

環境:springboot.2.4.12 + RabbitMQ3.7.4

什么是最大努力通知

這是一個充值的案例

圖片圖片

交互流程 :

1、賬戶系統調用充值系統接口。

2、充值系統完成支付向賬戶系統發起充值結果通知 若通知失敗,則充值系統按策略進行重復通知。

3、賬戶系統接收到充值結果通知修改充值狀態。

4、賬戶系統未接收到通知會主動調用充值系統的接口查詢充值結果。通過上邊的例子我們總結最大努力通知方案的目標 :目標 :發起通知方通過一定的機制最大努力將業務處理結果通知到接收方。

具體包括 :

1、有一定的消息重復通知機制。因為接收通知方可能沒有接收到通知,此時要有一定的機制對消息重復通知。

2、消息校對機制。如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息信息來滿足需求。

最大努力通知與可靠消息一致性有什么不同?

1、解決方案思想不同 可靠消息一致性,發起通知方需要保證將消息發出去,并且將消息發到接收通知方,消息的可靠性關鍵由發起通知方來保證。最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能消息接收不到,此時需要接收通知方主動調用發起通知方的接口查詢業務處理結果,通知的可靠性關鍵在接收通知方。

2、兩者的業務應用場景不同 可靠消息一致性關注的是交易過程的事務一致,以異步的方式完成交易。最大努力通知關注的是交易后的通知事務,即將交易結果可靠的通知出去。

3、技術解決方向不同 可靠消息一致性要解決消息從發出到接收的一致性,即消息發出并且被接收到。最大努力通知無法保證消息從發出到接收的一致性,只提供消息接收的可靠性機制。可靠機制是,最大努力地將消息通知給接收方,當消息無法被接收方接收時,由接收方主動查詢消費。

通過RabbitMQ實現最大努力通知

關于RabbitMQ相關文章《SpringBoot RabbitMQ消息可靠發送與接收 》,《RabbitMQ消息確認機制confirm 》。

  • 項目結構

圖片圖片

兩個子模塊users-mananger(賬戶模塊),pay-manager(支付模塊)

  • 依賴
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <scope>runtime</scope>
</dependency>

子模塊pay-manager

  • 配置文件
server:
  port: 8080
---
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisherConfirmType: correlated
    publisherReturns: true
    listener:
      simple:
        concurrency: 5
        maxConcurrency: 10
        prefetch: 5
        acknowledgeMode: MANUAL
        retry:
          enabled: true
          initialInterval: 3000
          maxAttempts: 3
        defaultRequeueRejected: false
  • 實體類

記錄充值金額及賬戶信息

@Entity
@Table(name = "t_pay_info")
public class PayInfo implements Serializable{
 @Id
 private Long id;
 private BigDecimal money ;
 private Long accountId ;
}
  • DAO及Service
public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
 PayInfo findByOrderId(String orderId) ;
}
@Service
public class PayInfoService {
  
  @Resource
  private PayInfoRepository payInfoRepository ;
  @Resource
  private RabbitTemplate rabbitTemplate ;
  
  // 數據保存完后發送消息(這里發送消息可以應用確認模式或事物模式)
  @Transactional
  public PayInfo savePayInfo(PayInfo payInfo) {
    payInfo.setId(System.currentTimeMillis()) ;
    PayInfo result = payInfoRepository.save(payInfo) ;
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ;
    try {
      rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ;
    } catch (AmqpException | JsonProcessingException e) {
      e.printStackTrace();
    }
    return result ;
  }
  
  public PayInfo queryByOrderId(String orderId) {
    return payInfoRepository.findByOrderId(orderId) ;
  }
  
}

支付完成后發送消息。

  • Controller接口
@RestController
@RequestMapping("/payInfos")
public class PayInfoController {
 @Resource
 private PayInfoService payInfoService ;
  
  // 支付接口
 @PostMapping("/pay")
 public Object pay(@RequestBody PayInfo payInfo) {
  payInfoService.savePayInfo(payInfo) ;
  return "支付已提交,等待結果" ;
 }
  
 @GetMapping("/queryPay")
 public Object queryPay(String orderId) {
  return payInfoService.queryByOrderId(orderId) ;
 }
  
}

子模塊users-manager

  • 應用配置
server:
  port: 8081
---
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisherConfirmType: correlated
    publisherReturns: true
    listener:
      simple:
        concurrency: 5
        maxConcurrency: 10
        prefetch: 5
        acknowledgeMode: MANUAL
        retry:
          enabled: true
          initialInterval: 3000
          maxAttempts: 3
        defaultRequeueRejected: false
  • 實體類
@Entity
@Table(name = "t_users")
public class Users {
 @Id
 private Long id;
 private String name ;
 private BigDecimal money ;
}

賬戶信息表

@Entity
@Table(name = "t_users_log")
public class UsersLog {
 @Id
 private Long id;
 private String orderId ;
 // 0:支付中,1:已支付,2:已取消
 @Column(columnDefinition = "int default 0")
 private Integer status = 0 ;
 private BigDecimal money ;
 private Date createTime ;
}

賬戶充值記錄表(去重)

  • DAO及Service
public interface UsersRepository extends JpaRepository<Users, Long> {
}
public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {
 UsersLog findByOrderId(String orderId) ;
}
  • Service類
@Service
public class UsersService {  
  @Resource
  private UsersRepository usersRepository ;
  @Resource
  private UsersLogRepository usersLogRepository ;
  
 @Transactional
 public boolean updateMoneyAndLogStatus(Long id, String orderId) {
  UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ;
  if (usersLog != null && 1 == usersLog.getStatus()) {
   throw new RuntimeException("已支付") ;
  }
  Users users = usersRepository.findById(id).orElse(null) ;
  if (users == null) {
   throw new RuntimeException("賬戶不存在") ;
  }
  users.setMoney(users.getMoney().add(usersLog.getMoney())) ;
  usersRepository.save(users) ;
  usersLog.setStatus(1) ;
  usersLogRepository.save(usersLog) ;
  return true ;
 }
  
 @Transactional
 public boolean saveLog(UsersLog usersLog) {
  usersLog.setId(System.currentTimeMillis()) ;
  usersLogRepository.save(usersLog) ;
  return true ;
 }
}
  • 消息監聽
@Component
public class PayMessageListener {
  
 private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ;
  
 @Resource
 private  UsersService usersService ;
  
 @SuppressWarnings("unchecked")
 @RabbitListener(queues = {"pay-queue"})
 @RabbitHandler
 public void receive(Message message, Channel channel) {
  long deliveryTag = message.getMessageProperties().getDeliveryTag() ;
  byte[] buf =  null ;
  try {
   buf = message.getBody() ;
   logger.info("接受到消息:{}", new String(buf, "UTF-8")) ;
   Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ;
   Long id = ((Integer) result.get("accountId")) + 0L ;
   String orderId = (String) result.get("orderId") ;
   usersService.updateMoneyAndLogStatus(id, orderId) ;
   channel.basicAck(deliveryTag, true) ;
  } catch (Exception e) {
   logger.error("消息接受出現異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ;
   e.printStackTrace() ;
   try {
    // 應該將這類異常的消息放入死信隊列中,以便人工排查。
    channel.basicReject(deliveryTag, false);
   } catch (IOException e1) {
    logger.error("拒絕消息重入隊列異常:{}", e1.getMessage()) ;
    e1.printStackTrace();
   }
  }
 }
}
  • Controller接口
@RestController
@RequestMapping("/users")
public class UsersController {
  
  @Resource
  private RestTemplate restTemplate ;
  @Resource
  private UsersService usersService ;
  
  @PostMapping("/pay")
  public Object pay(Long id, BigDecimal money) throws Exception {
    HttpHeaders headers = new HttpHeaders() ;
    headers.setContentType(MediaType.APPLICATION_JSON) ;
    String orderId = UUID.randomUUID().toString().replaceAll("-", "") ;
    Map<String, String> params = new HashMap<>() ;
    params.put("accountId", String.valueOf(id)) ;
    params.put("orderId", orderId) ;
    params.put("money", money.toString()) ;
    
    UsersLog usersLog = new UsersLog() ;
    usersLog.setCreateTime(new Date()) ;
    usersLog.setOrderId(orderId);
    usersLog.setMoney(money) ;
    usersLog.setStatus(0) ;
    usersService.saveLog(usersLog) ;
    HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ;
    return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ;
  }
  
}

以上是兩個子模塊的所有代碼了

測試

初始數據

圖片圖片

圖片圖片

賬戶子模塊控制臺

圖片圖片

支付子模塊控制臺

圖片圖片

數據表數據

圖片圖片

完畢!!!

責任編輯:武曉燕 來源: Spring全家桶實戰案例源碼
相關推薦

2021-04-16 16:02:13

SpringBoot分布式最大努力通知

2024-06-11 13:50:43

2022-06-27 08:21:05

Seata分布式事務微服務

2022-06-14 10:47:00

分布式事務數據

2017-07-26 15:08:05

大數據分布式事務

2022-06-21 08:27:22

Seata分布式事務

2021-08-06 08:33:27

Springboot分布式Seata

2020-03-31 08:05:23

分布式開發技術

2019-10-10 09:16:34

Zookeeper架構分布式

2009-06-19 15:28:31

JDBC分布式事務

2025-06-11 08:01:06

2009-09-18 15:10:13

分布式事務LINQ TO SQL

2021-09-29 09:07:37

分布式架構系統

2023-01-13 07:39:07

2021-12-09 10:45:19

分布式事務框架

2023-01-06 09:19:12

Seata分布式事務

2025-05-13 02:10:00

2019-06-26 09:41:44

分布式事務微服務

2025-04-29 04:00:00

分布式事務事務消息

2025-05-15 08:05:00

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 天堂网色 | 日韩一区二区三区在线观看 | 天天射天天操天天干 | 视频在线观看亚洲 | 日本超碰 | www.亚洲| 国产一区二区三区在线免费观看 | 色呦呦在线 | 在线观看深夜视频 | 婷婷丁香在线视频 | 久久激情五月丁香伊人 | 亚洲免费观看视频 | www日本在线播放 | 日韩在线观看中文字幕 | 三级在线视频 | 欧美一卡二卡在线 | 欧美高清性xxxxhdvideosex | 韩国精品一区 | 韩日一区 | 日本在线中文 | 国产一区二区三区四区 | 亚洲一区国产精品 | 日本黄色片免费在线观看 | 天天艹天天干天天 | 日韩欧美在线一区 | 欧美一区二区在线观看 | 国产精品久久久亚洲 | 天天综合国产 | 人人种亚洲 | 在线免费黄色 | 国产精品a久久久久 | 国产成人精品一区二区三区视频 | 亚洲精品区 | 日本视频在线播放 | 亚洲国产精品久久久久秋霞不卡 | 成人午夜在线 | 九九伊人sl水蜜桃色推荐 | 嫩草一区二区三区 | 国产精品久久久久久久久久不蜜臀 | 久久国产精品亚洲 | 国产精品区二区三区日本 |