使用雙異步后,如何保證數據一致性?
一、前情提要
在上一篇文章中,我們使用雙異步后,從 191s 優化到 2s,有個小伙伴在評論區問我,如何保證插入后數據的一致性呢?
很簡單,通過對比Excel文件行數和入庫數量是否相等即可。
那么,如何獲取異步線程的返回值呢?
二、通過Future獲取異步返回值
我們可以通過給異步方法添加Future返回值的方式獲取結果。
FutureTask 除了實現 Future 接口外,還實現了 Runnable 接口。因此,FutureTask 可以交給 Executor 執行,也可以由調用線程直接執行FutureTask.run()。
1、FutureTask 是基于 AbstractQueuedSynchronizer實現的
AbstractQueuedSynchronizer簡稱AQS,它是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒線程,以及 維護被阻塞線程的隊列。基于 AQS 實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。
基于 AQS實現的同步器包含兩種操作:
- acquire,阻塞調用線程,直到AQS的狀態允許這個線程繼續執行,在FutureTask中,get()就是這個方法;
- release,改變AQS的狀態,使state變為非阻塞狀態,在FutureTask中,可以通過run()和cancel()實現。
2、FutureTask執行流程
執行@Async異步方法。
建立新線程async-executor-X,執行Runnable的run()方法,(FutureTask實現RunnableFuture,RunnableFuture實現Runnable)。
判斷狀態state。
- 如果未新建或者不處于AQS,直接返回。
- 否則進入COMPLETING狀態,執行異步線程代碼。
如果執行cancel()方法改變AQS的狀態時,會喚醒AQS等待隊列中的第一個線程線程async-executor-1。
線程async-executor-1被喚醒后
- 將自己從AQS隊列中移除。
- 然后喚醒next線程async-executor-2。
- 改變線程async-executor-1的state。
- 等待get()線程取值。
next等待線程被喚醒后,循環線程async-executor-1的步驟。
- 被喚醒。
- 從AQS隊列中移除。
- 喚醒next線程。
- 改變異步線程狀態。
新建線程async-executor-N,監聽異步方法的state。
- 如果處于EXCEPTIONAL以上狀態,拋出異常。
- 如果處于COMPLETING狀態,加入AQS隊列等待。
- 如果處于NORMAL狀態,返回結果。
3、get()方法執行流程
get()方法通過判斷狀態state觀測異步線程是否已結束,如果結束直接將結果返回,否則會將等待節點扔進等待隊列自旋,阻塞住線程。
自旋直至異步線程執行完畢,獲取另一邊的線程計算出結果或取消后,將等待隊列里的所有節點依次喚醒并移除隊列。
如果state小于等于COMPLETING,表示任務還在執行中。
- 計算超時時間。
- 如果超時,則從等待隊列中移除等待節點WaitNode,返回當前狀態state。
- 阻塞隊列nanos毫秒。
- 如果已有等待節點WaitNode,將線程置空。
- 返回當前狀態。
- 如果線程被中斷,從等待隊列中移除等待節點WaitNode,拋出中斷異常。
- 如果state大于COMPLETING。
- 如果任務正在執行,讓出時間片。
- 如果還未構造等待節點,則new一個新的等待節點。
- 如果未入隊列,CAS嘗試入隊。
- 如果有超時時間參數。
- 否則阻塞隊列。
如果state大于COMPLETING。
- 如果執行完畢,返回結果。
- 如果大于等于取消狀態,則拋出異常。
很多小朋友對讀源碼,嗤之以鼻,工作3年、5年,還是沒認真讀過任何源碼,覺得讀了也沒啥用,或者讀了也看不懂~
其實,只要把源碼的執行流程通過畫圖的形式呈現出來,你就會幡然醒悟,原來是這樣的~
簡而言之:
- 如果異步線程還沒執行完,則進入CAS自旋。
- 其它線程獲取結果或取消后,重新喚醒CAS隊列中等待的線程。
- 再通過get()判斷狀態state;。
- 直至返回結果或(取消、超時、異常)為止。
三、FutureTask源碼具體分析
1、FutureTask源碼
通過定義整形狀態值,判斷state大小,這個思想很有意思,值得學習。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
public class FutureTask<V> implements RunnableFuture<V> {
// 最初始的狀態是new 新建狀態
private volatile int state;
private static final int NEW = 0; // 新建狀態
private static final int COMPLETING = 1; // 完成中
private static final int NORMAL = 2; // 正常執行完
private static final int EXCEPTIONAL = 3; // 異常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 正在中斷
private static final int INTERRUPTED = 6; // 已中斷
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 任務還在執行中
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 線程被中斷,從等待隊列中移除等待節點WaitNode,拋出中斷異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 任務已執行完畢或取消
if (s > COMPLETING) {
// 如果已有等待節點WaitNode,將線程置空
if (q != null)
q.thread = null;
return s;
}
// 任務正在執行,讓出時間片
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 還未構造等待節點,則new一個新的等待節點
else if (q == null)
q = new WaitNode();
// 未入隊列,CAS嘗試入隊
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果有超時時間參數
else if (timed) {
// 計算超時時間
nanos = deadline - System.nanoTime();
// 如果超時,則從等待隊列中移除等待節點WaitNode,返回當前狀態state
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞隊列nanos毫秒
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞隊列
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
// 獲取outcome中記錄的返回結果
Object x = outcome;
// 如果執行完畢,返回結果
if (s == NORMAL)
return (V)x;
// 如果大于等于取消狀態,則拋出異常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
}
2、將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中
@Async("async-executor")
public void readXls(String filePath, String filename) {
try {
// 此代碼為簡化關鍵性代碼
List<Future<Integer>> futureList = new ArrayList<>();
for (int time = 0; time < times; time++) {
Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
futureList.add(sumFuture);
}
}catch (Exception e){
logger.error("readXlsCacheAsync---插入數據異常:",e);
}
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
try {
// 此代碼為簡化關鍵性代碼
return new AsyncResult<>(sum);
}catch (Exception e){
return new AsyncResult<>(0);
}
}
3、通過Future<Integer>.get()獲取返回值:
public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){
int[] futureSumArr = new int[futureList.size()];
for (int i = 0;i<futureList.size();i++) {
try {
Future<Integer> future = futureList.get(i);
while (true) {
if (future.isDone() && !future.isCancelled()) {
Integer futureSum = future.get();
logger.info("獲取Future返回值成功"+"----Future:" + future
+ ",Result:" + futureSum);
futureSumArr[i] += futureSum;
break;
} else {
logger.info("Future正在執行---獲取Future返回值中---等待3秒");
Thread.sleep(3000);
}
}
} catch (Exception e) {
logger.error("獲取Future返回值異常: ", e);
}
}
boolean insertFlag = getInsertSum(futureSumArr, excelRow);
logger.info("獲取所有異步線程Future的返回值成功,Excel插入結果="+insertFlag);
return insertFlag;
}
4、這里也可以通過新線程+Future獲取Future返回值
不過感覺多此一舉了,就當練習Future異步取返回值了~
public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) {
ExecutorService service = Executors.newSingleThreadExecutor();
final boolean[] insertFlag = {false};
service.execute(new Runnable() {
public void run() {
try {
insertFlag[0] = getFutureResult(futureList, excelRow);
} catch (Exception e) {
logger.error("新線程+Future獲取Future返回值異常: ", e);
insertFlag[0] = false;
}
}
});
service.shutdown();
return new AsyncResult<>(insertFlag[0]);
}
獲取異步線程結果后,我們可以通過添加事務的方式,實現Excel入庫操作的數據一致性。
但Future會造成主線程的阻塞,這個就很不友好了,有沒有更優解呢?
在BUG中磨礪,在優化中成長,我們下期見~