SpringBoot與Pulsar整合,實現金融交易場景下的消息有序性保障
作者:Java知識日歷
用戶發起一筆交易請求,系統需要將該請求發送到交易處理系統,并確保請求按順序被處理。而使用Pulsar的獨占訂閱模式確保交易請求按順序被單一消費者處理,避免亂序導致的賬務錯誤。
Apache Pulsar 設計用于大規模實時數據處理,支持多種消息傳遞模型(發布/訂閱、隊列等),并提供了強大的功能來確保消息的可靠性和性能。
優勢
1. 強大的消息模型
- 發布/訂閱 (Pub/Sub): 支持多個消費者同時從同一個主題接收消息,適合實時數據分析和通知系統。
- 獨占訂閱 (Exclusive Subscription): 確保只有一個消費者能夠消費某個分區的消息,從而保證消息的嚴格順序。
- 共享訂閱 (Shared Subscription): 多個消費者可以負載均衡地消費消息,提高吞吐量。
- 故障域感知路由: 根據地理位置和網絡拓撲優化消息路由,確保高效的數據傳輸。
2. 持久化與存儲
- 持久化消息: 所有消息都被持久化到磁盤,確保消息不會丟失。
- 分層存儲: 使用分層存儲策略,結合內存和磁盤存儲,提高讀寫效率。
- 自動清理: 定期清理過期或不再需要的消息,節省存儲空間。
3. 事務支持
- 事務消息: 支持事務性的消息發送和確認機制,確保數據一致性。
- 兩階段提交: 實現ACID特性,保證消息的一致性和可靠性。
4. 死信隊列
- 死信隊列 (Dead Letter Queue, DLQ): 對于無法成功處理的消息,將其放入死信隊列以便后續排查和處理。
- 重試機制: 在消息處理失敗時,進行一定次數的重試(默認最多3次),如果仍然失敗,則將消息放入死信隊列。
應用場景
用戶發起一筆交易請求,系統需要將該請求發送到交易處理系統,并確保請求按順序被處理。而使用Pulsar的獨占訂閱模式確保交易請求按順序被單一消費者處理,避免亂序導致的賬務錯誤。
啟動Pulsar:
bin/pulsar standalone
代碼實操
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Apache Pulsar Client -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version>
</dependency>
<!-- Lombok for cleaner Java code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- JUnit for testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置文件
在application.properties
文件中配置Pulsar的相關屬性:
# Pulsar broker URL
pulsar.service.url=pulsar://localhost:6650
# Topic name
pulsar.topic.name=finance-transaction-topic
# Dead letter topic name
pulsar.dead-letter.topic.name=dead-letter-topic
# Max redelivery count before sending to dead letter queue
pulsar.max.redeliver.count=3
服務類
創建一個服務類來處理生產和消費消息,包括事務消息和死信隊列的處理邏輯。
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.CompletableFuture;
@Service
@Slf4j
public class FinanceTransactionService {
@Value("${pulsar.service.url}")
private String serviceUrl;
@Value("${pulsar.topic.name}")
private String topicName;
@Value("${pulsar.dead-letter.topic.name}")
private String deadLetterTopicName;
@Value("${pulsar.max.redeliver.count}")
private int maxRedeliverCount;
private PulsarClient client;
private Producer<String> producer;
private Consumer<String> consumer;
/**
* 初始化Pulsar客戶端、生產者和消費者
*/
@PostConstruct
public void init() throws Exception {
// 初始化Pulsar客戶端
client = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
// 創建生產者
producer = client.newProducer(Schema.STRING)
.topic(topicName)
.sendTimeout(0, java.util.concurrent.TimeUnit.SECONDS)
.enableBatching(false)
.create();
// 創建消費者
consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("finance-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.maxDelayMs(60_000)
.minDelayMs(1_000)
.multiplier(2)
.build())
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliverCount)
.deadLetterTopic(deadLetterTopicName)
.build())
.subscribe();
// 開始消費消息
consumeMessages();
}
/**
* 關閉Pulsar客戶端、生產者和消費者
*/
@PreDestroy
public void close() throws Exception {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
if (client != null) {
client.close();
}
}
/**
* 發送事務消息
*
* @param message 消息內容
* @return 消息ID的CompletableFuture對象
*/
public CompletableFuture<MessageId> sendTransactionalMessage(String message) {
return producer.sendAsync(message);
}
/**
* 消費消息并處理
*/
private void consumeMessages() {
new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Message<String> msg = consumer.receive();
log.info("Received message: {}", msg.getValue());
// 處理消息
boolean processSuccess = processMessage(msg.getValue());
if (processSuccess) {
// 確認消息
consumer.acknowledgeAsync(msg.getMessageId());
} else {
// 負確認消息,觸發重試機制
consumer.negativeAcknowledge(msg.getMessageId(), new CustomException("Processing failed"));
}
} catch (Exception e) {
log.error("Error processing message", e);
}
}
}).start();
}
/**
* 模擬消息處理邏輯
*
* @param message 消息內容
* @return 處理是否成功
*/
private boolean processMessage(String message) {
// 模擬消息處理邏輯
// 對于每三條消息中的一條模擬處理失敗
long messageId = Long.parseLong(message.split(":")[1]);
return messageId % 3 != 0;
}
static class CustomException extends Exception {
public CustomException(String message) {
super(message);
}
}
// Getter methods for configuration properties (for testing purposes)
public String getServiceUrl() {
return serviceUrl;
}
public String getTopicName() {
return topicName;
}
public String getDeadLetterTopicName() {
return deadLetterTopicName;
}
public int getMaxRedeliverCount() {
return maxRedeliverCount;
}
}
控制器類
創建一個控制器類來暴露API端點用于發送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
@RestController
public class FinanceTransactionController {
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 發送消息到Pulsar主題
*
* @param message 消息內容
* @return 發送結果
*/
@PostMapping("/send-message")
public String sendMessage(@RequestParam String message) {
try {
financeTransactionService.sendTransactionalMessage(message).get();
return"Message sent successfully";
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to send message", e);
return"Failed to send message";
}
}
}
單元測試
為了驗證上述功能是否正常工作,我們寫了一些測試用例。
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class FinanceTransactionControllerTest {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 清空主題中的消息,確保每次測試前環境干凈
*/
@BeforeEach
public void setUp() throws Exception {
clearTopic(financeTransactionService.getTopicName());
clearTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 關閉資源
*/
@AfterEach
public void tearDown() throws Exception {
financeTransactionService.close();
}
/**
* 測試成功發送的消息是否正確地出現在主主題中,并且沒有出現在死信隊列中
*/
@Test
public void testSendMessage_Success() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:1", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:2", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:4", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 驗證消息在主主題中
assertMessageInTopic("transaction:1");
assertMessageInTopic("transaction:2");
assertMessageInTopic("transaction:4");
// 驗證死信隊列中沒有消息
assertNoMessagesInTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 測試失敗發送的消息是否在達到最大重試次數后進入死信隊列
*/
@Test
public void testSendMessage_Failure() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:3", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 驗證消息在死信隊列中(經過多次重試)
assertMessageInTopicWithRetries("transaction:3", financeTransactionService.getMaxRedeliverCount());
}
/**
* 清空指定主題中的所有消息
*
* @param topicName 主題名稱
*/
private void clearTopic(String topicName) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create();
while (reader.hasMessageAvailable()) {
reader.readNext();
}
reader.close();
client.close();
}
/**
* 驗證指定主題中是否存在特定消息
*
* @param expectedMessage 預期消息內容
*/
private void assertMessageInTopic(String expectedMessage) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getTopicName())
.startMessageId(MessageId.earliest)
.create()) {
while (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
fail("Expected message not found in topic: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 驗證指定主題中沒有消息
*
* @param topicName 主題名稱
*/
private void assertNoMessagesInTopic(String topicName) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()) {
assertFalse(reader.hasMessageAvailable(), "Unexpected messages found in topic: " + topicName);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 驗證指定主題中是否存在特定消息(帶有重試機制)
*
* @param expectedMessage 預期消息內容
* @param maxRetries 最大重試次數
*/
private void assertMessageInTopicWithRetries(String expectedMessage, int maxRetries) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getDeadLetterTopicName())
.startMessageId(MessageId.earliest)
.create()) {
int retryCount = 0;
while (retryCount < maxRetries) {
if (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
retryCount++;
Thread.sleep(1000); // 等待1秒后重試
}
fail("Expected message not found in dead letter topic after retries: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from dead letter topic: " + e.getMessage());
}
}
}
測試結果
發送消息:
curl -X POST http://localhost:8080/send-message\?message\=transaction\:1
curl -X POST http://localhost:8080/send-message\?message\=transaction\:2
curl -X POST http://localhost:8080/send-message\?message\=transaction\:3
curl -X POST http://localhost:8080/send-message\?message\=transaction\:4
日志:
Received message: transaction:1
Received message: transaction:2
Received message: transaction:3
Received message: transaction:4
責任編輯:武曉燕
來源:
Java知識日歷