成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java 線程池知識(shí)點(diǎn)小結(jié)

開發(fā)
本文將深入探討 Java 線程池的原理、實(shí)現(xiàn)方式及其優(yōu)化技巧,旨在幫助開發(fā)者更好地理解和利用這一重要機(jī)制,從而構(gòu)建更加高效穩(wěn)定的并發(fā)系統(tǒng)。

在當(dāng)今軟件開發(fā)領(lǐng)域,高性能和高并發(fā)處理能力成為了衡量應(yīng)用程序質(zhì)量的重要標(biāo)準(zhǔn)之一。Java 作為一門廣泛使用的編程語言,在支持多線程和并發(fā)編程方面提供了強(qiáng)大的工具——線程池。本文將深入探討 Java 線程池的原理、實(shí)現(xiàn)方式及其優(yōu)化技巧,旨在幫助開發(fā)者更好地理解和利用這一重要機(jī)制,從而構(gòu)建更加高效穩(wěn)定的并發(fā)系統(tǒng)。

一、詳解線程池核心知識(shí)點(diǎn)

1. 為什么需要線程池

我們可以從性能、資源、安全等角度來回答這個(gè)問題:

  • 提高響應(yīng)速度:從性能角度來說,通過線程池進(jìn)行池化統(tǒng)一管理線程,使用時(shí)直接通過線程池獲取,不再需要手動(dòng)創(chuàng)建線程,響應(yīng)速度大大提高。
  • 降低資源消耗:從資源消耗的角度,由于線程池被池化管理了,我們無需為了某些功能去手動(dòng)創(chuàng)建和銷毀線程,資源消耗自然降低。
  • 便于管理和監(jiān)控:因?yàn)槲覀兊墓ぷ骶€程都來自于線程池中所以對(duì)于線程的監(jiān)控和管理自然方便了許多。

2. 線程池使用示例

接下來我們展示了一個(gè)非常簡(jiǎn)單的demo,創(chuàng)建一個(gè)含有3個(gè)線程的線程,提交3個(gè)任務(wù)到線程池中,讓線程池中的線程池執(zhí)行。 完成后通過shutdown停止線程池,線程池收到通知后會(huì)將手頭的任務(wù)都執(zhí)行完,再將線程池停止,所以筆者這里使用isTerminated判斷線程池是否完全停止了。只有狀態(tài)為terminated才能說明線程池關(guān)閉了,結(jié)束循環(huán),退出方法。

//創(chuàng)建含有3個(gè)線程的線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        //提交3個(gè)任務(wù)到線程池中
        for (int i = 0; i < 3; i++) {
            finalint taskNo = i;
            threadPool.execute(() -> {
                log.info("執(zhí)行任務(wù){(diào)}", taskNo);
            });
        }

        //關(guān)閉線程池
        threadPool.shutdown();
        //如果線程池還沒達(dá)到Terminated狀態(tài),說明線程池中還有任務(wù)沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務(wù)
        while (!threadPool.isTerminated()) {

        }

對(duì)應(yīng)輸出結(jié)果如下:

10:38:51.993 [pool-1-thread-3] INFO com.sharkChili.Main - 執(zhí)行任務(wù)2
10:38:51.993 [pool-1-thread-2] INFO com.sharkChili.Main - 執(zhí)行任務(wù)1
10:38:51.993 [pool-1-thread-1] INFO com.sharkChili.Main - 執(zhí)行任務(wù)0

3. 詳解線程池核心參數(shù)

我們上文通過Executors框架創(chuàng)建了線程池,它底層是通過ThreadPoolExecutor完成線程池的創(chuàng)建:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

可以看到ThreadPoolExecutor的構(gòu)造方法包含下面幾個(gè)參數(shù),它們分別是:

  • corePoolSize:核心線程數(shù),即時(shí)空閑也會(huì)保留在線程池中的線程。
  • maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù),例如配置為10,那么線程池中最大的線程數(shù)就為10。
  • keepAliveTime:核:心線程數(shù)以外的線程的生存時(shí)間,例如corePoolSize為2,maximumPoolSize為5,假如我們線程池中有5個(gè)線程,核心線程以外有3個(gè),這3個(gè)線程如果在keepAliveTime的時(shí)間內(nèi)沒有被用到就會(huì)被回收。
  • unit:keepAliveTime的時(shí)間單位。
  • workQueue:當(dāng)核心線程都在忙碌時(shí),任務(wù)都會(huì)先放到隊(duì)列中。
  • threadFactory:線程工廠,用戶可以通過這個(gè)參數(shù)指定創(chuàng)建線程的線程工廠。
  • handler:當(dāng)線程池?zé)o法接受新的任務(wù)時(shí),就會(huì)根據(jù)這個(gè)參數(shù)做出拒絕策略,默認(rèn)拒絕策略是直接拋異常。

