成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java 并發流程工具的實戰探索

開發 前端
本文將踏上Java并發流程工具的實戰探索之旅。我們不僅會深入剖析這些工具的核心原理,更會通過實際代碼示例,詳細展示它們在不同應用場景中的具體應用。

在當今數字化時代,軟件系統面臨著日益增長的高并發需求。無論是大型電商平臺在促銷活動時處理海量的訂單請求,還是在線游戲服務器同時承載眾多玩家的交互操作,高效的并發處理能力都成為了系統性能和穩定性的關鍵因素。

Java作為一門廣泛應用于企業級開發的編程語言,提供了豐富且強大的并發流程工具。這些工具猶如精巧的齒輪,相互配合,使開發者能夠在多線程環境下有條不紊地控制程序的執行流程,確保各個線程之間協調運作,高效完成復雜的任務。

本文將踏上Java并發流程工具的實戰探索之旅。我們不僅會深入剖析這些工具的核心原理,更會通過實際代碼示例,詳細展示它們在不同應用場景中的具體應用。從簡單的線程同步控制,到復雜的多階段任務協調,一步步揭開Java并發流程工具的神秘面紗,幫助讀者掌握在實際項目中運用這些工具優化程序性能、提升系統可靠性的技巧。

一、CountDownLatch

1. 詳解CountDownLatch工作流程

筆者一般稱CountDownLatch為倒計時門閂,它主要用于需要某些條件下才能喚醒的需求場景,例如我們線程1必須等到線程2做完某些事,那么就可以設置一個CountDownLatch并將數值設置為1,一旦線程2完成業務邏輯后,將數值修改為0,此時線程1就會被喚醒:

2. 模擬等待工作完成

通過上述的描述可能有點抽象,我們直接通過幾個例子演示一下,我們現在有這樣一個需求,希望等待5個線程完成之后,打印輸出一句工作完成:

對應的代碼示例如下,可以看到我們創建了數值為5的CountDownLatch ,一旦線程池里的線程完成工作后就調用countDown進行扣減,一旦數值變為0,主線程await就會放行,執行后續輸出:

int workerSize = 5;
        CountDownLatch workCount = new CountDownLatch(workerSize);
        ExecutorService threadPool = Executors.newFixedThreadPool(workerSize);

        for (int i = 0; i < workerSize; i++) {
            final int workerNum = i;
            //5個工人輸出完成工作后,扣減倒計時門閂數
            threadPool.submit(() -> {
                log.info("worker[{}]完成手頭的工作", workerNum);
                workCount.countDown();
            });
        }

        try {
            //阻塞當前線程(主線程)往后走,只有倒計時門閂變為0之后才能繼續后續邏輯
            log.info("等待worker工作完成");
            workCount.await();
        } catch (InterruptedException e) {
            log.info("倒計時門閂阻塞失敗,失敗原因[{}]", e.getMessage(), e);
        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("所有工人都完成手頭的工作了");

對應的我們也給出輸出結果,可以看到主線程在線程池線程完成后才輸出:

3. 模擬運動員賽跑

實際上CountDownLatch可以讓多個線程進行等待,我們不妨用線程模擬一下所有運動員就緒后,等待槍響后起跑的場景:

代碼如下,每當運動員即線程池的線程準備就緒,則調用await等待槍響,一旦所有運動員就緒之后,主線程調用countDown模擬槍響,然后運動員起跑:

public static void main(String[] args) {
        log.info("百米跑比賽開始");

        int playerNum = 3;
        CountDownLatch gun = new CountDownLatch(1);
        ExecutorService threadPool = Executors.newFixedThreadPool(playerNum);
        
        for (int i = 0; i < playerNum; i++) {
            final int playNo = i;
            
            threadPool.submit(() -> {
                log.info("[{}]號運動員已就緒", playNo);
                try {
                    gun.await();
                } catch (InterruptedException e) {
                    log.info("[{}]號運動員線程阻塞失敗,失敗原因[{}]", playNo, e.getMessage(), e);
                }
                log.info("[{}]號運動員已經到達重點", playNo);
            });
        }

        //按下槍 所有運動員起跑
        gun.countDown();

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }

        log.info("百米賽跑已結束");
    }

對應的我們也給出相應的輸出結果:

4. 從源碼角度分析CountDownLatch工作流程

