線程池 ThreadPoolExecutor 為了提供擴展,提供了兩個方法 beforeExecute 和 afterExecute,每個任務執行前后都會調用這兩個方法,相當于對線程任務的執行做了一個切面。
?監控線程池:執行超時、等待超時;執行超時數量、等待超時數量;
擴展線程池 ThreadPoolExecutor 的兩個方法 beforeExecute 和 afterExecute
自定義Runnable 記錄關鍵節點時間
關鍵時間節點參數:
- 任務創建(提交)時間:submitTime
- 任務開始執行時間:startExeTime
- 任務結束執行時間:endExeTime
- 任務在隊列等待時間:任務開始執行時間 - 任務創建(提交)時間
- 任務執行總時間:任務結束執行時間 - 任務開始執行時間
源碼分析
線程池 ThreadPoolExecutor 為了提供擴展,提供了兩個方法 beforeExecute 和 afterExecute,每個任務執行前后都會調用這兩個方法,相當于對線程任務的執行做了一個切面。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* @param t 執行任務的線程
* @param
protected void beforeExecute(Thread t, Runnable r){ }
/**
* @param r 將要被執行的任務
* @param
protected void afterExecute(Runnable r, Throwable t){ }
}
源碼執行邏輯:

線程池擴展代碼:
public class ThreadPoolExpandTest {
// 定義線程池
public static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
4,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(5),
new ThreadPoolExecutor.DiscardOldestPolicy()
){
@Override
/**
* @param t 執行任務的線程
* @param
protected void beforeExecute(Thread t, Runnable r){
System.out.println("beforeExecute將要被執行");
}
/**
* @param r 將要被執行的任務
* @param
@Override
protected void afterExecute(Runnable r, Throwable t){
System.out.println("afterExecute已經執行完畢");
}
};
public static void main(String[] args){
poolExecutor.execute(()->{
System.out.println("任務執行");
});
}
}
運行結果:
beforeExecute執行
任務執行
afterExecute執行
總結:從測試代碼可以看出,通過擴展線程池參數可以進行任務執行的監控。
自定義Runnable
通過自定義Runnable,記錄任務執行的一些時間:
public class DynamicRunnable implements Runnable{
/**
* runnable
*/
private final Runnable runnable;
/**
* 任務創建(提交)時間
*/
private final Long submitTime;
/**
* 任務開始執行時間
*/
private Long startExeTime;
public DynamicRunnable(Runnable runnable){
this.runnable = runnable;
submitTime = System.currentTimeMillis();
}
@Override
public void run(){
runnable.run();
}
public Long getSubmitTime(){
return submitTime;
}
public void setStartExeTime(Long startExeTime){
this.startExeTime = startExeTime;
}
public Long getStartExeTime(){
return startExeTime;
}
}
繼承線程池+自定義Runnable
核心參數:
/**
* 執行超時,單位(毫秒)
*/
private long runTimeout;
/**
* 等待超時,單位(毫秒)
*/
private long queueTimeout;
/**
* 執行超時數量
*/
private final AtomicInteger runTimeoutCount = new AtomicInteger();
/**
* 等待超時數量
*/
private final AtomicInteger queueTimeoutCount = new AtomicInteger();
重寫ThreadPoolExecutor方法:
@Override
public void execute(Runnable command){
if (runTimeout > 0 || queueTimeout > 0) {
// 記錄任務提交時間
command = new DynamicRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r){
if (!(r instanceof DynamicRunnable)) {
super.beforeExecute(t, r);
return;
}
DynamicRunnable runnable = (DynamicRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
// 記錄任務開始執行時間
runnable.setStartExeTime(currTime);
}
if (queueTimeout > 0) {
// 任務開始執行時間 - 任務創建(提交)時間
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
log.error("{} execute queue timeout waitTime: {}ms", this.getThreadPoolName(),waitTime);
}
}
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t){
if (runTimeout > 0) {
DynamicRunnable runnable = (DynamicRunnable) r;
// 任務執行總時間:任務結束執行時間 - 任務開始執行時間
long runTime = System.currentTimeMillis() - runnable.getStartExeTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
log.error("{} execute, run timeout runTime: {}ms", this.getThreadPoolName(), runTime);
}
}
super.afterExecute(r, t);
}