SpringBoot分布式事務之最大努力通知
環境:springboot.2.4.9 + RabbitMQ3.7.4
什么是最大努力通知
這是一個充值的案例
交互流程 :
1、賬戶系統調用充值系統接口。
2、充值系統完成支付向賬戶系統發起充值結果通知 若通知失敗,則充值系統按策略進行重復通知。
3、賬戶系統接收到充值結果通知修改充值狀態。
4、賬戶系統未接收到通知會主動調用充值系統的接口查詢充值結果。
通過上邊的例子我們總結最大努力通知方案的目標 : 發起通知方通過一定的機制最大努力將業務處理結果通知到接收方。 具體包括 :
1、有一定的消息重復通知機制。 因為接收通知方可能沒有接收到通知,此時要有一定的機制對消息重復通知。
2、消息校對機制。 如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息信息來滿足需求。
最大努力通知與可靠消息一致性有什么不同?
1、解決方案思想不同 可靠消息一致性,發起通知方需要保證將消息發出去,并且將消息發到接收通知方,消息的可靠性關鍵由發起通知方來保證。 最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能消息接收不到,此時需要接收通知方主動調用發起通知方的接口查詢業務處理結果,通知的可靠性關鍵在接收通知方。
2、兩者的業務應用場景不同 可靠消息一致性關注的是交易過程的事務一致,以異步的方式完成交易。 最大努力通知關注的是交易后的通知事務,即將交易結果可靠的通知出去。
3、技術解決方向不同 可靠消息一致性要解決消息從發出到接收的一致性,即消息發出并且被接收到。 最大努力通知無法保證消息從發出到接收的一致性,只提供消息接收的可靠性機制。可靠機制是,最大努力地將消息通知給接收方,當消息無法被接收方接收時,由接收方主動查詢消費。
通過RabbitMQ實現最大努力通知
關于RabbitMQ相關文章《SpringBoot RabbitMQ消息可靠發送與接收 》,《RabbitMQ消息確認機制confirm 》。
項目結構
兩個子模塊users-mananger(賬戶模塊),pay-manager(支付模塊)
依賴
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <scope>runtime</scope>
- </dependency>
子模塊pay-manager
配置文件
- server:
- port: 8080
- ---
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- publisherConfirmType: correlated
- publisherReturns: true
- listener:
- simple:
- concurrency: 5
- maxConcurrency: 10
- prefetch: 5
- acknowledgeMode: MANUAL
- retry:
- enabled: true
- initialInterval: 3000
- maxAttempts: 3
- defaultRequeueRejected: false
實體類
記錄充值金額及賬戶信息
- @Entity
- @Table(name = "t_pay_info")
- public class PayInfo implements Serializable{
- @Id
- private Long id;
- private BigDecimal money ;
- private Long accountId ;
- }
DAO及Service
- public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {
- PayInfo findByOrderId(String orderId) ;
- }
- @Service
- public class PayInfoService {
- @Resource
- private PayInfoRepository payInfoRepository ;
- @Resource
- private RabbitTemplate rabbitTemplate ;
- // 數據保存完后發送消息(這里發送消息可以應用確認模式或事物模式)
- @Transactional
- public PayInfo savePayInfo(PayInfo payInfo) {
- payInfo.setId(System.currentTimeMillis()) ;
- PayInfo result = payInfoRepository.save(payInfo) ;
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ;
- try {
- rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ;
- } catch (AmqpException | JsonProcessingException e) {
- e.printStackTrace();
- }
- return result ;
- }
- public PayInfo queryByOrderId(String orderId) {
- return payInfoRepository.findByOrderId(orderId) ;
- }
- }
支付完成后發送消息。
Controller接口
- @RestController
- @RequestMapping("/payInfos")
- public class PayInfoController {
- @Resource
- private PayInfoService payInfoService ;
- // 支付接口
- @PostMapping("/pay")
- public Object pay(@RequestBody PayInfo payInfo) {
- payInfoService.savePayInfo(payInfo) ;
- return "支付已提交,等待結果" ;
- }
- @GetMapping("/queryPay")
- public Object queryPay(String orderId) {
- return payInfoService.queryByOrderId(orderId) ;
- }
- }
子模塊users-manager
應用配置
- server:
- port: 8081
- ---
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- publisherConfirmType: correlated
- publisherReturns: true
- listener:
- simple:
- concurrency: 5
- maxConcurrency: 10
- prefetch: 5
- acknowledgeMode: MANUAL
- retry:
- enabled: true
- initialInterval: 3000
- maxAttempts: 3
- defaultRequeueRejected: false
實體類
- @Entity
- @Table(name = "t_users")
- public class Users {
- @Id
- private Long id;
- private String name ;
- private BigDecimal money ;
- }
賬戶信息表
- @Entity
- @Table(name = "t_users_log")
- public class UsersLog {
- @Id
- private Long id;
- private String orderId ;
- // 0: 支付中,1:已支付,2:已取消
- @Column(columnDefinition = "int default 0")
- private Integer status = 0 ;
- private BigDecimal money ;
- private Date createTime ;
- }
賬戶充值記錄表(去重)
DAO及Service
- public interface UsersRepository extends JpaRepository<Users, Long> {
- }
- public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {
- UsersLog findByOrderId(String orderId) ;
- }
Service類
- @Service
- public class UsersService {
- @Resource
- private UsersRepository usersRepository ;
- @Resource
- private UsersLogRepository usersLogRepository ;
- @Transactional
- public boolean updateMoneyAndLogStatus(Long id, String orderId) {
- UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ;
- if (usersLog != null && 1 == usersLog.getStatus()) {
- throw new RuntimeException("已支付") ;
- }
- Users users = usersRepository.findById(id).orElse(null) ;
- if (users == null) {
- throw new RuntimeException("賬戶不存在") ;
- }
- users.setMoney(users.getMoney().add(usersLog.getMoney())) ;
- usersRepository.save(users) ;
- usersLog.setStatus(1) ;
- usersLogRepository.save(usersLog) ;
- return true ;
- }
- @Transactional
- public boolean saveLog(UsersLog usersLog) {
- usersLog.setId(System.currentTimeMillis()) ;
- usersLogRepository.save(usersLog) ;
- return true ;
- }
- }
消息監聽
- @Component
- public class PayMessageListener {
- private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ;
- @Resource
- private UsersService usersService ;
- @SuppressWarnings("unchecked")
- @RabbitListener(queues = {"pay-queue"})
- @RabbitHandler
- public void receive(Message message, Channel channel) {
- long deliveryTag = message.getMessageProperties().getDeliveryTag() ;
- byte[] buf = null ;
- try {
- buf = message.getBody() ;
- logger.info("接受到消息:{}", new String(buf, "UTF-8")) ;
- Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ;
- Long id = ((Integer) result.get("accountId")) + 0L ;
- String orderId = (String) result.get("orderId") ;
- usersService.updateMoneyAndLogStatus(id, orderId) ;
- channel.basicAck(deliveryTag, true) ;
- } catch (Exception e) {
- logger.error("消息接受出現異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ;
- e.printStackTrace() ;
- try {
- // 應該將這類異常的消息放入死信隊列中,以便人工排查。
- channel.basicReject(deliveryTag, false);
- } catch (IOException e1) {
- logger.error("拒絕消息重入隊列異常:{}", e1.getMessage()) ;
- e1.printStackTrace();
- }
- }
- }
- }
Controller接口
- @RestController
- @RequestMapping("/users")
- public class UsersController {
- @Resource
- private RestTemplate restTemplate ;
- @Resource
- private UsersService usersService ;
- @PostMapping("/pay")
- public Object pay(Long id, BigDecimal money) throws Exception {
- HttpHeaders headers = new HttpHeaders() ;
- headers.setContentType(MediaType.APPLICATION_JSON) ;
- String orderId = UUID.randomUUID().toString().replaceAll("-", "") ;
- Map<String, String> params = new HashMap<>() ;
- params.put("accountId", String.valueOf(id)) ;
- params.put("orderId", orderId) ;
- params.put("money", money.toString()) ;
- UsersLog usersLog = new UsersLog() ;
- usersLog.setCreateTime(new Date()) ;
- usersLog.setOrderId(orderId);
- usersLog.setMoney(money) ;
- usersLog.setStatus(0) ;
- usersService.saveLog(usersLog) ;
- HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ;
- return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ;
- }
- }
以上是兩個子模塊的所有代碼了
測試
初始數據
賬戶子模塊控制臺
支付子模塊控制臺
數據表數據
完畢!!!