SpringBoot與Axon Framework整合,實現事件溯源驅動的分布式業務系統
Axon Framework 是一個用于構建復雜分布式系統的開源框架,特別適用于實現事件溯源(Event Sourcing)和命令查詢責任分離(CQRS)模式,提供強大的工具來簡化事件驅動架構的開發。
選擇Axon Framework的理由
1. 事件溯源(Event Sourcing)
- 數據完整性: 事件溯源通過記錄每個業務操作的變化事件來保持數據的完整性和一致性。這對于金融系統尤為重要,因為它需要精確跟蹤每一筆交易的歷史記錄。
- 審計和合規性: 銀行業務對審計和合規性有嚴格的要求。事件溯源可以幫助我們輕松地重建歷史狀態,并提供詳細的變更日志。
2. CQRS 模式(Command Query Responsibility Segregation)
- 分離讀寫操作: CQRS 將讀操作和寫操作分開,使得系統可以在不同的優化方向上獨立發展。這有助于提高系統的性能和可擴展性。
- 靈活的設計: 分離讀寫邏輯可以簡化復雜查詢的設計,同時允許使用不同類型的數據庫來滿足不同的性能需求。
3. 高性能和可擴展性
- 分布式架構: Axon 支持構建分布式的微服務架構,適用于大規模的應用場景。它可以處理高并發請求,并且易于水平擴展。
- 異步處理: Axon 提供了強大的異步命令處理機制,減少了事務的鎖定時間,提高了系統的吞吐量。
4. 豐富的生態系統
- 內置支持: Axon 框架提供了許多開箱即用的功能,如事件存儲、聚合管理、命令總線等,大大減少了開發工作量。
- 社區和支持: Axon 擁有一個活躍的開發者社區和技術文檔,便于解決在開發過程中遇到的問題。
5. 領域驅動設計(DDD)的支持
- 模型驅動: Axon 強調領域驅動設計,鼓勵將復雜的業務邏輯分解為小的、自治的聚合根,從而更好地反映真實的業務場景。
- 清晰的職責劃分: 通過使用 DDD 原則,我們可以確保每個模塊都有明確的職責,提高了代碼的可維護性和可理解性。
6. 安全性
- 細粒度控制: Axon 提供了細粒度的安全控制機制,可以根據不同的角色和權限執行不同的操作。
- 加密和認證: 結合 Spring Security 等安全框架,可以進一步增強系統的安全性,保護敏感信息。
應用案例
1. ING Bank
ING 銀行是最早采用 Axon Framework 的大型金融機構之一。他們利用 Axon 構建了多個分布式系統,包括支付處理、賬戶管理和風險評估等關鍵業務流程。
- 項目: ING 使用 Axon 來構建其下一代銀行平臺,實現了高可用性和可擴展性。
- 優勢: 通過事件溯源提高了數據一致性和審計能力。
2. KLM Royal Dutch Airlines
荷蘭皇家航空(KLM)使用 Axon Framework 來重構其核心預訂系統,以提高系統的靈活性和響應速度。
- 項目: KLM 通過 Axon 實現了訂單管理系統的現代化,支持復雜的業務規則和多渠道集成。
- 優勢: 增強了系統的可維護性和可擴展性。
3. Baloise Insurance Group
巴洛伊茲保險集團是一家瑞士保險公司,使用 Axon Framework 來改進其理賠處理系統。
- 項目: 巴洛伊茲利用 Axon 構建了一個靈活且可擴展的理賠處理平臺。
- 優勢: 提升了理賠處理的速度和準確性,并簡化了系統的維護工作。
4. Adyen
Adyen 是一家全球領先的支付服務提供商,使用 Axon Framework 來處理復雜的支付交易和結算流程。
- 項目: Adyen 利用 Axon 實現了一個高性能的支付處理引擎,支持實時交易處理。
- 優勢: 確保了交易的可靠性和一致性,提升了系統的性能。
5. Deutsche Bahn
德意志鐵路公司使用 Axon Framework 來優化其票務系統。
- 項目: 德意志鐵路利用 Axon 構建了一個現代化的票務平臺,支持在線購票和退票等功能。
- 優勢: 提高了系統的穩定性和用戶體驗。
6. Zalando SE
Zalando 是一家德國電商平臺,使用 Axon Framework 來構建其訂單管理系統。
- 項目: Zalando 利用 Axon 實現了一個高度可擴展的訂單管理系統,支持復雜的業務流程。
- 優勢: 提升了系統的響應能力和可維護性。
代碼實操
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>axon-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>axon-demo</name>
<description>Demo project for Spring Boot and Axon Framework with MySQL</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-starter</artifactId>
<version>4.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-test</artifactId>
<version>4.6.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
src/main/java/com/example/axondemo/aggregate/BankAccountAggregate.java
package com.example.axondemo.aggregate;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import org.axonframework.modelling.command.AggregateLifecycle;
@Slf4j
@Aggregate
public class BankAccountAggregate {
@AggregateIdentifier
private String accountId; // 賬戶ID,作為聚合根的標識符
private double balance; // 賬戶余額
public BankAccountAggregate() {} // 默認構造函數
// 處理創建賬戶命令
@CommandHandler
public BankAccountAggregate(com.example.axondemo.command.CreateBankAccountCommand command) {
if (command.getInitialDeposit() < 0) {
throw new IllegalArgumentException("初始存款必須為正數");
}
log.info("處理創建賬戶命令,賬戶ID: {}", command.getAccountId());
// 應用事件來更改狀態
AggregateLifecycle.apply(new com.example.axondemo.event.BankAccountCreatedEvent(command.getAccountId(), command.getInitialDeposit()));
}
// 處理存款命令
@CommandHandler
public void handle(com.example.axondemo.command.DepositMoneyCommand command) {
if (command.getAmount() <= 0) {
throw new IllegalArgumentException("存款金額必須為正數");
}
log.info("處理存款命令,賬戶ID: {}", command.getAccountId());
// 應用事件來更改狀態
AggregateLifecycle.apply(new com.example.axondemo.event.MoneyDepositedEvent(command.getAccountId(), command.getAmount()));
}
// 處理取款命令
@CommandHandler
public void handle(com.example.axondemo.command.WithdrawMoneyCommand command) {
if (command.getAmount() > balance || command.getAmount() <= 0) {
throw new IllegalArgumentException("無效的取款金額");
}
log.info("處理取款命令,賬戶ID: {}", command.getAccountId());
// 應用事件來更改狀態
AggregateLifecycle.apply(new com.example.axondemo.event.MoneyWithdrewEvent(command.getAccountId(), command.getAmount()));
}
// 處理賬戶創建事件
@EventSourcingHandler
protected void on(com.example.axondemo.event.BankAccountCreatedEvent event) {
this.accountId = event.getAccountId();
this.balance = event.getInitialDeposit();
log.info("應用賬戶創建事件,賬戶ID: {}", event.getAccountId());
}
// 處理存款事件
@EventSourcingHandler
protected void on(com.example.axondemo.event.MoneyDepositedEvent event) {
this.balance += event.getAmount();
log.info("應用存款事件,賬戶ID: {}, 金額: {}", event.getAccountId(), event.getAmount());
}
// 處理取款事件
@EventSourcingHandler
protected void on(com.example.axondemo.event.MoneyWithdrewEvent event) {
this.balance -= event.getAmount();
log.info("應用取款事件,賬戶ID: {}, 金額: {}", event.getAccountId(), event.getAmount());
}
}
src/main/java/com/example/axondemo/command/CreateBankAccountCommand.java
package com.example.axondemo.command;
import lombok.Builder;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
@Data
@Builder
public class CreateBankAccountCommand {
@TargetAggregateIdentifier // 標記目標聚合根的標識符
private final String accountId; // 賬戶ID
private final double initialDeposit; // 初始存款
}
src/main/java/com/example/axondemo/command/DepositMoneyCommand.java
package com.example.axondemo.command;
import lombok.Builder;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
@Data
@Builder
public class DepositMoneyCommand {
@TargetAggregateIdentifier // 標記目標聚合根的標識符
private final String accountId; // 賬戶ID
private final double amount; // 存款金額
}
src/main/java/com/example/axondemo/command/WithdrawMoneyCommand.java
package com.example.axondemo.command;
import lombok.Builder;
import lombok.Data;
import org.axonframework.modelling.command.TargetAggregateIdentifier;
@Data
@Builder
public class WithdrawMoneyCommand {
@TargetAggregateIdentifier // 標記目標聚合根的標識符
private final String accountId; // 賬戶ID
private final double amount; // 取款金額
}
src/main/java/com/example/axondemo/controller/AccountController.java
package com.example.axondemo.controller;
import com.example.axondemo.command.*;
import com.example.axondemo.dto.CreateBankAccountRequest;
import com.example.axondemo.dto.DepositRequest;
import com.example.axondemo.dto.WithdrawRequest;
import com.example.axondemo.exception.InsufficientFundsException;
import com.example.axondemo.exception.InvalidAmountException;
import com.example.axondemo.repository.BankAccountRepository;
import lombok.RequiredArgsConstructor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/accounts")
@RequiredArgsConstructor
public class AccountController {
private final CommandGateway commandGateway; // 命令網關,用于發送命令
private final BankAccountRepository bankAccountRepository; // 銀行賬戶倉庫
// 創建賬戶
@PostMapping("/")
public ResponseEntity<String> createAccount(@Valid @RequestBody CreateBankAccountRequest request) {
String accountId = UUID.randomUUID().toString(); // 生成唯一的賬戶ID
CompletableFuture<Object> future = commandGateway.send(
CreateBankAccountCommand.builder()
.accountId(accountId)
.initialDeposit(request.getInitialDeposit())
.build()
);
return future.thenApply(response -> ResponseEntity.ok(accountId)) // 成功時返回賬戶ID
.exceptionally(ex -> ResponseEntity.badRequest().body(ex.getMessage())) // 失敗時返回錯誤信息
.join();
}
// 存款
@PostMapping("/{accountId}/deposit")
public ResponseEntity<Void> deposit(@PathVariable String accountId, @Valid @RequestBody DepositRequest request) {
CompletableFuture<Object> future = commandGateway.send(
DepositMoneyCommand.builder()
.accountId(accountId)
.amount(request.getAmount())
.build()
);
return future.thenApply(response -> ResponseEntity.ok().<Void>build()) // 成功時返回200 OK
.exceptionally(ex -> ResponseEntity.badRequest().body(null)) // 失敗時返回400 Bad Request
.join();
}
// 取款
@PostMapping("/{accountId}/withdraw")
public ResponseEntity<Void> withdraw(@PathVariable String accountId, @Valid @RequestBody WithdrawRequest request) {
CompletableFuture<Object> future = commandGateway.send(
WithdrawMoneyCommand.builder()
.accountId(accountId)
.amount(request.getAmount())
.build()
);
return future.thenApply(response -> ResponseEntity.ok().<Void>build()) // 成功時返回200 OK
.exceptionally(ex -> ResponseEntity.badRequest().body(null)) // 失敗時返回400 Bad Request
.join();
}
// 查詢賬戶余額
@GetMapping("/{accountId}/balance")
public ResponseEntity<Double> getBalance(@PathVariable String accountId) {
Double balance = bankAccountRepository.findById(accountId).map(it -> it.getBalance()).orElse(0.0); // 獲取賬戶余額
return ResponseEntity.ok(balance); // 返回賬戶余額
}
}
src/main/java/com/example/axondemo/dto/CreateBankAccountRequest.java
package com.example.axondemo.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CreateBankAccountRequest {
@NotNull(message = "初始存款不能為空") // 驗證初始存款不為空
@DecimalMin(value = "0", message = "初始存款必須非負") // 驗證初始存款非負
private double initialDeposit; // 初始存款
}
src/main/java/com/example/axondemo/dto/DepositRequest.java
package com.example.axondemo.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DepositRequest {
@NotNull(message = "金額不能為空") // 驗證金額不為空
@DecimalMin(value = "0", message = "金額必須非負") // 驗證金額非負
private double amount; // 存款金額
}
src/main/java/com/example/axondemo/dto/WithdrawRequest.java
package com.example.axondemo.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WithdrawRequest {
@NotNull(message = "金額不能為空") // 驗證金額不為空
@DecimalMin(value = "0", message = "金額必須非負") // 驗證金額非負
private double amount; // 取款金額
}
src/main/java/com/example/axondemo/event/BankAccountCreatedEvent.java
賬戶創建: 通過事件 BankAccountCreatedEvent 記錄賬戶的初始狀態。
package com.example.axondemo.event;
import lombok.Builder;
import lombok.Data;
import org.axonframework.serialization.Revision;
@Data
@Builder
@Revision("1")
public class BankAccountCreatedEvent {
private final String accountId; // 賬戶ID
private final double initialDeposit; // 初始存款
}
src/main/java/com/example/axondemo/event/MoneyDepositedEvent.java
存款和取款: 通過事件 MoneyDepositedEvent 和 MoneyWithdrewEvent 記錄每一次的資金變動。
package com.example.axondemo.event;
import lombok.Builder;
import lombok.Data;
import org.axonframework.serialization.Revision;
@Data
@Builder
@Revision("1")
public class MoneyDepositedEvent {
private final String accountId; // 賬戶ID
private final double amount; // 存款金額
}
src/main/java/com/example/axondemo/event/MoneyWithdrewEvent.java
package com.example.axondemo.event;
import lombok.Builder;
import lombok.Data;
import org.axonframework.serialization.Revision;
@Data
@Builder
@Revision("1")
public class MoneyWithdrewEvent {
private final String accountId; // 賬戶ID
private final double amount; // 取款金額
}
src/main/java/com/example/axondemo/exception/InsufficientFundsException.java
package com.example.axondemo.exception;
public class InsufficientFundsException extends RuntimeException {
public InsufficientFundsException(String message) {
super(message);
}
}
src/main/java/com/example/axondemo/exception/InvalidAmountException.java
package com.example.axondemo.exception;
public class InvalidAmountException extends RuntimeException {
public InvalidAmountException(String message) {
super(message);
}
}
src/main/java/com/example/axondemo/exception/GlobalExceptionHandler.java
package com.example.axondemo.exception;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import java.util.HashMap;
import java.util.Map;
@RestControllerAdvice
public class GlobalExceptionHandler {
// 處理驗證異常
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ExceptionHandler(MethodArgumentNotValidException.class)
public Map<String, String> handleValidationExceptions(MethodArgumentNotValidException ex) {
Map<String, String> errors = new HashMap<>();
ex.getBindingResult().getFieldErrors().forEach(error ->
errors.put(error.getField(), error.getDefaultMessage()));
return errors;
}
// 處理非法參數異常
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleIllegalArgumentException(IllegalArgumentException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
}
// 處理資金不足異常
@ExceptionHandler(InsufficientFundsException.class)
public ResponseEntity<String> handleInsufficientFundsException(InsufficientFundsException ex) {
return ResponseEntity.status(HttpStatus.CONFLICT).body(ex.getMessage());
}
// 處理解析金額異常
@ExceptionHandler(InvalidAmountException.class)
public ResponseEntity<String> handleInvalidAmountException(InvalidAmountException ex) {
return ResponseEntity.badRequest().body(ex.getMessage());
}
}
src/main/java/com/example/axondemo/projection/BankAccountProjection.java
余額查詢: 使用投影類 BankAccountProjection 將事件轉換為可供查詢的數據視圖。
package com.example.axondemo.projection;
import com.example.axondemo.event.BankAccountCreatedEvent;
import com.example.axondemo.event.MoneyDepositedEvent;
import com.example.axondemo.event.MoneyWithdrewEvent;
import com.example.axondemo.repository.BankAccountEntity;
import com.example.axondemo.repository.BankAccountRepository;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class BankAccountProjection {
@Autowired
private BankAccountRepository bankAccountRepository; // 銀行賬戶倉庫
// 處理賬戶創建事件
@EventHandler
public void on(BankAccountCreatedEvent event) {
BankAccountEntity bankAccountEntity = BankAccountEntity.builder()
.accountId(event.getAccountId())
.balance(event.getInitialDeposit())
.build();
bankAccountRepository.save(bankAccountEntity);
log.info("投影賬戶創建事件,賬戶ID: {}", event.getAccountId());
}
// 處理存款事件
@EventHandler
public void on(MoneyDepositedEvent event) {
bankAccountRepository.findById(event.getAccountId())
.ifPresentOrElse(
bankAccountEntity -> {
bankAccountEntity.setBalance(bankAccountEntity.getBalance() + event.getAmount());
bankAccountRepository.save(bankAccountEntity);
log.info("投影存款事件,賬戶ID: {}, 金額: {}", event.getAccountId(), event.getAmount());
},
() -> log.error("未找到賬戶ID: {}", event.getAccountId())
);
}
// 處理取款事件
@EventHandler
public void on(MoneyWithdrewEvent event) {
bankAccountRepository.findById(event.getAccountId())
.ifPresentOrElse(
bankAccountEntity -> {
bankAccountEntity.setBalance(bankAccountEntity.getBalance() - event.getAmount());
bankAccountRepository.save(bankAccountEntity);
log.info("投影取款事件,賬戶ID: {}, 金額: {}", event.getAccountId(), event.getAmount());
},
() -> log.error("未找到賬戶ID: {}", event.getAccountId())
);
}
}
src/main/java/com/example/axondemo/repository/BankAccountEntity.java
package com.example.axondemo.repository;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class BankAccountEntity {
@Id
private String accountId; // 賬戶ID
private double balance; // 賬戶余額
}
src/main/java/com/example/axondemo/repository/BankAccountRepository.java
package com.example.axondemo.repository;
import org.springframework.data.jpa.repository.JpaRepository;
public interface BankAccountRepository extends JpaRepository<BankAccountEntity, String> {
}
src/main/resources/application.yml
server:
port: 8080
spring:
datasource:
url: jdbc:mysql://localhost:3306/banktest?useSSL=false&serverTimezone=UTC
username: root
password: 12345678
jpa:
hibernate:
ddl-auto: update
show-sql: true
logging:
level:
org.axonframework: INFO
src/main/java/com/example/axondemo/AxonDemoApplication.java
package com.example.axondemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class AxonDemoApplication {
public static void main(String[] args) {
SpringApplication.run(AxonDemoApplication.class, args);
}
}
測試
創建賬戶
- URL: http://localhost:8080/accounts/
- Method: POST
- Headers:
a.Content-Type: application/json
- Body (raw, JSON):
{
"initialDeposit": 100
}
- Response Body:
9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1
存款
- URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/deposit
- Method: POST
- Headers:
- Content-Type: application/json
- Body (raw, JSON):
{
"amount": 50
}
- Status Code: 200 OK
- Response Body: (空)
取款
- URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/withdraw
- Method: POST
- Headers:
a.Content-Type: application/json
- Body (raw, JSON):
{
"amount": 30
}
- Status Code: 200 OK
- Response Body: (空)
查詢賬戶余額
- URL: http://localhost:8080/accounts/9f4c1b8e-2a0f-4e5f-b2f2-f8f1e5f1e5f1/balance
- Method: GET
- Status Code: 200 OK
- Response Body:
120.0