在 APM 中,保證及時并準確地獲取應用的信息是非常重要的,這樣才能保證應用出現問題時,我們可以高效地找到并解決問題。本文以之前提交給 Node.js 的 PR 為例,介紹如何實現一個 APM watchdog 來對應用進行監控。
Hello,大家好,之前說不打算更新公眾號了,后面有時間的話還是會偶爾更新下,記錄和分享下一些技術相關的內容,今天分享下如何實現一個 APM watchdog。
在 APM 中,保證及時并準確地獲取應用的信息是非常重要的,這樣才能保證應用出現問題時,我們可以高效地找到并解決問題。本文以之前提交給 Node.js 的 PR 為例,介紹如何實現一個 APM watchdog 來對應用進行監控。這個 PR 的實現思想來自我們在內部實現的 APM watchdog,但是因為邏輯復雜,目前暫時還沒有時間去推進。
首先來看一下如何使用,然后看看一下如何實現。
new MemoryProfileWatchdog({
// 內存閾值,達到該閾值則采集堆快照
maxRss: 1024 * 1024,
maxUsedHeapSize: 1024 * 1024,
// 輪詢間隔
interval: 1000,
// 快照寫到哪個文件
filename: filepath,
});
可以看到,啟動一個 watchdog 非常簡單,我們只需要配置一些監控的閾值和輪訓時間。監控的數據是基于定時輪詢的,因為沒有相關的訂閱發布機制,當 watchdog 監控到數據達到閾值時就會采集堆快照,因為這里是一個內存 watchdog,我們也可以實現 CPU watchdog,原理是一樣的。接著看看實現,首先看 JS 層的實現。
class MemoryProfileWatchdog {
#handle;
constructor(options) {
this.#handle = new profiler.MemoryProfileWatchdog({
...options,
filename,
});
this.#handle.start();
}
stop() {
if (this.#handle) {
this.#handle.stop();
this.#handle = null;
}
}
}
JS 層的實現非常簡單,只是對 C++ 層的簡單封裝,所以直接來看 C++ 層的實現,我們忽略一些細節,只關注核心邏輯。
class ProfileWatchdog : public BaseObject {
public:
enum class ProfileWatchdogState { kInitialized, kRunning, kClosing, kClosed };
ProfileWatchdog(Environment* env, v8::Local<v8::Object> object);
~ProfileWatchdog() override;
static v8::Local<v8::FunctionTemplate> GetConstructorTemplate(Environment* env);
// 啟動 / 停止 watchdog
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
void Start(Environment* env);
void Stop();
// 提交一個任務
template <typename Fn>
void AddTask(Fn&& cb, CallbackFlags::Flags flags = CallbackFlags::Flags::kRefed);
// 處理一個任務
void HandleTasks();
// 啟動一個定時器
void SetTimeout();
// 定時器回調,具體的邏輯由子類實現
virtual bool TimeoutHandler() = 0;
protected:
// 輪詢間隔
uint64_t interval_;
private:
static void Run(void* arg);
static void Timer(uv_timer_t* timer);
// 子線程
uv_thread_t thread_;
uv_loop_t loop_;
// 主線程和子線程的通信結構體
uv_async_t async_;
// 定時器
uv_timer_t timer_;
// 任務隊列
CallbackQueue<void> tasks_;
Mutex task_mutex_;
};
ProfileWatchdog 實現了 watchdog 機制,具體需要監控什么數據由子類實現,比如內存 watchdog。
class MemoryProfileWatchdog : public ProfileWatchdog {
public:
MemoryProfileWatchdog(Environment* env,
v8::Local<v8::Object> object,
v8::Local<v8::Object> options);
static void Init(Environment* env, v8::Local<v8::Object> target);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
bool TimeoutHandler() override;
private:
// 需要監控的數據指標
size_t max_rss_ = 0;
size_t max_used_heap_size_ = 0;
std::string filename_;
};
有了基本的了解后,接下來看具體實現。
void ProfileWatchdog::Start(Environment* env) {
int rc;
// 初始化一個事件循環結構體
rc = uv_loop_init(&loop_);
// 初始化線程間通信結構體
rc = uv_async_init(&loop_, &async_, [](uv_async_t* task_async) {
ProfileWatchdog* w = ContainerOf(&ProfileWatchdog::async_, task_async);
w->HandleTasks();
});
// 初始化并啟動一個定時器
rc = uv_timer_init(&loop_, &timer_);
rc = uv_timer_start(&timer_, &ProfileWatchdog::Timer, interval_, 0);
// 創建 watchdog 線程
rc = uv_thread_create(&thread_, &ProfileWatchdog::Run, this);
}
當啟動一個 watchdog 時就會執行 Start,Start 函數中主要初始化了線程間通信的結構體,然后啟動一個定時器,最后創建一個 watchdog 線程。因為 Node.js 是單線程的,為了保證 watchdog 在 JS 繁忙時仍可正常工作,我們需要借助子線程。創建子線程后,子線程就會開始執行 ProfileWatchdog::Run。
void ProfileWatchdog::Run(void* arg) {
ProfileWatchdog* wd = static_cast<ProfileWatchdog*>(arg);
uv_run(&wd->loop_, UV_RUN_DEFAULT);
CheckedUvLoopClose(&wd->loop_);
}
Run 的邏輯很簡單,就是啟動一個事件循環,因為我們前面啟動了一個定時器,所以這個事件循環里就會定時執行定時器回調 ProfileWatchdog::Timer。
void ProfileWatchdog::Timer(uv_timer_t* timer) {
ProfileWatchdog* w = ContainerOf(&ProfileWatchdog::timer_, timer);
// 往主線程插入一個任務
env->RequestInterrupt([watchdog = std::move(w)](Environment* env) {
// 執行定時器的邏輯,由具體的 watchdog 實現,返回 true 表示重啟定時器,否則監控到此為止
if (watchdog->TimeoutHandler()) {
// 往子線程里插入一個任務,該任務是重啟定時器
watchdog->AddTask(
[watchdog = std::move(watchdog)]() { watchdog->SetTimeout(); });
}
});
}
Timer 中通過 env->RequestInterrupt 往主線程插入一個任務,因為有些代碼是不能在子線程里執行的,另外 RequestInterrupt 可以保證在 JS 繁忙或阻塞在事件驅動模塊時仍然可以執行我們的任務,那么這個任務具體做什么呢?看看內存 watchdog 的 TimeoutHandler 實現。
bool MemoryProfileWatchdog::TimeoutHandler() {
bool reached = false;
if (max_rss_) {
size_t rss = 0;
uv_resident_set_memory(&rss);
if (rss >= max_rss_) {
reached = true;
}
}
if (!reached && max_used_heap_size_) {
Isolate* isolate = env()->isolate();
HeapStatistics heap_statistics;
isolate->GetHeapStatistics(&heap_statistics);
if (heap_statistics.used_heap_size() >= max_used_heap_size_) {
reached = true;
}
}
// 內存達到閾值,采集快照
if (reached) {
HeapProfiler::HeapSnapshotOptions options;
options.numerics_mode = HeapProfiler::NumericsMode::kExposeNumericValues;
options.snapshot_mode = HeapProfiler::HeapSnapshotMode::kExposeInternals;
heap::WriteSnapshot(env(), filename_.c_str(), options);
// 采集完快照,停止 watchdog
return false;
}
return true;
}
TimeoutHandler 就是獲取主線程的內存信息,并判斷是否超過了我們配置的閾值,是的話則采集堆快照并停止 watchdog,防止采集過多的重復信息,我們也可以改成隔久一點再開始重新監控,而內存如果沒有超過閾值,則重啟定時器,等待下一輪判斷。從前面的代碼可以看到,如果沒有達到閾值,我們會調用 AddTask 往子線程插入一個任務。
watchdog->AddTask([watchdog = std::move(watchdog)]() {
watchdog->SetTimeout();
});
看一下 AddTask 的實現。
template <typename Fn>
void ProfileWatchdog::AddTask(Fn&& cb, CallbackFlags::Flags flags) {
auto callback = tasks_.CreateCallback(std::move(cb), flags);
{
Mutex::ScopedLock lock(task_mutex_);
// 追加一個任務
tasks_.Push(std::move(callback));
}
// 通知子線程有任務處理
uv_async_send(&async_);
}
AddTask 往子線程的任務隊列中插入一個任務,并通知子線程處理,接著看看子線程如何處理任務。
void ProfileWatchdog::HandleTasks() {
while (tasks_.size() > 0) {
CallbackQueue<void> queue;
{
Mutex::ScopedLock lock(task_mutex_);
queue.ConcatMove(std::move(tasks_));
}
while (auto head = queue.Shift()) head->Call();
}
}
HandleTasks 會逐個任務處理,也就是執行一個個函數,我們剛才插入的函數如下。
void ProfileWatchdog::SetTimeout() {
uv_timer_start(&timer_, &ProfileWatchdog::Timer, interval_, 0);
}
也就是重啟定時器,這樣就開始等待下次超時,直到觸發了閾值。
這就是 APM watchdog 的實現原理,核心思想是利用子線程和 env->RequestInterrupt 機制,保證我們對目的線程進行相對實時的監控(取決于設置的輪詢時間),并在發現問題采集相關信息來協助我們排查問題,利用這個思路,我們可以實現不同類型的 watchdog 來解決不同的問題,比如 CPU watchdog 可以在 JS 死循環時采集 CPU Profile 信息幫助我們找到有問題的代碼,本文就分享到這里,最后貼上目前的實現 PR(見文章末尾)。因為涉及到多線程和 Node.js 內部的一些知識,實現起來有很多地方需要考慮的,希望后面有時間繼續推進。
PR:https://github.com/nodejs/node/pull/45714