對(duì)應(yīng)的構(gòu)造方法如下所示:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //......
    }

4. 線程池的工作流程

從ThreadPoolExecutor的execute方法我們大體可以窺探到其內(nèi)部核心邏輯:

  • 如果工作的線程小于核心線程數(shù),則調(diào)用addWorker創(chuàng)建線程并執(zhí)行我們傳入的任務(wù)。
  • 如果核心線程都在工作,則調(diào)用workQueue.offer(command)將我們提交的任務(wù)放到隊(duì)列中。
  • 如果隊(duì)列也無法容納任務(wù)時(shí),則繼續(xù)創(chuàng)建線程并用這些線程處理新進(jìn)來的任務(wù)。
  • 如果還有新的任務(wù)接入且當(dāng)線程數(shù)達(dá)到maximumPoolSize時(shí),說明已經(jīng)無法容納任務(wù)了,則調(diào)用reject(command)按照拒絕策略處理任務(wù)。

對(duì)應(yīng)的我們給出execute的源碼核心邏輯,讀者可自行參閱:

public void execute(Runnable command) {
        //......
        //工作線程數(shù)小于核心線程數(shù),則創(chuàng)建線程線程處理傳入的任務(wù)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        if (isRunning(c) && workQueue.offer(command)) {//核心線程都在工作則將任務(wù)存入阻塞隊(duì)列
           //......
        }
        elseif (!addWorker(command, false))//隊(duì)列無法容納則繼續(xù)創(chuàng)建線程應(yīng)急處理,如果創(chuàng)建失敗說明當(dāng)前線程超過maximumPoolSize,則調(diào)用reject按照拒絕策略處理任務(wù)
            reject(command);
    }

5. 線程池的幾種狀態(tài)

ThreadPoolExecutor通過2的次冪并結(jié)合位運(yùn)算標(biāo)識(shí)了幾種線程的狀態(tài),這種做法也是計(jì)算機(jī)程序設(shè)計(jì)中常見的實(shí)用技巧,即通過位運(yùn)算標(biāo)識(shí)替代常規(guī)數(shù)值比對(duì),保證狀態(tài)唯一性的同時(shí)還能保證程序的執(zhí)行效率,對(duì)應(yīng)的線程池狀態(tài)和注釋如下:

//RUNNING 說明線程正處于運(yùn)行狀態(tài),正在處理任務(wù)和接受新的任務(wù)進(jìn)來
privatestaticfinalint RUNNING    = -1 << COUNT_BITS;
//說明線程收到關(guān)閉的通知了,繼續(xù)處理手頭任務(wù),但不接受新任務(wù)
    privatestaticfinalint SHUTDOWN   =  0 << COUNT_BITS;
    //STOP說明線程停止了不處理任務(wù)也不接受任務(wù),即時(shí)隊(duì)列中有任務(wù),我們也會(huì)將其打斷。
    privatestaticfinalint STOP       =  1 << COUNT_BITS;
    //表明所有任務(wù)都已經(jīng)停止,記錄的任務(wù)數(shù)量為0
    privatestaticfinalint TIDYING    =  2 << COUNT_BITS;
    //線程池完全停止了
    privatestaticfinalint TERMINATED =  3 << COUNT_BITS;

6. 線程池的幾種拒絕策略

  • AbortPolicy:這個(gè)拒絕策略在無法容納新任務(wù)的時(shí)候直接拋出異常,這種策略是線程池默認(rèn)的拒絕策略。
public static class AbortPolicy implements RejectedExecutionHandler {
        
        public AbortPolicy() { }

     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • CallerRunsPolicy:從源碼中可以看出,當(dāng)線程池?zé)o法容納新任務(wù)的時(shí),會(huì)直接將當(dāng)前任務(wù)交給調(diào)用者執(zhí)行。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
        
      

      
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         //......
         //讓當(dāng)前提交任務(wù)的線程運(yùn)行
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • DiscardOldestPolicy:顧名思義,當(dāng)線程池?zé)o法最新任務(wù)時(shí),會(huì)將隊(duì)首的任務(wù)丟棄,將新任務(wù)存入。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
      
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
             //將隊(duì)首元素poll掉,并將當(dāng)前任務(wù)提交
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

DiscardPolicy:從源碼中可以看出這個(gè)策略什么也不做,相當(dāng)于直接將當(dāng)前任務(wù)丟棄。

public static class DiscardPolicy implements RejectedExecutionHandler {
     
        public DiscardPolicy() { }