我們以等待所有工人完成工作的例子進行解析,實際上在CountDownLatch是通過state和一個抽象隊列即aqs完成多線程之間的流程調度,主線程調用await方法等待其他worker線程,如果其它worker線程沒有完成工作,那么CountDownLatch就會將其存入抽象隊列中。

一旦其他線程將state設置為0時,await對應的線程就會從抽象隊列中釋放并喚醒:

對應我們給出countDown的實現,可以看到該方法底層就是將aqs隊列中的state進行扣減:

public void countDown() {
        sync.releaseShared(1);
    }

//releaseShared內部核心邏輯就是將state扣減1
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                //扣減state并通過cas修改賦值
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

而countDown本質上就是查看這個state,如果state被扣減為0,則調用aqs底層doReleaseShared方法將隊列中等待線程喚醒:

public void countDown() {
        sync.releaseShared(1);
    }


public final boolean releaseShared(int arg) {
  //查看是否扣減為0
        if (tryReleaseShared(arg)) {
        //如果是0則將當前等待線程喚醒
            doReleaseShared();
            return true;
        }
        return false;
    }

上文講解countDown涉及一些關于AQS的實用理解和設計,關于更多AQS的知識點,感興趣的讀者可以閱讀一下筆者的這篇文章:《AQS 源碼解析:原理與實踐

二、Semaphore

1. 詳解Semaphore

信號量多用于限流的場景,例如我們希望單位時間內只能有一個線程工作,我們就可以使用信號量,只有拿到線程的信號量才能工作,工作完成后釋放信號量,其余線程才能爭搶這個信號量并進行進一步的操作。 對應我們給出下面這段代碼,可以看到生命信號量數值為6,每當線程拿到3個信號量之后就會執行業務操作,完成后調用release釋放3個令牌,讓其他線程繼續爭搶:

