響應時間是接口監控的黃金指標之一:假設接口接收請求的時間是t1,接口處理完請求,響應的時間是t2,則接口響應時間是:t2-t1,將響應時間指標接入監控報警系統,當響應時間大于閾值的時候則進行報警;但是在線程被阻塞的情況下,由于接口一直沒有返回,響應時間也就無法監控到。
背景介紹
在過去處理過的服務故障中,有一類比較典型的場景是業務線程被阻塞(造成阻塞的原因也是多種多樣),慢慢導致業務線程池中的全部線程被阻塞,最終造成無法對外提供服務(現象則是CPU、Load、內存等指標都比較低,請求接口后響應超時或者沒有響應)。
問題分析
響應時間是接口監控的黃金指標之一:假設接口接收請求的時間是t1,接口處理完請求,響應的時間是t2,則接口響應時間是:t2-t1,將響應時間指標接入監控報警系統,當響應時間大于閾值的時候則進行報警;但是在線程被阻塞的情況下,由于接口一直沒有返回,響應時間也就無法監控到。
阻塞的線程往往是業務線程,這些業務線程可能是:
- 基于tomcat提供http服務的tomcat線程,線程名類似:http-nio-8080-exec-1
- 基于RocketMQ的消息消費者線程,線程名類似:ConsumeMessageThread_1
- 基于HSF Provider的線程,線程名類似:HSFBizProcessor-DEFAULT-12-thread-3
- … …
如果我們能夠在這些業務線程執行的必經路徑上進行攔截,那么就能記錄下線程開始執行的時間,同時啟動定時器不斷檢查線程已執行的時間,當已執行時間大于設定的閾值則打印出線程棧進行報警;當線程正常返回則刪除該線程記錄,所以需要解決的主要是兩個問題:
解決思路
通過問題分析,可以確定主要需要解決以下兩個問題
檢測阻塞線程
該模塊主要做三件事:
- 業務線程開始執行的時候,進行線程注冊
- 業務線程結束執行或拋異常的時候,刪除線程注冊信息
- 定時檢測注冊的線程是否發生阻塞,如果發生阻塞則打印線程棧
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockedThreadChecker {
protected final static Log logger = LogFactory.getLog(BlockedThreadChecker.class);
private static volatile BlockedThreadChecker instance;
private final static int DELAY = 10;
private final static int PERIOD = 1000;
private ScheduledThreadPoolExecutor executor;
private final Map<Thread, Task> threads = new ConcurrentHashMap<>();
private BlockedThreadChecker(){
logger.info("init BlockedThreadChecker... ...classloader:" + this.getClass().getClassLoader() + ",parent classloader:" + this.getClass().getClassLoader().getParent());
int coreSize = Runtime.getRuntime().availableProcessors();
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r, "BlockThreadCheckerTimer-" + counter.incrementAndGet());
thread.setDaemon(true);
return thread;
}
};
executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run(){
long now = System.currentTimeMillis();
for(Map.Entry<Thread,Task> entry : threads.entrySet()){
long execStart = entry.getValue().startTime;
long dur = now - execStart;
if(dur >= entry.getValue().maxExecTime){
BlockedThreadException e = new BlockedThreadException(entry.getKey().getName() + " has been blocked " + dur + " ms");
e.setStackTrace(entry.getKey().getStackTrace());
logger.error(e.getMessage(),e);
}
}
}
},DELAY,PERIOD, TimeUnit.MILLISECONDS);
}
public static BlockedThreadChecker getInstance(){
if(instance != null){
return instance;
}
synchronized (BlockedThreadChecker.class){
if(instance != null){
return instance;
}
instance = new BlockedThreadChecker();
}
return instance;
}
public void registerThread(Thread thread){
registerThread(thread, new Task());
}
public void registerThread(Thread thread,Task task){
threads.put(thread, task);
logger.info("registerThread " + thread.getName());
}
public void unregisterThread(Thread thread){
threads.remove(thread);
logger.info("unregisterThread " + thread.getName());
}
class Task {
long startTime = System.currentTimeMillis();
long maxExecTime = 10000L;
}
}
攔截線程
方案一
服務中幾種常見業務線程:
- 基于tomcat提供http服務的tomcat線程,通過實現自定義Filter,在Filter中完成線程的注冊和取消注冊操作;
- 基于RocketMQ的消息消費者線程,根據業務需求統一實現MessageListenerConcurrently、MessageListenerOrderly等,在統一實現類中完成線程的注冊和取消注冊;
- 基于HSF Provider的線程,通過實現自定義Filter,在Filter中完成線程的注冊和取消注冊操作。
該方案實現簡單,但是對于業務侵入性比較強,侵入性強意味著業務在意識不到問題的時候,沒有改變的動力。
方案二
基于jvm-sandbox實現自定義module,實現思路如下:
import com.alibaba.jvm.sandbox.api.Information;
import com.alibaba.jvm.sandbox.api.LoadCompleted;
import com.alibaba.jvm.sandbox.api.Module;
import com.alibaba.jvm.sandbox.api.listener.ext.Advice;
import com.alibaba.jvm.sandbox.api.listener.ext.AdviceListener;
import com.alibaba.jvm.sandbox.api.listener.ext.EventWatchBuilder;
import com.alibaba.jvm.sandbox.api.resource.ModuleEventWatcher;
import org.kohsuke.MetaInfServices;
import sun.misc.Unsafe;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.Properties;
@MetaInfServices(Module.class)
@Information(id = "blocked-thread-module", version = "0.0.1", author = "yuji")
public class BlockedThreadModule implements Module, LoadCompleted {
@Resource
private ModuleEventWatcher moduleEventWatcher;
private AdviceListener adviceListener = new AdviceListener() {
@Override
protected void before(Advice advice) throws Throwable {
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().registerThread(Thread.currentThread());
}
@Override
protected void afterReturning(Advice advice){
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
}
@Override
protected void afterThrowing(Advice advice){
if (!advice.isProcessTop()) {
return;
}
BlockedThreadChecker.getInstance().unregisterThread(Thread.currentThread());
}
};
@Override
public void loadCompleted(){
new EventWatchBuilder(moduleEventWatcher)
.onClass("javax.servlet.http.HttpServlet")
.onBehavior("service")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently")
.includeSubClasses()
.onBehavior("consumeMessage")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly")
.includeSubClasses()
.onBehavior("consumeMessage")
.onWatch(adviceListener);
new EventWatchBuilder(moduleEventWatcher)
.onClass("com.taobao.hsf.remoting.provider.ReflectInvocationHandler")
.includeSubClasses()
.onBehavior("invoke")
.onWatch(adviceListener);
}
}
通過在應用啟動參數中增加javaagent=jvm-sandbox agent的方式來使用,相比較方案一業務應用不需要改動任何代碼,也不需要對已有封裝的框架進行修改,缺點是jvm-sandbox需要提前部署到每個應用的機器上,會給運維帶來工作量,個人認為這種方案是最穩定的。
方案三
為了避免方案二中運維工作,一種思路是以jar包的形式提供給業務方使用,業務方引入jar包就可以了
,主要面臨兩個問題需要解決。
如何觸發jar包執行初始化邏輯
一種方式是通過spring boot starter的方式,比如
arthas-spring-boot-starter;
一種是根據spring容器初始化流程,選擇某個切入點,比如通過實現ApplicationListener接口,監聽spring初始化完成的ApplicationEvent來實現。
如何初始化jvm-sandbox
初始化的核心邏輯如下:
//通過ByteBuddyAgent獲取Instrumentation
Instrumentation inst = ByteBuddyAgent.install();
//將相應版本的sandbox-spy.jar添加到BootstrapClassLoader搜索路徑中
//這一步的操作是由于sandbox-spy中包名是以java開頭的,所以只能通過BootstrapClassLoader進行加載
JarFile spyJarFile = new JarFile("/目錄/sandbox-spy-version.jar");
inst.appendToBootstrapClassLoaderSearch(spyJarFile);
//構造jvm-sandbox CoreFeatureString
String sandboxCoreFeatureString = String.format(";system_module=%s;mode=%s;sandbox_home=%s;provider=%s;namespace=%s;unsafe.enable=true;",systemModule, "agent", sandboxHome, provider, NAMESPACE );
CoreConfigure coreConfigure = CoreConfigure.toConfigure(sandboxCoreFeatureString,null);
CoreLoadedClassDataSource classDataSource = new DefaultCoreLoadedClassDataSource(inst,true);
ProviderManager providerManager = new DefaultProviderManager(coreConfigure);
//核心類,用戶自定義的module是在這個類中完成加載和初始化的
CoreModuleManager coreModuleManager = new DefaultCoreModuleManager(coreConfigure,inst,classDataSource,providerManager);
//初始化命名空間與SpyHandler對于關系
SpyUtils.init(NAMESPACE);
//加載各種module
coreModuleManager.reset();
上面代碼總體邏輯是沒有問題的,需要考慮的細節是上面代碼在不同類加載器體系下的兼容問題。
Tomcat

tomcat類加載器關系
pandora可運行jar包

pandora類加載器關系
idea

idea應用類加載器關系
經驗總結
從目前的三種方案來說,個人比較傾向方案二。
參考資料
bytebuddy
jvm-sandbox
arthas