      //什么都不做直接即丟棄當(dāng)前任務(wù)
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

6. 線程兩種任務(wù)提交方式

首先是execute,當(dāng)任務(wù)提交到線程池中時(shí)直接按照流程執(zhí)行即可,處理完成后是沒有返回值的源碼上文已經(jīng)給出這里就不多贅述。而submit它會(huì)將傳進(jìn)來的任務(wù)封裝成RunnableFuture,然后將Future返回出去,調(diào)用者可以通過get方法獲取返回結(jié)果:

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        //提交任務(wù)
        execute(ftask);
        //返回Future,后續(xù)我們可以通過get獲取結(jié)果
        return ftask;
    }

對(duì)應(yīng)的我們也給出使用示例:

@Test
    void baseUse() throws ExecutionException, InterruptedException {
        //創(chuàng)建含有3個(gè)線程的線程池
        ExecutorService threadPool = Executors.newFixedThreadPool(3);

        //提交3個(gè)任務(wù)到線程池中
        for (int i = 0; i < 3; i++) {
            finalint taskNo = i;
            Future<Integer> future = threadPool.submit(() -> {
                logger.info("執(zhí)行任務(wù){(diào)}", taskNo);
                return1;
            });
            logger.info("處理結(jié)果:{}", future.get());
        }

        //關(guān)閉線程池
        threadPool.shutdown();
        //如果線程池還沒達(dá)到Terminated狀態(tài),說明線程池中還有任務(wù)沒有執(zhí)行完,則繼續(xù)循環(huán)等待線程池執(zhí)行完任務(wù)
        while (!threadPool.isTerminated()) {

        }
    }

輸出結(jié)果:

00:24:41.204 [pool-1-thread-1] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)0
00:24:41.208 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1
00:24:41.209 [pool-1-thread-2] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)1
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1
00:24:41.209 [pool-1-thread-3] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 執(zhí)行任務(wù)2
00:24:41.209 [main] INFO com.example.javacommonmistakes100.JavaCommonMistakes100ApplicationTests - 處理結(jié)果:1

7. 線程池的關(guān)閉方式

線程池的停止方式有兩種:

  • shutdown:筆者上述代碼示例用的都是這種方式,使用這個(gè)方法之后,我們無法提交新的任務(wù)進(jìn)來,線程池會(huì)繼續(xù)工作,將手頭的任務(wù)執(zhí)行完再停止:
public void shutdown() {
//上鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
         //檢查并設(shè)置狀態(tài),不再接受新任務(wù)
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            //打斷空閑的線程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //停止線程池
        tryTerminate();
    }
  • shutdownNow:這種停止方式就比較粗暴了,線程池會(huì)直接將手頭的任務(wù)都強(qiáng)行停止,且不接受新任務(wù)進(jìn)來,線程停止立即生效:
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        //上鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
         //設(shè)置狀態(tài)為stop強(qiáng)行停止
            checkShutdownAccess();
            advanceRunState(STOP);
            //打斷空閑線程
            interruptWorkers();
            //移除隊(duì)列中的任務(wù)
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //停止線程池
        tryTerminate();
        return tasks;
    }

8. 非核心線程創(chuàng)建與調(diào)度饑餓問題

了解了線程池整體工作原理后,讀者是否想過,為什么先要用corePoolSize核心線程,然后當(dāng)核心線程處理不過來時(shí)將異步任務(wù)先放到workQueue中,而不是直接開maximumPoolSize的線程數(shù)繼續(xù)處理應(yīng)急任務(wù)呢?如果按照當(dāng)前的線程池執(zhí)行流程不就存在一個(gè)饑餓問題?即后來的任務(wù)可能會(huì)比存在于隊(duì)列中的任務(wù)先執(zhí)行:

其實(shí)回答這個(gè)問題,其實(shí)我們可以通過反證法來解釋,假設(shè)任務(wù)處理不過來之后,直接創(chuàng)建maximumPoolSize個(gè)線程處理任務(wù),那么就會(huì)存在以下幾個(gè)問題:

  • 僅僅因?yàn)楹诵木€程數(shù)處理不來任務(wù)就認(rèn)為是應(yīng)急情況,這會(huì)導(dǎo)致應(yīng)急線程被提前創(chuàng)建,這就可能存在頻繁創(chuàng)建和銷毀線程的性能損耗,例如核心線程為4,剛剛好某個(gè)時(shí)間段剛剛好來個(gè)5個(gè)異步任務(wù),僅僅因?yàn)槎嗔艘粋€(gè)任務(wù),在沒有任何緩沖的情況下,直接創(chuàng)建應(yīng)急線程然后被銷毀,這就會(huì)導(dǎo)致這種不合理的性能損耗。
  • 資源消耗:創(chuàng)建完最大線程之后,線程有可能處于空閑中,這也不能意味著線程沒有任何開銷,一旦線程被啟動(dòng)對(duì)于CPU、內(nèi)存而言都是存在一定的資源開銷的,如果maximumPoolSize線程數(shù)過大,對(duì)于系統(tǒng)資源占用也是非常不劃算的。

