成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java并發編程:深入剖析CyclicBarrier源碼

開發 前端
CyclicBarrier主要用來控制線程的執行速率,初始化時指定線程數,線程調用await()方法時會阻塞,直到到達的線程數等于初始線程數,才會放行,并且一起執行。與CountDownLatch區別是,CyclicBarrier可以循環執行,而CountDownLatch只能執行一次。

引言

CyclicBarrier中文叫做循環柵欄,用來控制線程的執行速率。

適用場景:一組線程在到達柵欄之前,需要相互等待,到達柵欄之后(滿足了特定條件),再一起執行。

適用場景好像跟CountDownLatch一樣,前面介紹過CountDownLatch的適用場景,跟第二種場景很像,不過還是有點區別:

  1. CountDownLatch需要手動調用countDown()方法,這組線程才能一起執行,而CyclicBarrier無需調用調用任何方法,線程會自動執行。
  2. CountDownLatch只能使用一次,而CyclicBarrier可以循環使用。

再提一下CountDownLatch的兩個適用場景:

  1. 當前線程等待其他線程都執行完成之后,再執行。
  2. 所有線程滿足條件后,再一起執行。

使用示例

CyclicBarrier常用的方法就一個await()方法,調用await()方法之后,會阻塞當前線程,直到柵欄前的所有線程都調用了await()方法,才會放行,并且一起執行。

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author 一燈架構
 * @apiNote CyclicBarrier測試類
 **/
@Slf4j
public class CyclicBarrierTest {

    public static void main(String[] args) throws InterruptedException {

        // 1. 創建一個線程池,用來執行任務
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 2. 創建一個循環柵欄,線程數是3
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        // 3. 提交9個任務,剛好可以循環3輪
        for (int i = 0; i < 9; i++) {
            // 4. 睡眠100ms再提交任務,避免并發提交
            Thread.sleep(100);
            executorService.execute(() -> {
                try {
                    // 5. 睡眠1秒,模擬任務準備階段
                    Thread.sleep(1000);
                    log.info(Thread.currentThread().getName() + " 準備 " + cyclicBarrier.getNumberWaiting());
                    // 6. 阻塞當前任務,直到3個線程都到達柵欄
                    cyclicBarrier.await();

                    log.info(Thread.currentThread().getName() + " 執行完成");
                } catch (Exception e) {
                }
            });
        }

        // 7. 關閉線程池
        executorService.shutdown();
    }
}

輸出結果:

10:00:00.001 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 準備 0
10:00:00.002 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 準備 1
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 準備 2
10:00:00.003 [pool-1-thread-3] INFO com.yideng.CyclicBarrierTest - pool-1-thread-3 執行完成
10:00:00.003 [pool-1-thread-1] INFO com.yideng.CyclicBarrierTest - pool-1-thread-1 執行完成
10:00:00.004 [pool-1-thread-2] INFO com.yideng.CyclicBarrierTest - pool-1-thread-2 執行完成
10:00:00.010 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 準備 0
10:00:00.011 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 準備 1
10:00:01.003 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 準備 2
10:00:01.004 [pool-1-thread-6] INFO com.yideng.CyclicBarrierTest - pool-1-thread-6 執行完成
10:00:01.004 [pool-1-thread-4] INFO com.yideng.CyclicBarrierTest - pool-1-thread-4 執行完成
10:00:01.004 [pool-1-thread-5] INFO com.yideng.CyclicBarrierTest - pool-1-thread-5 執行完成
10:00:01.114 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 準備 0
10:00:01.213 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 準備 1
10:00:01.317 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 準備 2
10:00:01.318 [pool-1-thread-9] INFO com.yideng.CyclicBarrierTest - pool-1-thread-9 執行完成
10:00:01.318 [pool-1-thread-7] INFO com.yideng.CyclicBarrierTest - pool-1-thread-7 執行完成
10:00:01.319 [pool-1-thread-8] INFO com.yideng.CyclicBarrierTest - pool-1-thread-8 執行完成

示例中CyclicBarrier包含3個線程,提交9個任務,每3個任務為一組,調用await()方法后會相互等待,直到3個線程都調用了await()方法,然后放行,并且一起執行,9個任務會循環3輪,從輸出結果中可以看出。

示例中getNumberWaiting()方法可以查看CyclicBarrier中已經等待的線程數。

看完了CyclicBarrier的使用方式,再看一下CyclicBarrier的源碼實現。

類屬性

public class CyclicBarrier {

    /**
     * 互斥鎖,用來保證線程安全
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * 柵欄條件操作
     */
    private final Condition trip = lock.newCondition();
    
    /**
     * 柵欄初始線程數
     */
    private final int parties;
    
    /**
     * 到達柵欄后的操作
     */
    private final Runnable barrierCommand;

    /**
     * 柵欄前未到達的線程數
     */
    private int count;

    /**
     * 當前循環輪數
     */
    private Generation generation = new Generation();

    
    private static class Generation {
        boolean broken = false;
    }
}

CyclicBarrier內部使用了ReentrantLock來保證線程安全,又使用了Condition來實現線程的等待與喚醒操作。

初始化

CyclicBarrier初始化的可以指定線程數和到達柵欄后的操作。

