看看面包超人的 '招牌線程池' 用得可還行?
本文轉載自微信公眾號「Shooter茶杯」,作者Shooter 。轉載本文請聯系Shooter茶杯公眾號。
本文主要是介紹線程池的一些進階玩法 。
面包超人鎮樓
1、線程池簡簡單單 4 連發
- 1、線程池的核心線程數怎么設置?
- 2、8C16G 的機器需要幾臺可以抗起 3W 的qps?
- 3、如何動態的修改線程池參數?
- 4、線程池可以先啟動最大線程數再將任務放到阻塞隊列里么?
后面的舉例的機器配置統一是 8核16G !
2、線程池的核心線程數到底怎么設置?首先說個不太正確的答案:
IO 密集型的設置為 2n, 計算密集型設置為 n+1
為什么不對?因為核心線程數設置多少要具體情況具體分析,大家使用線程池的業務場景不同,解決方案自然是不一樣的,下面我舉個例子做詳細的分析,然后總結出一個方法論就可以適用各個不同的場景了!!!
舉例:
- 1、假設現在要給 100w 用戶發放優惠券,通過線程池異步發送
- 2、假設某線程池執行發優惠券的任務共耗時 50ms,其中 45ms 在io, 5ms 在進行計算
(真正的 io 耗時 計算耗時可以通過 記錄log 判斷時間差值計算出來 取平均值即可 )
3、如何設置線程池的參數快速的將這 100w 張券發完?
先拋出答案公式,再論證這個公式的正確性:
核心線程數 = CPU核數 * ((Io耗時 / 計算耗時) + 1)
核心線程數 = 8C * ((45ms / 5ms) +1 ) = 80個
45ms / 5ms 是什么意思?
CPU 在等待 IO 返回時完全可以將 CPU 時間片拿出來去做其他的計算,45ms 可以多處理 9 個計算任務,再加上原本就有一個 5ms 在計算,也就是說: 一個CPU 核在執行這個 50ms 發券任務時,可以并發的起10個線程去處理任務!那8C CPU 最多同時可以有 8個核心并行的處理任務, 8 * 10 = 80
一秒鐘一個線程可以處理 1000ms / 50ms = 20個任務
可以算出線程池執行任務的峰值 qps = 20 * 80 = 1600
發完100w 張券所需時間: 100w / 1600 = 625S,也就是說大概 10分鐘左右就能發完 100w 張券。
不太正確的結論: 核心線程數在處理這個任務的情況下可以設置為 80 用來極限的壓榨機器CPU 的性能。
what?為什么算出 80 又不正確了?
因為將核心線程數設置為 80,這幾乎吃完了所有的 CPU 時間片, CPU 的負載將會達到 100% ; 試想一下生產環境如果你的機器 CPU 負載是 100% , 慌不慌?(CPU 負載打滿機器不會宕機, 但沒有 CPU 資源來處理用戶的請求,表現為服務假死/機器請求半天無反應)
設置線程池核心線程數要考慮 CPU 的使用要素
- 1、每臺機器操作系統需要消耗一些 CPU 資源; 假設用了 2% 的CPU 資源;
- 2、如果是面向用戶的服務,處理用戶的請求也是要消耗CPU 資源的,可以通過一些監控系統,看看平時 CPU 在繁忙時間段的負載是多少; 假設用了 10% 的資源;
- 3、如果除了發券任務的線程池還有其他線程池在運行,就得把其他線程池消耗的CPU資源也算上,假設用了 13% 的資源;
- 4、實際情況一些中間件框架也會用線程池,也會吃一些CPU 資源, 這里暫不做考慮。
在我的實際項目里有一個專門跑定時任務和消費 MQ 消息的服務:
我需要考慮的點:
- 1、操作系統的CPU 資源, 算占用 2% 的CPU資源
- 2、MQ 消費消息 算占用 5% 的CPU 資源
- 3、有其他的定時任務也在用線程池跑任務 算占用 13% 的CPU 資源
- 4、機器的 CPU 在無人監控的非必要時段不能超過 60%。
60% - 2% - 5% - 13% = 40%
發 100w 張優惠券的線程池就只能消耗 40%的資源于是核心線程數最多可以設置為:
核心線程數: 80個 * 40% = 32個;
CPU 100% 時可以設置 80個線程去跑任務 CPU 40% 時可以設置 32個線程去跑任務 那這樣設置系統正常運行CPU大概是 60% 左右, 就算偶爾飆高到 70%-80% 也不用太慌~
補充: 為什么用線程池沒考慮上下文的切換?
1ms = 1000us, 一次上下文的切換大概是 1us, 上下文切換的時間跟執行任務的時間比起來可以忽略不計。
結論 : CPU核數 * ((Io耗時 / 計算耗時) + 1)
這是機器 CPU 負載 100% 時極限的值, 乘以期望的 CPU 負載百分比即可算出實際情況最佳的線程數;
PS: 萬一設置錯了核心線程數又不想改代碼重新發布,可以繼續看第三個問題如何動態修改線程池參數!
2、8C16G 的機器需要幾臺可以抗起 3W 的qps?
首先算出單臺機器的 QPS, 3w 除以單臺機器的 qps 即可算出所需的機器數。
想知道單臺機器某個接口的 QPS 很簡單, 壓測即可。
不過顯然面試的時候如果被問這個問題是壓測不了的。
實際上是面試官在考察你對線程池的理解,接著往下看~
假設一個 用戶領券系統的 qps 在3w左右
大部分服務通常的部署在 Tomcat 上, Tomcat 內部也是通過線程來處理用戶的請求,Tomcat 也是通過線程池來管理線程, 實際上算出 Tomcat 實際的并發和理想狀態能支持的的并發就好了。
上個問題分析出來發券接口 50ms 耗時, 8C 的CPU 占用 100%, 不考慮內存 磁盤 網絡等其他開銷, 線程池極限的QPS 是1600, 這里也不考慮有沒有其他線程池或者七七八八的東西消耗 CPU 資源了。假設 CPU 只能維持在 70% 左右的負載;
單臺機器的 qps 就只能有 1600 * 70% = 1120,就算 1100
3w / 1100 = 27.27 向上取整 大概需要 28 臺機器。
作為一個有經驗的開發人員實際部署的時候絕對要多擴容幾臺服務器來兜底, 推薦部署 32 - 36 臺機器分兩個集群部署。
3、如何動態的修改線程池參數?為什么需要動態的修改線程池參數呢?
比如第一個發券任務發 100w 張券需要 10 分鐘, 假設今天突然要發 200w 張券了, 多了100w 的發券任務,也不想用其他手段來解決了, 且機器的 CPU 負載很低只有 1% ; (為了強行舉例修改線程池參數費盡苦心)
看到第一個和第二個問題,想必你也收獲了如下信息:
使用 8C16G 的機器發放 100w 張優惠券, 處理每個優惠券任務耗時 50ms , 其中 45ms在IO , 5ms 在計算, 核心線程數設置為 32, CPU 負載到 40% 左右, 10分鐘可以把優惠券發完。
如果想發 200w 張券, 最快的方法是將 核心線程數 32 設置為 64, CPU 負載在 80% 左右。
如何動態的修改線程池參數呢?
JDK 的 ThreadPoolExecutor 提供了修改線程池參數的 API
- ThreadPoolExecutor.setCorePoolSize // 修改核心線程數
- ThreadPoolExecutor.setMaximumPoolSize // 修改最大線程數
- ThreadPoolExecutor.setKeepAliveTime // 修改空閑線程存活時間
- ThreadPoolExecutor.setRejectedExecutionHandler // 修改拒絕策略
- ThreadPoolExecutor.setThreadFactory // 修改線程工廠
(不可直接修改阻塞隊列大小,想達到修改阻塞隊列的效果對線程池做一些封裝即可)
- 1、首先將線程池定義為一個 Bean 對象;
- @Bean("refreshLowPriceExecutor")
- public ThreadPoolExecutor refreshLowPriceExecutor() {
- final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>(1000000);
- final int corePoolSize = 20;
- final int maximumPoolSize = 100;
- final int keepAliveTime = 200;
- ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);
- return executorService;
- }
- 2、可以通過分布式配置 or controller接口 or 數據庫觸發修改線程的各個參數, 推薦使用分布式配置(各種用法大同小異):
- private Map<String, String> config;
- @QMapConfig("config.properties")
- private void getValueChange(Map<String, String> config) {
- refreshLowPriceExecutor.setCorePoolSize(Integer.valueOf(config.get("core_size")));
- refreshLowPriceExecutor.setMaximumPoolSize(Integer.valueOf(config.get("max_size")));
- System.out.println("當前核心線程數為 :" + refreshLowPriceExecutor.getCorePoolSize());
- System.out.println("當前最大線程數為 :" + refreshLowPriceExecutor.getMaximumPoolSize());
- this.config = config;
- }
- 3、改了核心線程數,線程池是如何讓線程數立即生效的?
- public void execute(Runnable command) {
- // 省略注釋/非核心代碼
- int c = ctl.get();
- // 線程池執行任務的處理邏輯主要分三步
- // 第一步 : 當前線程數小于核心線程數則繼續添加worker創建線程
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 第二步 : 當前線程數達到了核心線程數后,將任務放進阻塞隊列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 隊列滿了就將啟動最大線程數限制的線程, 失敗就將任務交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
在線程池的核心線程數被修改后,只要有任務繼續添加進線程池,execute 方法就會繼續創建新線程去處理任務,這樣核心線程數就生效了。
- 4、使用 ScheduledThreadPoolExecutor 監控線程池內部狀況
- // 封裝成一個任務
- Runnable runnable = () -> monitorThreadPool();
- public void monitorThreadPool(){
- log.info("核心線程數" + refreshLowPriceExecutor.getCorePoolSize());
- log.info("活躍線程數" + refreshLowPriceExecutor.getActiveCount());
- log.info("最大線程數" + refreshLowPriceExecutor.getMaximumPoolSize());
- log.info("任務數" + refreshLowPriceExecutor.getTaskCount());
- log.info("線程池里的線程數" + refreshLowPriceExecutor.getPoolSize());
- log.info("獲取隊列再獲取隊列任務數" + refreshLowPriceExecutor.getQueue().size());
- }
- // 將任務交給延時線程池
- executor.scheduleAtFixedRate(runnable, initialDelay,period, TimeUnit);
4、線程池可以先啟動最大線程數再將任務放到阻塞隊列里么?
答案是當然可以!
繼續分析線程池三步走的后兩步邏輯
- public void execute(Runnable command) {
- // 省略注釋/非必要代碼
- // 第二步 : 當前線程池正在運行且 阻塞隊列的 offer 方法返回 true
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 第三步 : 啟動大于核心線程數但小于最大線程數個線程, 添加worker失敗就將任務交給拒絕策略去處理
- else if (!addWorker(command, false))
- reject(command);
- }
啟動最大線程數再將任務放到阻塞隊列的訣竅就在 workQueue 的 offer 方法;
我們可以用自己實現的阻塞隊列在重寫 offer 方法; 在 offer 方法中判斷 當前線程數是否大于等于最大線程數,如果不大于就返回 false, 這樣就跳過了 execute 方法的第二步, 來到了第三步的創建最大線程數的邏輯。
看看 dubbo 是怎么做的 , 直接將代碼 copy(白嫖) 過來即可 地址
https://github.com/apache/dubbo/blob/master/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
- @Override
- public boolean offer(Runnable runnable) {
- if (executor == null) {
- throw new RejectedExecutionException("The task queue does not have executor!");
- }
- int currentPoolThreadSize = executor.getPoolSize();
- // 主要是這個邏輯 當前線程數是否小于最大線程數,如果小于返回 false
- // 這樣就可以跳過 execute 方法的第二步, 來到了第三步的創建最大線程數的邏輯。
- // return false to let executor create new worker.
- if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
- return false;
- }
- // currentPoolThreadSize >= max
- return super.offer(runnable);
- }
本文轉載自微信公眾號「Shooter茶杯」,可以通過以下二維碼關注。轉載本文請聯系Shooter茶杯公眾號。