總的來說,設(shè)計(jì)者們認(rèn)為只有緩沖區(qū)處理不來(隊(duì)列容納不下)的情況下才能開啟應(yīng)急線程是一種對(duì)于應(yīng)急情況的判斷依據(jù),由此避免了非應(yīng)急情況創(chuàng)建應(yīng)急線程的開銷:

//存入阻塞隊(duì)列失敗后,才會(huì)嘗試調(diào)用addWorker開啟非核心線程,即通過阻塞隊(duì)列的閾值來作為應(yīng)急情況判斷的依據(jù)
 if (isRunning(c) && workQueue.offer(command)) {
          //......
        }
 else if (!addWorker(command, false))//如果非核心線程開啟失敗,則執(zhí)行拒絕策略
            reject(command);

再來回答另外一個(gè)問題即任務(wù)饑餓問題,,這確實(shí)會(huì)存在一定情況下存在饑餓問題,但筆者認(rèn)為該問題只要線程池參數(shù)設(shè)定得當(dāng),在非核心線程啟動(dòng)之后,這些堆積在阻塞隊(duì)列的任務(wù)在一定時(shí)間后就會(huì)被任意抽身出來的線程從隊(duì)列中取出并處理,對(duì)應(yīng)的我們給出每一個(gè)worker線程的執(zhí)行邏輯即ThreadPoolExecutor的runWorker方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //......
        try {
        //從阻塞隊(duì)列中獲取任務(wù)task
            while (task != null || (task = getTask()) != null) {
               //......
                try {
                  //......
                    try {
                    //執(zhí)行任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                      //......
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //......
                }
            }
            //......
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

當(dāng)然如果讀者對(duì)于任務(wù)公平有著嚴(yán)格要求同時(shí)系統(tǒng)資源也足夠充分,完全可以考慮通過Executors.newSingleThreadExecutor()這種只有一個(gè)線程的線程輪詢處理阻塞隊(duì)列的任務(wù)模式,來保證異步任務(wù)順序性和公平性:

對(duì)應(yīng)的我們也給出singleThreadExecutor的核心實(shí)現(xiàn),讀者也可以參考源碼了解一下:

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

二、線程池使用注意事項(xiàng)

1. 避免使用Executors的newFixedThreadPool

接下來我們來看看日常使用線程池時(shí)一些錯(cuò)誤示例,為了更好的看到線程池的變化,我們編寫這樣一個(gè)定時(shí)任務(wù)去監(jiān)控線程池的變化。

/**
     * 打印線程池情況
     *
     * @param threadPool
     */
    private void printStats(ThreadPoolExecutor threadPool) {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            log.info("=========================");

            log.info("Pool Size:{}", threadPool.getPoolSize());
            log.info("Active Threads:{}", threadPool.getActiveCount());
            log.info("Number of Tasks  Completed: {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue:{}", threadPool.getQueue().size());

            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);


    }

先來看看這樣一段代碼,我們循環(huán)1e次,每次創(chuàng)建這樣一個(gè)任務(wù):生成一串大字符串,休眠一小時(shí)后打印輸出。

@GetMapping("oom1")
    public void oom1() {
        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        printStats(threadPool);

        for (int i = 0; i < 1_0000_0000; i++) {
            threadPool.submit(() -> {
                String payload = IntStream.rangeClosed(1, 100_0000)
                        .mapToObj(__ -> "a")
                        .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                log.info(payload);
            });

        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

項(xiàng)目啟動(dòng)后使用jvisualvm監(jiān)控項(xiàng)目的變化:

可以看到此時(shí)CPU使用情況,堆區(qū)、還有線程數(shù)使用情況都是正常的。

然后我們對(duì)剛剛的接口發(fā)起請(qǐng)求:

curl http://localhost:8080/threadpooloom/oom1

我們先來看看控制臺(tái)輸出,可以看到線程數(shù)沒有增加,而隊(duì)列的任務(wù)卻不斷累積。

看看jvisualvm,此時(shí)堆區(qū)內(nèi)存不斷增加,盡管發(fā)生了幾次GC,還是沒有回收到足夠的空間。最終引發(fā)OOM問題。

我們通過源碼來觀察一下newFixedThreadPool的特征,可以看到它的核心線程數(shù)和最大線程數(shù)都是傳進(jìn)來的值,這意味著無論多少個(gè)任務(wù)進(jìn)來,線程數(shù)都是nThreads。如果我們沒有足夠的線程去執(zhí)行的任務(wù)的話,任務(wù)就會(huì)堆到LinkedBlockingQueue中,從源碼中我們也能看出,LinkedBlockingQueue是無界隊(duì)列(底層是通過鏈表實(shí)現(xiàn)的):

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2. 避免使用Executors的newCachedThreadPool

再來看看第二段代碼,同樣的任務(wù)提交到newCachedThreadPool中,我們看看會(huì)發(fā)生什么。

@GetMapping("oom2")
    public void oom2() {
        ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        printStats(threadPool);

        for (int i = 0; i < 1_0000_0000; i++) {
            threadPool.submit(() -> {
                String payload = IntStream.rangeClosed(1, 100_0000)
                        .mapToObj(__ -> "b")
                        .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                log.info(payload);
            });

        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

先來看看控制臺(tái),可以看到線程數(shù)正在不斷的飆升。

從jvisualvm也能看出堆區(qū)和線程數(shù)也在不斷飆升,最終導(dǎo)致OOM。

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 32744 bytes for ChunkPool::allocate
# An error report file with more information is saved as:
# F:\github\java-common-mistakes-100\hs_err_pid147400.log

我們來看看newCachedThreadPool源碼,可以看到這個(gè)線程池核心線程數(shù)初始為0,最大線程數(shù)為Integer.MAX_VALUE,而隊(duì)列使用的是SynchronousQueue,所以這個(gè)隊(duì)列等于不會(huì)存儲(chǔ)任何任務(wù)。

這就意味著我們每次提交一個(gè)任務(wù)沒有線程處理的話,線程池就會(huì)創(chuàng)建一個(gè)新的線程去處理這個(gè)任務(wù),1s內(nèi)沒有線程使用就將其銷毀。

我們的連續(xù)1e次循環(huán)提交任務(wù)就會(huì)導(dǎo)致創(chuàng)建1e個(gè)線程,最終導(dǎo)致線程數(shù)飆升,進(jìn)而引發(fā)OOM問題。

public static ExecutorService newCachedThreadPool() {
  //隊(duì)列不可容納元素,最大線程數(shù)設(shè)置為Integer.MAX_VALUE
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

3. 確保你創(chuàng)建線程池的方式線程可以被復(fù)用

我們監(jiān)控發(fā)現(xiàn)某段時(shí)間線程會(huì)不斷飆升,然后急速下降,然后急速上升:

然后我們?cè)诰€程的棧幀中看到SynchronousQueue,大概率有人使用newCachedThreadPool。

最終通過全局搜索看到這樣一段代碼,可以看到這個(gè)工具類每次請(qǐng)求就會(huì)創(chuàng)建一個(gè)newCachedThreadPool給用戶使用。

static class ThreadPoolHelper {

     

        public static ThreadPoolExecutor getThreadPool() {
            return (ThreadPoolExecutor) Executors.newCachedThreadPool();
        }

    
    }

我們?cè)诙ㄎ坏秸{(diào)用出,真想明了了,原來每一次請(qǐng)求都會(huì)創(chuàng)建一個(gè)newCachedThreadPool處理大量的任務(wù),由于newCachedThreadPool回收時(shí)間為1s,所以線程使用完之后立刻就被回收了。

@GetMapping("wrong")
    public String wrong() {
        ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();
        IntStream.rangeClosed(1, 20).forEach(i -> {
            threadPool.execute(() -> {
                String payload = IntStream.rangeClosed(1, 1000000)
                        .mapToObj(__ -> "a")
                        .collect(Collectors.joining("")) + UUID.randomUUID().toString();
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                log.debug(payload);
            });

        });


        return"ok";
    }

解決方式也很簡(jiǎn)單,我們按需調(diào)整線程池參數(shù),將線程池作為靜態(tài)變量全局復(fù)用即可。

static class ThreadPoolHelper {

        privatestatic ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10,
                50,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());

       

        public static ThreadPoolExecutor getRightThreadPool() {
            return threadPool;
        }
    }

從監(jiān)控來看線程數(shù)正常多了。

4. 仔細(xì)斟酌線程混用策略

我們使用線程池來處理一些異步任務(wù),每個(gè)任務(wù)耗時(shí)10ms左右。

@GetMapping("wrong")
    public int wrong() throws ExecutionException, InterruptedException {
        return threadPool.submit(calcTask()).get();
    }


private Callable<Integer> calcTask() {
        return () -> {
            log.info("執(zhí)行異步任務(wù)");
            TimeUnit.MILLISECONDS.sleep(10);
            return1;
        };
    }

壓測(cè)的時(shí)候發(fā)現(xiàn)性能很差,處理時(shí)間最長(zhǎng)要283ms。

步入線程池發(fā)現(xiàn),線程池的配置如下,只有2個(gè)線程和50個(gè)隊(duì)列。

private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
            2,
            1,
            TimeUnit.HOURS,
            new ArrayBlockingQueue<>(50),
            new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
            new ThreadPoolExecutor.CallerRunsPolicy());

查看調(diào)用發(fā)現(xiàn),原來后臺(tái)有一個(gè)處理字符串并將內(nèi)容寫入到文本文件的操作,綜合來看相當(dāng)于一個(gè)計(jì)算型任務(wù),由于這個(gè)任務(wù)不是很經(jīng)常出現(xiàn),所以開發(fā)者就設(shè)置兩個(gè)線程,并且為了讓任務(wù)能夠正確完成,拒絕策略也是使用CallerRunsPolicy,讓多出來的任務(wù)用調(diào)用者線程來執(zhí)行。

@PostConstruct
    public void init() {
        printStats(threadPool);
        new Thread(() -> {
            String payload = IntStream.rangeClosed(1, 100_0000)
                    .mapToObj(__ -> "a")
                    .collect(Collectors.joining(""));
            while (true) {

                threadPool.execute(() -> {
                    try {
                        Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
//                    log.info("batch file processing done");
                });
            }
        }, "T1").start();
    }

解決方式也很簡(jiǎn)單,上述線程池并不是為我們這種IO密集型任務(wù)準(zhǔn)備的,所以我們單獨(dú)為其劃分一個(gè)線程池出來處理這些任務(wù)。

private ThreadPoolExecutor asyncCalcThreadPool  = new ThreadPoolExecutor(200,
            200,
            1,
            TimeUnit.HOURS,
            new ArrayBlockingQueue<>(50),
            new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @GetMapping("wrong")
    public int wrong() throws ExecutionException, InterruptedException {
        return asyncCalcThreadPool.submit(calcTask()).get();
    }

經(jīng)過壓測(cè)可以發(fā)現(xiàn)性能明顯上來了:

5. 使用正確的方式提交任務(wù)

假如我們提交給線程池的任務(wù)沒有返回值,我們建議使用execute。

這一點(diǎn)我們不妨看一下這樣一段代碼,該代碼會(huì)循環(huán)提交10個(gè)算術(shù)異常的任務(wù)給線程池??梢钥吹轿覀兲峤坏娜蝿?wù)是沒有返回值的,而我們提交任務(wù)時(shí)卻用到了submit。使用submit提交任務(wù)時(shí),會(huì)返回一個(gè)Future對(duì)象,通過Future對(duì)象我們可以使用get方法阻塞獲取任務(wù)返回結(jié)果。

因?yàn)槲覀兊娜蝿?wù)是沒有返回值的,所以我們提交過程中并沒有通過get方法獲取返回結(jié)果,這就導(dǎo)致了一個(gè)隱患——吞異常。

private static ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            //提交一個(gè)算術(shù)異常的任務(wù)
            threadPool.submit(() -> {
                log.info("開始執(zhí)行運(yùn)算");
                int r = 1 / 0;
                log.info("結(jié)束執(zhí)行運(yùn)算");
            });


        }
        //等待線程池關(guān)閉
        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

可以看到這段代碼的輸出結(jié)果如下,控制臺(tái)僅僅輸出線程開始工作,卻沒有輸出結(jié)果。

09:15:36.940 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算
09:15:36.942 [pool-1-thread-1] INFO com.sharkchili.Main - 開始執(zhí)行運(yùn)算

這一點(diǎn),我們通過查看FutureTask的run源碼可以得知,F(xiàn)utureTask的run方法執(zhí)行步驟如下:

  • 調(diào)用call方法,執(zhí)行任務(wù)。
  • 得到result后將ran設(shè)置為true。
  • 如果執(zhí)行過程中報(bào)錯(cuò),直接進(jìn)入catch模塊,將result設(shè)置為null,并將ran設(shè)置為false。
  • 調(diào)用setException處理異常。
try {    //執(zhí)行任務(wù),返回一個(gè)結(jié)果賦值給result
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                 //任務(wù)拋出異常后,將result設(shè)置為null,ran狀態(tài)設(shè)置為false,并調(diào)用setException處理異常
                    result = null;
                    ran = false;
                    setException(ex);
                }

步入代碼查看setException我們可以發(fā)現(xiàn),它會(huì)將異常結(jié)果賦值給outcome然后調(diào)用finishCompletion結(jié)束任務(wù),所以如果我們沒有主動(dòng)獲取任務(wù)結(jié)果,那么這個(gè)錯(cuò)誤就永遠(yuǎn)不會(huì)被感知。

protected void setException(Throwable t) {
  //通過cas將結(jié)果設(shè)置為完成(COMPLETING)值為1
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
         //將異常賦值給outcome直接將任務(wù)結(jié)束
            outcome = t;
            //通過cas將結(jié)果設(shè)置為異常(EXCEPTIONAL)值為3
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

針對(duì)上述問題,要想獲取異常也很簡(jiǎn)單,主動(dòng)調(diào)用get獲取結(jié)果即可:

private static ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args)  {
        for (int i = 0; i < 10; i++) {
            //提交一個(gè)算術(shù)異常的任務(wù)
            Future<?> future = threadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " do working");
                int r = 1 / 0;
                System.out.println(r);
            });

            try {
                //通過get阻塞獲取任務(wù)結(jié)果
                Object o = future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()){

        }
    }