/**
 * 指定線程數
 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

/**
 * 指定線程數和到達柵欄后的操作
 * @param parties 線程數
 * @param barrierAction 到達柵欄后的操作
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) {
        throw new IllegalArgumentException();
    }
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

比如到達柵欄后,關閉線程池:

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> executorService.shutdown());

看一下await()方法源碼。

await方法源碼

/**
     * await方法入口
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * await方法核心邏輯
     * @param timed 是否允許超時,false表示不允許
     * @param nanos 超時時間
     */
    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException, TimeoutException {
        // 1. 加鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 2. 獲取當前循環輪數
            final Generation g = generation;
            if (g.broken) {
                throw new BrokenBarrierException();
            }

            // 3. 如果當前線程已中斷,就打破柵欄
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 4. 計數器減一,如果計數器為零,表示所有線程都到達了柵欄
            int index = --count;
            if (index == 0) {
                boolean ranAction = false;
                try {
                    // 5. 如果初始化時指定了barrierCommand,就執行
                    final Runnable command = barrierCommand;
                    if (command != null) {
                        command.run();
                    }
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction) {
                        breakBarrier();
                    }
                }
            }

            for (; ; ) {
                try {
                    // 6. 如果不允許超時,就阻塞當前線程
                    if (!timed) {
                        trip.await();
                    } else if (nanos > 0L) {
                        nanos = trip.awaitNanos(nanos);
                    }
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken) {
                    throw new BrokenBarrierException();
                }

                if (g != generation) {
                    return index;
                }

                // 7. 如果已超時,就打破柵欄
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 8. 釋放鎖
            lock.unlock();
        }
    }

await()方法源碼很長,但是邏輯很簡單,主要分為以下四步:

  1. 加鎖,保證線程安全。
  2. 統計柵欄前等待的線程數,如果所有線程都到達了柵欄,就執行初始化時指定的barrierCommand。
  3. 如果線程沒有指定了超時時間,就直接阻塞當前線程。如果指定了超時時間,就等待直到超時,如果已超時,就打破柵欄。
  4. 釋放鎖

再看一下打破柵欄的源碼:

/**
 * 打破柵欄
 */
private void breakBarrier() {
    // 1. 設置當前循環輪數的狀態為已打破
    generation.broken = true;
    // 2. 重置線程數
    count = parties;
    // 3. 喚醒所有等待的線程
    trip.signalAll();
}

其他常用方法

CyclicBarrier還有一些常用的方法:

/**
 * 等待(帶超時時間)
 * @param timeout 超時時間
 * @param unit 時間單位
 */
public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
        BrokenBarrierException,
        TimeoutException {
    ...
}

/**
 * 重置柵欄(當柵欄出現異常情況時使用)
 */
public void reset() {
    ...
}

總結

看完了CyclicBarrier的所有源碼,是不是覺得CyclicBarrier邏輯很簡單。

CyclicBarrier主要用來控制線程的執行速率,初始化時指定線程數,線程調用await()方法時會阻塞,直到到達的線程數等于初始線程數,才會放行,并且一起執行。與CountDownLatch區別是,CyclicBarrier可以循環執行,而CountDownLatch只能執行一次。

責任編輯:武曉燕 來源: 一燈架構
相關推薦

2011-07-11 17:38:42

JAVA

2020-12-03 11:15:21

CyclicBarri

2021-03-18 00:14:29

JavaCyclicBarri高并發

2009-09-08 16:31:13

Linq開放式并發

2010-07-13 13:06:41

Perl面向對象

2014-03-14 10:34:28

JavaJava并發

2022-11-14 11:09:36

源碼AQS加鎖

2021-03-06 22:41:06

內核源碼CAS

2024-12-31 09:00:12

Java線程狀態

2019-09-16 09:23:34

高并發編程CountDownLaCyclicBarri

2024-01-29 15:54:41

Java線程池公平鎖

2024-04-02 09:40:39

多線程Java原子性

2020-12-11 07:32:45

編程ThreadLocalJava

2020-11-13 08:42:24

Synchronize

2024-03-18 08:15:48

Java并發編程

2022-10-12 07:53:46

并發編程同步工具

2024-02-29 09:37:25

Java并發編程

2010-03-15 19:02:25

Java編程語言

2010-09-17 10:53:45

Java運行環境

2020-12-07 09:40:19

Future&Futu編程Java
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲一区二区三区在线免费观看 | 亚洲+变态+欧美+另类+精品 | 久久精品中文 | 福利国产 | 国产2区 | 亚洲一区二区免费 | 免费一二区 | 国产欧美一区二区三区在线看蜜臀 | 激情av| 91精品综合久久久久久五月天 | 免费精品视频在线观看 | 天堂视频中文在线 | 91视频大全 | 国产一区二区三区在线看 | 国产精品免费看 | 亚洲精品亚洲人成人网 | 日一日操一操 | 欧美一级欧美三级在线观看 | 国产精品国产a级 | 精品国产乱码久久久久久88av | 免费一区二区三区 | 日韩在线视频免费观看 | 超碰在线人人 | 国产精品爱久久久久久久 | 亚洲高清三级 | 成人精品一区亚洲午夜久久久 | 午夜视频在线视频 | 九九久久久 | 毛片区| 欧美欧美欧美 | 中文字幕不卡在线观看 | 香蕉久久a毛片 | 黄色免费在线观看 | 91久久国产综合久久 | 不卡的av在线 | 成人亚洲精品久久久久软件 | 国产成人精品午夜 | 国产不卡一区在线观看 | 国产夜恋视频在线观看 | h视频免费观看 | 日韩久久精品视频 |