//設置可復用的信號量,令牌數為3
        Semaphore semaphore = new Semaphore(6, true);
        //創建5個線程
        int workSize = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(workSize);


        for (int i = 0; i < workSize; i++) {
            executorService.submit(() -> {
                try {
                    //拿3個令牌
                    semaphore.acquire(3);

                    log.info("進行業務邏輯處理.......");
                    ThreadUtil.sleep(1000);

                    //釋放3個令牌
                    semaphore.release(3);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executorService.shutdown();
        while (!executorService.isTerminated()) {

        }

對應輸出結果如下,可以看到每個線程拿到令牌后都會休眠1秒,從輸出結果來看每秒只有兩個線程才工作,符合我們的限流需求:

2. 詳解Semaphore工作原理

Semaphore底層也是用到的aqs隊列,線程進行資源獲取時也是通過查看state是否足夠,在明確足夠的情況下進行state扣減,然后進行工作。如果線程發現state數量不夠,那么就會被Semaphore存入aqs底層的抽象隊列中,直到state數量足夠后被喚醒:

對此我們給出Semaphore底層的acquire的邏輯可以看到,它會讀取state數值然后進行扣減,如果剩余數量大于0則說明令牌獲取成功線程可以執行后續邏輯,反之說明當前令牌數不夠,外部邏輯會將該線程掛到等待隊列中,等待令牌足夠后將其喚醒:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                //讀取可用的state    
                int available = getState();
                //計算剩余的state
                int remaining = available - acquires;
                //如果小于0說明令牌數不足直接返回出去,讓外部將線程掛起,反之通過cas修改剩余數,返回大于0的結果讓持有令牌的線程執行后續邏輯
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3. Semaphore使用注意事項

  • 獲取和釋放的時候都可以指定數量,但是要保持一致。
  • 公平性設置為true會更加合理
  • 并不必須由獲取許可證的線程釋放許可證。可以是A獲取,B釋放。

三、Condition

1. 詳解Condition

Condition即條件對象,不是很常用或者直接用到的對象,常用于線程等待喚醒操作,例如A線程需要等待某個條件的時候,我們可以通過condition.await()方法,A線程就會進入阻塞狀態。

線程B執行condition.signal()方法,則JVM就會從被阻塞線程中找到等待該condition的線程。線程A收到可執行信號的時候,他的線程狀態就會變成Runnable可執行狀態。

對此我們給出代碼示例,可以看到我們從ReentrantLock 中拿到一個Condition 對象,讓創建的線程進入等待狀態,隨后讓主線程調用condition 的signal將其喚醒:

private ReentrantLock lock = new ReentrantLock();
    //條件對象,操控線程的等待和通知
    private Condition condition = lock.newCondition();

    public void waitCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("等待達到條件后通知");
            condition.await();
            log.info("收到通知,開始執行業務邏輯");
        } finally {
            lock.unlock();
            log.info("執行完成,釋放鎖");
        }
    }


    public void notifyCondition() throws InterruptedException {
        lock.lock();
        try {
            log.info("達到條件發起通知");
            condition.signal();
            log.info("發起通知結束");
        } finally {
            lock.unlock();
            log.info("發起通知執行完成,釋放鎖");
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Main obj = new Main();

        new Thread(() -> {
            try {
                obj.waitCondition();
                //讓出CPU時間片,交給主線程發起通知
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                log.error("等待條件通知設置失敗,失敗原因 [{}]", e.getMessage(), e);
            }
        }).start();

        //休眠3s喚醒等待線程
        Thread.sleep(3000);
        obj.notifyCondition();
    }

對應的我們也給出輸出結果:

2. 基于條件對象完成生產者、消費者模式

我們假設用一個隊列存放一波生產者生產的資源,當資源滿了通知消費者消費。當消費者消費空了,通知生產者生產。

所以這時候使用condition控制流程最合適(這也是阻塞的隊列內部的實現),所以我們要定義兩個信號,分別為:

  • 當資源被耗盡,我們就使用資源未滿條件(notFull): 調用signal通知生產者消費,消費者調用await進入等待。
  • 當資源被填滿,使用資源為空條件(notEmpty):將生產者用await方法掛起,消費者用signal喚醒消費告知非空。

很明顯生產者和消費者本質上就是基于這兩個標識分別標志自己的等待時機和通知時機,以生產者為例,即每生產一個資源后就可以調用notEmpty通知消費者消費,當生產者速度過快,則用await等待未滿notFull條件阻塞:

首先我們給出生產者和消費者條件和資源隊列聲明,基于上述條件我們給出一個經典的生產者和消費者模式的示例,我們首先給出生產者代碼,可以看到資源滿的時候調用notFull.await();將自己掛起等待未滿,生產資源后調用 notEmpty.signal();通知消費者消費。

對應消費者示例代碼也是一樣,當資源消費完全,調用notEmpty.await();等待不空,一旦消費定量資源調用notFull.signal();通知生產者生產。

最終代碼示例如下:

@Slf4j
public class ProducerMode {

    //鎖
    private static ReentrantLock lock = new ReentrantLock();
    // 資源未滿
    private Condition notFull = lock.newCondition();
    //資源為空
    private Condition notEmpty = lock.newCondition();

    private Queue<Integer> queue = new PriorityQueue<>(10);
    private int queueMaxSize = 10;

    /**
     * 生產者
     */
    private class Producer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (queueMaxSize == queue.size()) {
                        log.info("當前隊列已滿,通知消費者消費");
                        //等待不滿條件觸發
                        notFull.await();

                    }

                    queue.offer(1);
                    log.info("生產者補貨,當前隊列有 【{}】", queue.size());
                    //通知消費者隊列不空,可以消費
                    notEmpty.signal();
                } catch (Exception e) {
                    log.error("生產者報錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }



    }

    /**
     * 消費者
     */
    private class Consumer extends Thread {
        @Override
        public void run() {

            while (true) {
                lock.lock();

                try {
                    if (0 == queue.size()) {
                        log.info("當前隊列已空,通知生產者補貨");
                        //等待不空條件達到
                        notEmpty.await();

                    }

                    queue.poll();
                    //通知消費者不滿了
                    notFull.signal();
                    log.info("消費者完成消費,當前隊列還剩余 【{}】個元素", queue.size());
                } catch (Exception e) {
                    log.error("生產者報錯,失敗原因 [{}]", e.getMessage(), e);
                } finally {
                    lock.unlock();
                }


            }
        }
    }


    public static void main(String[] args) {
        ProducerMode mode = new ProducerMode();
        Producer producer = mode.new Producer();
        ProducerMode.Consumer consumer = mode.new Consumer();
        producer.start();
        consumer.start();
    }
}

對應的我們給出輸出結果:

四、CyclicBarrier

1. CyclicBarrier 原理和使用示例

CyclicBarrier 也就是循環柵欄對象,不是很常用,它主要用于等待線程數就緒后執行公共邏輯的業務場景。 例如我們希望每湊齊5個線程后執行后續邏輯,我們就可以說明CyclicBarrier 數值為5,然后每個線程到期后調用await等待其他線程就緒。

一旦到齊5個,CyclicBarrier 就會通知這些線程開始工作,對應的代碼如下所示:

public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("線程 " + Thread.currentThread().getName() + " 開始執行任務");
                try {
                    // 模擬執行任務
                    Thread.sleep(1000);
                    System.out.println("線程 " + Thread.currentThread().getName() + " 到達屏障");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("所有線程都到達屏障,一起繼續執行");
            }).start();
        }
    }