從輸出結(jié)果可以看到出現(xiàn)異常后,錯(cuò)誤直接拋出,我們就可以及時(shí)調(diào)試處理了。

pool-1-thread-1do working
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
pool-1-thread-1do working
pool-1-thread-1do working
 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
pool-1-thread-1do working
pool-1-thread-1do working
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
pool-1-thread-1do working
 at com.sharkChili.threadpool.Main.main(Main.java:23)
Caused by: java.lang.ArithmeticException: / by zero
 at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

為什么調(diào)用get才能捕獲到異常呢?通過查看get源碼可以了解到get方法的執(zhí)行步驟:

  • 獲取任務(wù)執(zhí)行狀態(tài)state。
  • 如果state小于COMPLETING(COMPLETING值為1)說明任務(wù)未完成,則調(diào)用awaitDone等待任務(wù)完成。
  • 如果大于1則說明任務(wù)已完成(),通過上文源碼可知我們的任務(wù)已經(jīng)被CAS設(shè)置為EXCEPTIONAL(值為3),所以直接調(diào)用report。
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果s小于1則說命未完成,調(diào)用awaitDone等待完成,在調(diào)用report
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

查看report代碼我們終于知道原因了,我們?nèi)蝿?wù)執(zhí)行報(bào)錯(cuò)所以s的值為3,小于CANCELLED,所以調(diào)用了最后一段代碼將異常拋出了。

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

