成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用雙異步后,如何保證數據一致性?

數據庫 其他數據庫
AbstractQueuedSynchronizer簡稱AQS,它是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒線程,以及 維護被阻塞線程的隊列。基于 AQS 實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

一、前情提要

在上一篇文章中,我們使用雙異步后,從 191s 優化到 2s,有個小伙伴在評論區問我,如何保證插入后數據的一致性呢?

很簡單,通過對比Excel文件行數和入庫數量是否相等即可。

那么,如何獲取異步線程的返回值呢?

二、通過Future獲取異步返回值

我們可以通過給異步方法添加Future返回值的方式獲取結果。

FutureTask 除了實現 Future 接口外,還實現了 Runnable 接口。因此,FutureTask 可以交給 Executor 執行,也可以由調用線程直接執行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer實現的

AbstractQueuedSynchronizer簡稱AQS,它是一個同步框架,它提供通用機制來原子性管理同步狀態、阻塞和喚醒線程,以及 維護被阻塞線程的隊列。基于 AQS 實現的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS實現的同步器包含兩種操作:

  • acquire,阻塞調用線程,直到AQS的狀態允許這個線程繼續執行,在FutureTask中,get()就是這個方法;
  • release,改變AQS的狀態,使state變為非阻塞狀態,在FutureTask中,可以通過run()和cancel()實現。

2、FutureTask執行流程

執行@Async異步方法。

建立新線程async-executor-X,執行Runnable的run()方法,(FutureTask實現RunnableFuture,RunnableFuture實現Runnable)。

判斷狀態state。

  • 如果未新建或者不處于AQS,直接返回。
  • 否則進入COMPLETING狀態,執行異步線程代碼。

如果執行cancel()方法改變AQS的狀態時,會喚醒AQS等待隊列中的第一個線程線程async-executor-1。

線程async-executor-1被喚醒后

  • 將自己從AQS隊列中移除。
  • 然后喚醒next線程async-executor-2。
  • 改變線程async-executor-1的state。
  • 等待get()線程取值。

next等待線程被喚醒后,循環線程async-executor-1的步驟。

  • 被喚醒。
  • 從AQS隊列中移除。
  • 喚醒next線程。
  • 改變異步線程狀態。

新建線程async-executor-N,監聽異步方法的state。

  • 如果處于EXCEPTIONAL以上狀態,拋出異常。
  • 如果處于COMPLETING狀態,加入AQS隊列等待。
  • 如果處于NORMAL狀態,返回結果。

3、get()方法執行流程

get()方法通過判斷狀態state觀測異步線程是否已結束,如果結束直接將結果返回,否則會將等待節點扔進等待隊列自旋,阻塞住線程。

自旋直至異步線程執行完畢,獲取另一邊的線程計算出結果或取消后,將等待隊列里的所有節點依次喚醒并移除隊列。

如果state小于等于COMPLETING,表示任務還在執行中。

  • 計算超時時間。
  • 如果超時,則從等待隊列中移除等待節點WaitNode,返回當前狀態state。
  • 阻塞隊列nanos毫秒。
  • 如果已有等待節點WaitNode,將線程置空。
  • 返回當前狀態。
  • 如果線程被中斷,從等待隊列中移除等待節點WaitNode,拋出中斷異常。
  • 如果state大于COMPLETING。
  • 如果任務正在執行,讓出時間片。
  • 如果還未構造等待節點,則new一個新的等待節點。
  • 如果未入隊列,CAS嘗試入隊。
  • 如果有超時時間參數。
  • 否則阻塞隊列。

如果state大于COMPLETING。

  • 如果執行完畢,返回結果。
  • 如果大于等于取消狀態,則拋出異常。

很多小朋友對讀源碼,嗤之以鼻,工作3年、5年,還是沒認真讀過任何源碼,覺得讀了也沒啥用,或者讀了也看不懂~

其實,只要把源碼的執行流程通過畫圖的形式呈現出來,你就會幡然醒悟,原來是這樣的~

簡而言之:

  • 如果異步線程還沒執行完,則進入CAS自旋。
  • 其它線程獲取結果或取消后,重新喚醒CAS隊列中等待的線程。
  • 再通過get()判斷狀態state;。
  • 直至返回結果或(取消、超時、異常)為止。

三、FutureTask源碼具體分析

1、FutureTask源碼

