這重試器寫的真地地地地地地道,你覺得地道嗎?
服務總是不穩定的,有的時候需要編寫重試邏輯,比如,HTTP的重試;定時任務的重試等。
一、簡單的實現
我們可以使用while
或for
循環,配置try-catch
和break
組合,完成循環邏輯。
final Random random = new Random();
final int maxRetryCount = 10;
int times = 0;
while (true) {
times++;
if (times > maxRetryCount) {
break;
}
try {
// 業務邏輯
System.out.println("最大重試" + maxRetryCount + "次,當前是第" + times + "次");
if (random.nextInt(10) > 5) {
throw new RuntimeException("隨機數失敗");
}
if (random.nextInt(10) / 2 == 0) {
System.out.println("邏輯執行成功");
break;
}
Thread.sleep(1000);
// 業務邏輯
} catch (Exception e) {
System.out.println("進入異常捕獲");
}
}
System.out.println("業務邏輯執行完畢");
其中一次執行結果:
最大重試10次,當前是第1次 進入異常捕獲 最大重試10次,當前是第2次 進入異常捕獲 最大重試10次,當前是第3次 最大重試10次,當前是第4次 進入異常捕獲 最大重試10次,當前是第5次 最大重試10次,當前是第6次 進入異常捕獲 最大重試10次,當前是第7次 最大重試10次,當前是第8次 邏輯執行成功 業務邏輯執行完畢
上面這種實現算是重試邏輯的模板化代碼,大差不差的都是這種寫法。
我們再看看其他的寫法。
二、重試裝飾器的實現
本節我們使用裝飾器模式實現,借助經典的面向對象編程風格(通過類和接口)。同時,我們選擇更簡潔的函數式方法。
首先,我們將聲明一個函數,接收Supplier<T>
和最大調用次數作為參數。
然后,還是使用while
循環和try-catch
塊多次調用該函數。
最后,我們將通過返回另一個Supplier<T>
來保留原始數據類型。
static <T> Supplier<T> retryFunction(Supplier<T> supplier, int maxRetries) {
return () -> {
int retries = 0;
while (retries < maxRetries) {
try {
return supplier.get();
} catch (Exception e) {
retries++;
}
}
throw new IllegalStateException(String.format("任務在 %s 次嘗試后失敗", maxRetries));
};
}
有了上面的函數,我們可以基于這個函數裝飾器繼續創建CompletableFuture
:
static <T> CompletableFuture<T> retryTask(Supplier<T> supplier, int maxRetries) {
Supplier<T> retryableSupplier = retryFunction(supplier, maxRetries);
return CompletableFuture.supplyAsync(retryableSupplier);
}
模擬下業務場景,有個標志位數字,我們需要檢查這個標志位是否大于4,如果大于就結束任務,如果小于4,重試。
final AtomicInteger retriesCounter = new AtomicInteger(0);
Supplier<Integer> codeToRun = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
然后我們借助retryTask()
函數完成重試邏輯,假設最大重試10次,根據上面的定義,正常會在第四次跳出:
CompletableFuture<Integer> result = retryTask(codeToRun, 10);
結果會打印:
Retrying: 0 Retrying: 1 Retrying: 2 Retrying: 3 Retrying: 4 100 4
如果重試次數小于4,就會觸發IllegalStateException
異常:
try {
result = retryTask(codeToRun, 3);
System.out.println(result.get());
} catch (Exception e) {
System.out.println("超過最大重試次數");
}
結果會打印:
Retrying: 0 Retrying: 1 Retrying: 2 超過最大重試次數
三、重試CompletableFuture
CompletableFuture
提供了內部邏輯出現異常時處理的方法,比如exceptionally()
等方法,我們可以直接使用這些方法,不需要自定義裝飾器。
CompletableFuture
的使用可以參考由淺入深掌握CompletableFuture的七種用法。
(一)不安全重試
exceptionally()
方法允許指定一個替代函數,當主要邏輯出現異常時,會調用指定的替代函數。
如果我們打算重試兩次,我們可以這樣寫:
static <T> CompletableFuture<T> retryTwice(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier)
.exceptionally(__ -> supplier.get())
.exceptionally(__ -> supplier.get());
}
如果重試次數可變,我們可以使用for
循環:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
上面的寫法可以滿足需求,但是有一點需要注意,當Supplier
運行比較快,在CompletableFuture
和exceptionally()
回退創建之前執行完畢,那exceptionally()
的方法就會在主線程執行。比如,我們設定retryUnsafe
休眠1000ms,指定codeToRun
不做sleep:
static <T> CompletableFuture<T> retryUnsafe(Supplier<T> supplier, int maxRetries) throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionally(__ -> supplier.get());
}
return cf;
}
codeToRun = () -> {
int retryNr = retriesCounter.get();
System.out.println("Retrying: " + retryNr + "; thread:" + Thread.currentThread().getName());
if (retryNr < 4) {
retriesCounter.incrementAndGet();
throw new RuntimeException();
}
return 100;
};
retryUnsafe(codeToRun, 3);
運行結果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1 Retrying: 1; thread:main Retrying: 2; thread:main Retrying: 3; thread:main
符合預期,后續調用是由主線程執行的。如果初始調用很快,但后續調用預計會更慢,這可能會成為問題。
(二)異步重試
如果是在Java12之后,我們可以通過exceptionallyAsync()
方法實現,將所有重試都異步執行。
static <T> CompletableFuture<T> retryExceptionallyAsync(Supplier<T> supplier, int maxRetries) {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
for (int i = 0; i < maxRetries; i++) {
cf = cf.exceptionallyAsync(__ -> supplier.get());
}
return cf;
}
這個時候運行結果將是:
Retrying: 0; thread:ForkJoinPool.commonPool-worker-1
Retrying: 1; thread:ForkJoinPool.commonPool-worker-1
Retrying: 2; thread:ForkJoinPool.commonPool-worker-1
Retrying: 3; thread:ForkJoinPool.commonPool-worker-1
(三)嵌套CompletableFutures
如果是在Java12之前,我們需要實現兼容方案,那我們可以寫一個增加邏輯,實現異步化,我們可以這樣寫:
static <T> CompletableFuture<T> retryNesting(Supplier<T> supplier, int maxRetries)
throws InterruptedException {
CompletableFuture<T> cf = CompletableFuture.supplyAsync(supplier);
Thread.sleep(1000);
for (int i = 0; i < maxRetries; i++) {
cf = cf.thenApply(CompletableFuture::completedFuture)
.exceptionally(__ -> CompletableFuture.supplyAsync(supplier))
.thenCompose(Function.identity());
}
return cf;
}
其實簡單或就是有創建了一個CompletableFuture
,運行結果也是符合預期的。
四、總結
在本文中,我們探討了在CompletableFuture中重試函數調用的概念。我們首先深入研究了以函數式風格實現裝飾器模式,使我們能夠重試函數本身。
隨后,我們利用CompletableFuture API完成相同的任務,同時保持異步流程。我們發現了Java 12中引入的exceptionallyAsync()
方法,它非常適合這個目的。最后,我們提出了一種替代方法,僅依賴于原始Java 8 API中的方法。
文末總結
本文介紹了使用實現重試邏輯的三種方式:簡單模式、裝飾器模式、和利用CompletableFuture
原生API的實現。