通過上述我們知道的submit使用不當(dāng)可能存在吞異常的情況以及應(yīng)對(duì)辦法,實(shí)際上對(duì)于沒有返回值的任務(wù),我們建議直接使用execute,execute感知異常時(shí)會(huì)直接將任務(wù)拋出:

private static ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            //提交一個(gè)算術(shù)異常的任務(wù)
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " do working");
                int r = 1 / 0;
                System.out.println(r);
            });


        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

從輸出結(jié)果來看,算術(shù)異常直接拋出,被主線程感知了。

pool-1-thread-1do working
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
pool-1-thread-2do working
pool-1-thread-3do working
 at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
pool-1-thread-4do working
pool-1-thread-5do working
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
pool-1-thread-6do working
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
pool-1-thread-7do working

通過查看sumbit執(zhí)行源碼,我們可以看到代碼調(diào)用棧會(huì)來到ThreadPoolExecutor的runWorker下面這個(gè)代碼段的邏輯:

  • 調(diào)用run執(zhí)行任務(wù)。
  • afterExecute收尾任務(wù)。
  • 如果感知異常則拋出異常throw x。
  • 所以我們的任務(wù)會(huì)因?yàn)樗阈g(shù)異常而拋出任務(wù)。
try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                     //執(zhí)行算數(shù)任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; thrownew Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }

