SpringBoot分布式事務之可靠消息最終一致性
環(huán)境:springboot2.3.9 + RocketMQ4.8.0
可靠消息最終一致性原理
- 執(zhí)行流程
- Producer發(fā)送Prepare message到broker。
- Prepare Message發(fā)送成功后開始執(zhí)行本地事務。
- 如果本地事務執(zhí)行成功的話則返回commit,如果執(zhí)行失敗則返回rollback。(這個是在事務消息的回調方法里由開發(fā)者自己決定commit or rollback)
- Producer發(fā)送上一步的commit還是rollback到broker,這里有以下兩種情況:
1、如果broker收到了commit/rollback消息 :
如果收到了commit,則broker認為整個事務是沒問題的,執(zhí)行成功的。那么會下發(fā)消息給Consumer端消費。
如果收到了rollback,則broker認為本地事務執(zhí)行失敗了,broker將會刪除Half Message,不下發(fā)給Consumer端。
2、如果broker未收到消息(如果執(zhí)行本地事務突然宕機了,相當執(zhí)行本地事務(executeLocalTransaction)執(zhí)行結果返回unknow,則和broker未收到確認消息的情況一樣處理。):
broker會定時回查本地事務的執(zhí)行結果:如果回查結果是本地事務已經執(zhí)行則返回commit,若未執(zhí)行,則返回unknow。
Producer端回查的結果發(fā)送給Broker。Broker接收到的如果是commit,則broker視為整個事務執(zhí)行成功,如果是rollback,則broker視為本地事務執(zhí)行失敗,broker刪除Half Message,不下發(fā)給consumer。如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重復回查,以確保查到最終的事務結果。重復回查的時間間隔和次數都可配。
工程結構
圖片
建立父子工程,兩個子項目account-manager,integral-manager。
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
Account子模塊
- 配置文件
server:
port: 8081
---
rocketmq:
nameServer: localhost:9876
producer:
group: pack-mq
---
spring:
jpa:
generateDdl: false
hibernate:
ddlAuto: update
openInView: true
show-sql: true
---
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/account?serverTimeznotallow=GMT%2B8
username: root
password: ******
type: com.zaxxer.hikari.HikariDataSource
hikari:
minimumIdle: 10
maximumPoolSize: 200
autoCommit: true
idleTimeout: 30000
poolName: MasterDatabookHikariCP
maxLifetime: 1800000
connectionTimeout: 30000
connectionTestQuery: SELECT 1
- 業(yè)務實體類
// 用戶表
@Entity
@Table(name = "t_account")
public class Account {
@Id
private Long id;
private String name ;
}
// 業(yè)務記錄表(用來查詢去重)
@Entity
@Table(name = "t_account_log")
public class AccountLog {
@Id
private Long txid;
private Date createTime ;
}
- DAO相關類
public interface AccountRepository extends JpaRepository<Account, Long> {
}
public interface AccountLogRepository extends JpaRepository<AccountLog, Long> {
}
- Service相關類
@Resource
private AccountRepository accountRepository ;
@Resource
private AccountLogRepository accountLogRepository ;
// 該方法保存業(yè)務數據,同時保存操作記錄;操作記錄用來回查。
@Transactional
public boolean register(Account account) {
accountRepository.save(account) ;
AccountLog accountLog = new AccountLog(account.getId(), new Date()) ;
accountLogRepository.save(accountLog) ;
return true ;
}
public AccountLog existsTxId(Long txid) {
return accountLogRepository.findById(txid).orElse(null) ;
}
- 發(fā)送消息方法
@Resource
private RocketMQTemplate rocketMQTemplate ;
public String sendTx(String topic, String tags, Account account) {
String uuid = UUID.randomUUID().toString().replaceAll("-", "") ;
TransactionSendResult result =rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload(account).
setHeader("tx_id", uuid).build(), uuid) ;
return result.getSendStatus().name() ;
}
- 消息監(jiān)聽(生產者監(jiān)聽)
@RocketMQTransactionListener
public class ProducerMessageListener implements RocketMQLocalTransactionListener {
@Resource
private AccountService accountService ;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
accountService.register(account) ;
} catch (Exception e) {
e.printStackTrace() ;
return RocketMQLocalTransactionState.ROLLBACK ;
}
return RocketMQLocalTransactionState.COMMIT ;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 這里檢查本地事務是否執(zhí)行成功
try {
Account account = new JsonMapper().readValue((byte[])msg.getPayload(), Account.class) ;
System.out.println("執(zhí)行查詢ID為:" + account.getId() + " 的數據是否存在") ;
AccountLog accountLog = accountService.existsTxId(account.getId()) ;
if (accountLog == null) {
return RocketMQLocalTransactionState.UNKNOWN ;
}
} catch (Exception e) {
e.printStackTrace() ;
return RocketMQLocalTransactionState.UNKNOWN ;
}
return RocketMQLocalTransactionState.COMMIT ;
}
}
- Controller接口
@RestController
@RequestMapping("/accounts")
public class AccountController {
@Resource
private ProducerMessageService messageService ;
@PostMapping("/send")
public Object sendMessage(@RequestBody Account account) {
return messageService.sendTx("tx-topic", "mks", account) ;
}
}
Integral子模塊
- 業(yè)務實體類
@Entity
@Table(name = "t_integral")
public class Integral {
@Id
private Long id;
private Integer score ;
private Long acccountId ;
}
- DAO相關類
public interface IntegralRepository extends JpaRepository<Integral, Long> {
}
- Service相關類
@Resource
private IntegralRepository integralRepository ;
@Transactional
public Integral saveIntegral(Integral integral) {
return integralRepository.save(integral) ;
}
- 消息監(jiān)聽
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "mks")
@Component
public class IntegralMessageListener implements RocketMQListener<String> {
@Resource
private IntegralService integralService ;
@SuppressWarnings("unchecked")
@Override
public void onMessage(String message) {
System.out.println("Integral接收到消息:" + message) ;
try {
Map<String, Object> jsonMap = new JsonMapper().readValue(message, Map.class) ;
Integer id = (Integer) jsonMap.get("id") ;
integralService.saveIntegral(new Integral(1L, 1000, id + 0L)) ;
} catch (Exception e) {
throw new RuntimeException(e) ;
}
}
}
測試
分別啟動兩個子模塊
- 初始數據表
圖片
- Postman測試
圖片
Account模塊
圖片
Integral模塊
圖片
當子模塊Account執(zhí)行本地事務發(fā)生錯誤時,事務會回滾并且刪除消息。子模塊Integral并不會收到消息。