對應的我們給出相應輸出示例:

2. CyclicBarrier 與CountDownLatch區別(重點)

  • CountDownLatch用戶事件即主要是業務流程上的控制并不是針對線程,CyclicBarrier 循環柵欄作用于線程,如上代碼必須等待線程到齊后觸發。
  • 循環柵欄可重復使用,CountDownLatch則不能。

五、小結

通過本次對Java并發流程工具的實戰探索,我們對Java并發編程領域有了更為深入且全面的認知。 從CountDownLatch到CyclicBarrier,再到Semaphore和Exchanger等工具,每一個都在多線程協作場景中有著獨特的用途。

  • CountDownLatch如同倒計時器,能讓一組線程等待某個特定事件完成后再繼續執行;
  • CyclicBarrier則像聚會的召集者,使多個線程在特定點上匯聚,然后一起繼續前行;
  • Semaphore猶如資源的守護者,精確控制著對有限資源的訪問;
  • Exchanger為兩個線程之間的數據交換提供了安全高效的通道。

在實際的代碼實踐中,我們看到這些工具如何巧妙地解決多線程協作中復雜的同步和通信問題,極大地提高了程序的并發處理能力和性能。不僅如此,我們還學會了根據不同的業務場景,如任務并行化、資源管理、數據交換等,選擇最合適的并發流程工具,以實現最優的解決方案。

然而,Java并發編程是一個廣闊且復雜的領域,這些工具在帶來便利的同時,也要求我們對線程安全、資源競爭等問題保持高度警惕。在使用過程中,必須深入理解其原理和潛在風險,確保代碼的正確性和穩定性。

希望本次的探索能為你在Java并發編程的道路上點亮一盞明燈,在未來面對各種并發挑戰時,你能夠熟練運用這些工具,編寫出高效、可靠且易于維護的多線程程序,為構建更強大、更具競爭力的軟件系統奠定堅實的基礎。

責任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關推薦

2025-05-12 08:24:04

高并發流量系統

2025-06-17 09:32:15

2023-05-15 08:12:38

2023-10-18 09:27:58

Java編程

2019-11-12 09:32:35

高并發流量協議

2022-08-04 20:41:42

高并發流量SQL

2025-03-24 09:57:19

2025-05-28 02:20:00

2024-09-29 11:07:46

2023-12-04 13:48:00

編 程Atomic

2025-06-06 10:01:25

2024-07-02 11:32:38

2021-10-08 08:55:23

FacebookBGP工具

2024-01-31 08:50:41

Guava并發工具

2024-04-07 00:04:00

Go語言Map

2020-11-30 16:01:03

Semaphore

2020-12-03 11:15:21

CyclicBarri

2019-06-26 07:11:35

Java流程監控開發

2023-08-25 09:36:43

Java編程

2023-04-09 16:34:49

JavaSemaphore開發
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 成人在线免费观看 | 亚洲视频区 | 欧美淫| 91爱啪啪 | 色婷婷综合久久久中字幕精品久久 | 做a网站 | 亚洲少妇综合网 | a级大片免费观看 | 天天久久 | 一区二区三区国产精品 | 隔壁老王国产在线精品 | 观看av | 国产精品国产a | 亚洲欧美视频 | 欧美性生活免费 | 日韩影院在线观看 | 99久久久久久久 | 久久99精品久久久久婷婷 | 国产高清视频一区 | 91黄在线观看 | 99精品网| 色免费视频 | 久久久久国产精品一区二区 | 在线看av的网址 | 爱爱视频日本 | 99精品久久99久久久久 | 国产欧美精品区一区二区三区 | 精品欧美一区免费观看α√ | 亚洲精品资源 | 久久伦理电影 | 国产精品久久777777 | 久久亚洲一区 | 日韩三片 | 中文字幕av在线一二三区 | 欧洲亚洲一区二区三区 | av在线天堂网| 美女久久| 国产一区二区日韩 | 成人免费网站www网站高清 | 成人一区二区在线 | 久久久久久久综合色一本 |