最終代碼被JVM感知直接將異常拋到控制臺(tái),所以對(duì)于沒有返回值的任務(wù),我們建議使用execute執(zhí)行任務(wù)。

private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

6. 避免任務(wù)頻繁拋出異常

上文提到使用execute提交無返回值的任務(wù),這樣異常就會(huì)被感知,但還需要注意的是頻繁的拋出異常會(huì)讓線程消亡,導(dǎo)致線程池每次執(zhí)行新任務(wù)時(shí)回去創(chuàng)建新的線程。

還是以這段代碼為例,我們對(duì)于算術(shù)異常沒有任務(wù)處理。

private static ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            //提交一個(gè)算術(shù)異常的任務(wù)
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " do working");
                int r = 1 / 0;
                System.out.println(r);
            });


        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

可以看到我們明明只有一個(gè)線程的線程池,每次拋出異常后,都會(huì)創(chuàng)建一個(gè)新的線程處理任務(wù)。

pool-1-thread-1 do working
pool-1-thread-2 do working
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
 at com.sharkChili.threadpool.Main.lambda$main$0(Main.java:17)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Exception in thread "pool-1-thread-2" pool-1-thread-3 do working

這一點(diǎn)我們從源碼中可知,拋出的異常被JVM感知并調(diào)用dispatchUncaughtException方法,該方法會(huì)通過getUncaughtExceptionHandler得到線程組,然后調(diào)用uncaughtException處理異常。

private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

最終代碼會(huì)走到e.printStackTrace打印異常堆棧信息并終止任務(wù),銷毀線程。