通過定義整形狀態值,判斷state大小,這個思想很有意思,值得學習。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public class FutureTask<V> implements RunnableFuture<V> {

 // 最初始的狀態是new 新建狀態
 private volatile int state;
    private static final int NEW          = 0; // 新建狀態
    private static final int COMPLETING   = 1; // 完成中
    private static final int NORMAL       = 2; // 正常執行完
    private static final int EXCEPTIONAL  = 3; // 異常
    private static final int CANCELLED    = 4; // 取消
    private static final int INTERRUPTING = 5; // 正在中斷
    private static final int INTERRUPTED  = 6; // 已中斷

 public V get() throws InterruptedException, ExecutionException {
     int s = state;
     // 任務還在執行中
     if (s <= COMPLETING)
         s = awaitDone(false, 0L);
     return report(s);
 }
 
 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
         // 線程被中斷,從等待隊列中移除等待節點WaitNode,拋出中斷異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任務已執行完畢或取消
            if (s > COMPLETING) {
             // 如果已有等待節點WaitNode,將線程置空
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任務正在執行,讓出時間片
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 還未構造等待節點,則new一個新的等待節點
            else if (q == null)
                q = new WaitNode();
            // 未入隊列,CAS嘗試入隊
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果有超時時間參數
            else if (timed) {
             // 計算超時時間
                nanos = deadline - System.nanoTime();
                // 如果超時,則從等待隊列中移除等待節點WaitNode,返回當前狀態state
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 阻塞隊列nanos毫秒
                LockSupport.parkNanos(this, nanos);
            }
            else
             // 阻塞隊列
                LockSupport.park(this);
        }
    }
    
 private V report(int s) throws ExecutionException {
  // 獲取outcome中記錄的返回結果
        Object x = outcome;
        // 如果執行完畢,返回結果
        if (s == NORMAL)
            return (V)x;
            // 如果大于等于取消狀態,則拋出異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
}

2、將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中

@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
     // 此代碼為簡化關鍵性代碼
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
    }catch (Exception e){
        logger.error("readXlsCacheAsync---插入數據異常:",e);
    }
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // 此代碼為簡化關鍵性代碼
        return new AsyncResult<>(sum);
    }catch (Exception e){
        return new AsyncResult<>(0);
    }
}

3、通過Future<Integer>.get()獲取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0;i<futureList.size();i++) {
        try {
            Future<Integer> future = futureList.get(i);
            while (true) {
                if (future.isDone() && !future.isCancelled()) {
                    Integer futureSum = future.get();
                    logger.info("獲取Future返回值成功"+"----Future:" + future
                            + ",Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                } else {
                    logger.info("Future正在執行---獲取Future返回值中---等待3秒");
                    Thread.sleep(3000);
                }
            }
        } catch (Exception e) {
            logger.error("獲取Future返回值異常: ", e);
        }
    }
    
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結果="+insertFlag);
    return insertFlag;
}

4、這里也可以通過新線程+Future獲取Future返回值

不過感覺多此一舉了,就當練習Future異步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    final boolean[] insertFlag = {false};
    service.execute(new Runnable() {
        public void run() {
            try {
                insertFlag[0] = getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("新線程+Future獲取Future返回值異常: ", e);
                insertFlag[0] = false;
            }
        }
    });
    service.shutdown();
    return new AsyncResult<>(insertFlag[0]);
}

獲取異步線程結果后,我們可以通過添加事務的方式,實現Excel入庫操作的數據一致性。

但Future會造成主線程的阻塞,這個就很不友好了,有沒有更優解呢?

在BUG中磨礪,在優化中成長,我們下期見~


責任編輯:姜華 來源: 哪吒編程
相關推薦

2024-12-26 15:01:29

2025-03-27 08:20:54

2023-09-07 08:11:24

Redis管道機制

2023-05-26 07:34:50

RedisMySQL緩存

2024-08-20 16:13:52

2022-12-05 08:24:32

mongodb數據庫數據

2021-12-14 07:15:57

MySQLRedis數據

2024-07-04 12:36:50

2023-09-15 14:24:54

ByteHouseClickHouse開源

2021-12-05 21:06:27

軟件

2022-08-23 07:46:45

數據一致性數據庫

2022-03-31 08:21:14

數據庫緩存雙寫數據一致性

2022-10-19 12:22:53

并發扣款一致性

2023-12-11 12:27:31

并發Zookeeper數據

2019-08-30 12:46:10

并發扣款查詢SQL

2022-02-17 21:04:27

數據庫MysqlRedis

2018-08-14 10:39:04

數據錯誤DIX

2020-09-03 09:45:38

緩存數據庫分布式

2025-04-27 08:52:21

Redis數據庫緩存

2021-03-04 06:49:53

RocketMQ事務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91精品无人区卡一卡二卡三 | 国产极品粉嫩美女呻吟在线看人 | 亚洲二区在线观看 | 色久影院 | 黄色欧美在线 | 国产精品www | 亚洲一区av | 91大神在线看 | 国产成人高清视频 | 精品免费国产一区二区三区四区介绍 | 欧美久久久久久 | 理论片87福利理论电影 | 日本成人三级电影 | 波多野结衣一二三区 | 亚洲视频中文字幕 | 日韩欧美手机在线 | 国产精品久久久久久妇女 | 久久高清精品 | 特黄色一级毛片 | 国产激情综合五月久久 | 孕妇一级毛片 | 久久久久久91香蕉国产 | 国产一区二区av | 高清视频一区二区三区 | av电影手机在线看 | 久久久123| 国产成人在线一区二区 | 亚洲精品在线免费播放 | 国产ts人妖一区二区三区 | 日韩欧美一级精品久久 | 国产精品久久久久久久久免费桃花 | 亚洲人成网亚洲欧洲无码 | 久久成人精品视频 | 91色视频在线 | 一区二区视频在线 | 在线亚洲人成电影网站色www | 日韩在线观看中文字幕 | 亚洲精品国产电影 | 一级二级三级在线观看 | 久久久久久999 | 在线综合视频 |