public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } elseif (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

所以我們建議,對(duì)于線程池的中的任務(wù)盡可能不要用異常來處理邏輯,對(duì)于可以預(yù)見的異常,我們建議手動(dòng)處理返回,避免線程銷毀再創(chuàng)建的開銷。

以我們的算術(shù)異常為例,我們可以提前判斷一下被處數(shù)提前用業(yè)務(wù)手段處理掉異常。

private static ExecutorService threadPool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            //提交一個(gè)算術(shù)異常的任務(wù)
            threadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " do working");
                //手動(dòng)處理業(yè)務(wù)代碼的異常
                int num= RandomUtil.randomInt(0,10);
                if (num==0){
                    System.out.println("The dividend cannot be zero. ");
                    return;
                }
                int r = 1 / num;
                System.out.println(r);
            });


        }

        threadPool.shutdown();
        while (!threadPool.isTerminated()) {

        }
    }

三、小結(jié)

總結(jié)一下上述線程池的使用經(jīng)驗(yàn):

  • 避免使用Executors創(chuàng)建線程池。
  • 確保線程確實(shí)被服用到。
  • 使用合適的方式提交任務(wù)以及及時(shí)處理任務(wù)中的異常。
  • 確保在合適的場(chǎng)景使用合適的線程池:
CPU密集型:若是CPU密集型,我們希望多利用CPU資源來處理任務(wù),因?yàn)闆]有任何IO,理想情況線程數(shù)=CPU核心數(shù)即可,但是考慮到可能回出現(xiàn)某個(gè)意外情況導(dǎo)致線程阻塞,所以我們建議線程數(shù)=CPU核心數(shù)+1

IO密集型:IO密集型由于每個(gè)任務(wù)可能回出現(xiàn)IO導(dǎo)致任務(wù)阻塞,在單核情況下,我們建議:

線程數(shù)=IO時(shí)長(zhǎng)/CPU計(jì)算耗時(shí)+1

若在多核的情況下,我們建議

線程數(shù)=CPU核心數(shù) * (IO時(shí)長(zhǎng)/CPU計(jì)算耗時(shí)+1)
但是具體情況還要具體結(jié)合壓測(cè)結(jié)果進(jìn)行響應(yīng)調(diào)整。


責(zé)任編輯:趙寧寧 來源: 寫代碼的SharkChili
相關(guān)推薦

2025-05-19 10:00:00

MySQL數(shù)據(jù)庫InnoDB

2025-05-08 10:25:00

Netty網(wǎng)絡(luò)編程框架

2009-12-18 17:34:38

Ruby線程

2021-04-19 08:35:44

PythonPython語言Python基礎(chǔ)

2021-01-18 10:33:53

Java反射模塊

2025-05-13 08:10:00

MySQL二進(jìn)制日志binlog

2016-05-30 17:31:34

Spring框架

2010-08-17 14:56:00

HCNE認(rèn)證

2011-04-15 12:25:21

BGP路由

2021-04-13 08:25:12

測(cè)試開發(fā)Java注解Spring

2010-07-27 15:49:28

Flex

2009-08-06 17:42:32

C#知識(shí)點(diǎn)

2010-06-17 16:42:04

UML

2010-08-18 10:52:46

Linux筆試

2010-09-02 10:11:11

華為認(rèn)證

2020-10-07 15:15:41

Python

2020-11-06 00:50:16

JavaClassLoaderJVM

2021-05-17 06:02:58

Css前端CSS 特效

2009-12-23 16:35:12

Linux系統(tǒng)光驅(qū)軟

2009-08-05 09:22:43

C#調(diào)用VC DLL
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 日日操夜夜操视频 | 91精品国产一二三 | 国产成人网 | 成人亚洲视频 | 亚洲一区二区三区久久 | 中文字幕亚洲精品 | a级免费视频| 国产伊人精品 | 国产一区二区av | 二区成人 | 日韩在线观看网站 | 九九亚洲 | 色www精品视频在线观看 | 91欧美激情一区二区三区成人 | 丝袜久久| 日韩视频在线免费观看 | 91香蕉视频在线观看 | 成人 在线| 成人av播放 | 国产一区二区三区在线看 | 欧美激情精品久久久久久变态 | 二区视频 | 看黄在线 | 亚洲精品一区在线 | 国产一区二区在线视频 | 久久久久久亚洲 | 毛片一区二区三区 | 不卡视频一区二区三区 | 亚洲1区| 天天影视亚洲综合网 | 91精品国产91久久久久久吃药 | 欧美日韩亚洲一区二区 | 国精日本亚洲欧州国产中文久久 | 一级黄色片网站 | 日韩二三区 | 欧美精品一区二区三区在线播放 | 综合久久久 | 亚洲精品久久久久久下一站 | 欧美激情一区二区三级高清视频 | 午夜视频在线播放 | 欧美色综合|