聊一聊 Libuv 的信號機制
本文介紹 Libuv 是如何基于操作系統底層的能力實現信號模塊的,看一下如何在 Libuv 中使用信號模塊。
#include "uv.h"
#include "stdio.h"
#include <unistd.h>
void signal_cb(uv_signal_t* handleint sig) {
printf("receive signal\n");
uv_signal_stop(handle);
}
int main() {
printf("%d\n", getpid());
fflush(stdout);
uv_loop_t loop;
uv_signal_t signal;
uv_loop_init(&loop);
uv_signal_init(&loop, &signal);
uv_signal_start(&signal, signal_cb, SIGUSR1);
uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}
通過 gcc main.c -luv && ./a.out 編譯執行上面代碼,然后執行 kill -SIGUSR1 pid 給該進程發送信號,可以看到會輸出 receive signal。接著來分析具體的實現。
初始化
Libuv 在初始化第一個事件循環結構體時會初始化信號處理的相關結構體。
void uv__signal_global_once_init(void) {
uv_once(&uv__signal_global_init_guard, uv__signal_global_init);
}
因為信號處理是支持多線程的,所以這里用了 uv_once 保證只執行一次 uv__signal_global_init。
static void uv__signal_global_init(void) {
if (uv__signal_lock_pipefd[0] == -1)
// 如果在子線程里調用了 fork,則需要在 fork 后的子進程調用 uv__signal_global_reinit 重新初始化相關數據結構
if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
abort();
uv__signal_global_reinit();
}
繼續看 uv__signal_global_reinit。
static void uv__signal_global_reinit(void) {
// 清除之前的狀態
uv__signal_cleanup();
// 創建兩個 fd 用于加鎖 / 解鎖,工作方式是阻塞模式
if (uv__make_pipe(uv__signal_lock_pipefd, 0))
abort();
// 修改鎖為解鎖狀態
if (uv__signal_unlock())
abort();
}
初始化部分沒有太多的邏輯,只是初始化一些數據結構。
加鎖 / 解鎖
因為 Libuv 用一棵全局的紅黑樹維護了信號和訂閱者的關系,而多個線程可以訪問這個全局的數據結構,所以需要加鎖訪問,接著看看 Libuv 的鎖是怎么實現的,下面是加鎖的實現。
static int uv__signal_lock(void) {
int r;
char data;
do {
r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
下面是解鎖的實現。
static int uv__signal_unlock(void) {
int r;
char data = 42;
do {
r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
} while (r < 0 && errno == EINTR);
return (r < 0) ? -1 : 0;
}
剛才介紹初始化過程時說到了 uv__signal_lock_pipefd 是一個通信管道,Libuv 的加鎖解鎖正是通過 uv__signal_lock_pipefd 實現的,管道初始化時會先寫入一個字節表示處于解鎖狀態,加鎖時會讀出這一個字節,表示加鎖成功,然后解鎖時再次寫入一個字節。因為讀寫一個字節是原子的,所以這就實現了加鎖/解鎖的能力,保證多線程訪問時的安全問題。
那么為什么 Libuv 不使用傳統的 mutex 來實現多線程安全訪問呢?這里涉及到一個概念叫做異步信號安全,它表示一個函數可以安全地在信號處理函數中使用,因為信號是異步發生的并且信號處理函數具有非常高的優先級,假設進程正在執行 a 函數修改一些數據,突然收到信號然后執行信號處理函數,處理函數中又執行了 a 函數修改數據,這時候可能會導致問題。解決這個問題的方式通常有兩種:
- 在信號處理函數里只調用異步信號安全的函數。
- 在執行非異步信號安全的函數時屏蔽信號,避免在信號處理函數里再次執行該函數。
因為 Libuv 在信號處理函數里需要訪問全局數據結構,而 mutex 相關的函數不是異步信號安全的,所以不能使用 mutex 實現,而是通過 read / write 實現(它們是異步信號安全的函數)。
信號屏蔽
加鎖解鎖解決了多個線程訪問全局數據結構的問題,但是還有一個問題是同線程的數據競爭訪問問題?這里大家可能會好奇,單線程內的代碼是順序執行的,為什么會存在數據競爭訪問?原因是信號機制的存在,比如我們正在執行 a 函數修改數據結構,突然收到了一個信號,然后在信號處理函數里又執行 a 函數修改數據結構,這樣可能就會導致問題,所以在執行某些函數時需要先屏蔽信號,執行完后再允許接收信號。我們看看相關的處理邏輯。
static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
sigset_t new_mask;
// 把 new_mask 所有比特位設置為 1
if (sigfillset(&new_mask))
abort();
// 屏蔽(當前線程的)所有信號
if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
abort();
// 加鎖
if (uv__signal_lock())
abort();
}
為什么需要屏蔽所有信號呢?因為執行 uv__signal_block_and_lock 后,需要往操作系統注冊信號處理函數,如果剛注冊完信號處理函數,還沒有執行 uv__signal_unlock 釋放鎖,這時候突然收到一個信號,然后在信號處理函數中又嘗試加鎖則會導致死鎖。過程大致如下:
- 加鎖成功,注冊信號處理函數到操作系統。
- 時鐘中斷觸發,觸發進程調度,當前進程事件片還沒到,繼續執行。
- 進程調度完后,發現有信號需要處理,然后執行信號處理函數。
- 信號處理函數嘗試加鎖,但是鎖已經被持有,然后進入等待狀態。
- 因為信號處理函數沒有返回,導致后續的代碼無法執行,進程因為無法進行解鎖操作,最終陷入死鎖。
初始化信號結構體
int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
int err;
err = uv__signal_loop_once_init(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
handle->signum = 0;
handle->caught_signals = 0;
handle->dispatched_signals = 0;
return 0;
}
初始化的邏輯很簡單,只是做一些字段的初始化,但是有一個比較重要的邏輯是 uv__signal_loop_once_init。
static int uv__signal_loop_once_init(uv_loop_t* loop) {
int err;
// 已經初始化過了,直接返回
if (loop->signal_pipefd[0] != -1)
return 0;
// 創建一個非阻塞模式的通信管道
err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
if (err)
return err;
// 初始化 IO 觀察者
uv__io_init(&loop->signal_io_watcher,
uv__signal_event,
loop->signal_pipefd[0]);
// 注冊 IO 觀察者
uv__io_start(loop, &loop->signal_io_watcher, POLLIN);
return 0;
}
uv__signal_loop_once_init 的作用是創建一個通信管道,然后注冊一個 IO 觀察者到事件循環中,當收到信號時,信號處理函數就會通過這個管道通知事件循環,事件循環會在某個階段通知信號的訂閱者。
訂閱信號
訂閱信號可以通過下面兩個函數。
int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) {
return uv__signal_start(handle, signal_cb, signum, 0);
}
int uv_signal_start_oneshot(uv_signal_t* handle,
uv_signal_cb signal_cb,
int signum) {
return uv__signal_start(handle, signal_cb, signum, 1);
}
最終是由 uv__signal_start 實現的,其實 oneshot 表示最多只執行一次信號處理函數。
static int uv__signal_start(uv_signal_t* handle,
uv_signal_cb signal_cb,
int signum,
int oneshot) {
sigset_t saved_sigmask;
int err;
uv_signal_t* first_handle;
// 之前已經監聽過這個信號,這里只需要更新下回調就行
if (signum == handle->signum) {
handle->signal_cb = signal_cb;
return 0;
}
// 如果之前監聽過了,先刪除,比如同一個 handle 監聽了另一個信號
if (handle->signum != 0) {
uv__signal_stop(handle);
}
uv__signal_block_and_lock(&saved_sigmask);
// 注冊信號,待會分析
uv__signal_unlock_and_unblock(&saved_sigmask);
return 0;
}
uv__signal_start 首先做了一些前置判斷,然后調 uv__signal_block_and_lock 加鎖和屏蔽所有的信號,加鎖主要是準備要修改共享的數據結構,避免多線程引起的問題,屏蔽所有的信號則是因為信號處理函數也會訪問這個數據結構,所以需要避免它的執行,否則會引起死鎖問題。接著分析信號注冊的邏輯。
// 查找這個信號對應的 handle
first_handle = uv__signal_first_handle(signum);
// 還沒注冊過則直接注冊
// 之前注冊過且設置了 UV_SIGNAL_ONE_SHOT 標記,但是當前注冊的還沒有設置 UV_SIGNAL_ONE_SHOT 則注冊
if (first_handle == NULL ||
(!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
uv__signal_register_handler(signum, oneshot);
}
// oneshot 表示訂閱者最多只被通知一次
if (oneshot)
handle->flags |= UV_SIGNAL_ONE_SHOT;
// 插入紅黑樹
RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);
handle->signum = signum;
handle->signal_cb = signal_cb;
uv__handle_start(handle);
注冊信號包括兩步。
第一步是注冊到操作系統。
static int uv__signal_register_handler(int signum, int oneshot) {
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
if (sigfillset(&sa.sa_mask))
abort();
// 設置信號處理函數
sa.sa_handler = uv__signal_handler;
sa.sa_flags = SA_RESTART;
// oneshot 則設置 SA_RESETHAND,操作系統執行完信號處理函數后會重置為默認處理行為
if (oneshot)
sa.sa_flags |= SA_RESETHAND;
// 注冊到操作系統
if (sigaction(signum, &sa, NULL))
return UV__ERR(errno);
return 0;
}
uv__signal_register_handler 實現了信號的注冊,Libuv 并不是每次注冊信號時都會執行 uv__signal_register_handler,而是做了一個優化,只有滿足兩個條件才會注冊信號到操作系統。
1.還沒注冊過信號:這個是很自然的邏輯,不需要分析。
2.!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT)):
a.oneshot 為 true:則不論之前的 handle 是否設置了 UV_SIGNAL_ONE_SHOT 都不需要調操作系統進行注冊了,因為之前已經注冊過了,并且保證設置了 UV_SIGNAL_ONE_SHOT 的 handle 可以被執行。
b.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 false:之前的 handle 沒有設置 UV_SIGNAL_ONE_SHOT,則也不需要調操作系統注冊信號了,因為之前已經注冊過了,并且保證所有的 handle 可以觸發多次。
c.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 true:如果之前注冊的信號設置了 UV_SIGNAL_ONE_SHOT 但是本次需要注冊的沒有設置該 flag,則需要調用 uv__signal_register_handler 重新進行注冊,因為設置了 UV_SIGNAL_ONE_SHOT 的話操作系統執行完一次自定義的信號處理函數后就不會再執行了,這樣會導致沒有設置 UV_SIGNAL_ONE_SHOT 的訂閱者得不到通知。
大家可能會疑惑,這里為什么只需要判斷第一個 handle,因為紅黑樹的查找時會先找沒有設置 UV_SIGNAL_ONE_SHOT 的 handle,然后再找設置了 UV_SIGNAL_ONE_SHOT 的 handle,所以如果找到的第一個 handle 設置了 UV_SIGNAL_ONE_SHOT,那說明所有 handle 都設置了 UV_SIGNAL_ONE_SHOT。
第一步注冊完信號后,第二步是注冊到 Libuv 維護的紅黑樹,因為一個信號最多只能注冊一個處理函數,為了支持一個信號可以有多個訂閱者,Libuv 自己維護了訂閱者,然后把信號處理函數統一注冊為 uv__signal_handler,然后在收到信號時再由 uv__signal_handler 進行處理和分發信號。
停止訂閱信號
停止訂閱信號的最終實現函數是 uv__signal_stop。
static void uv__signal_stop(uv_signal_t* handle) {
sigset_t saved_sigmask;
uv_signal_t* first_handle;
int rem_oneshot;
int first_oneshot;
uv__signal_block_and_lock(&saved_sigmask);
// 從紅黑樹中刪除該 handle
RB_REMOVE(uv__signal_tree_s, &uv__signal_tree, handle);
// 找到第一個訂閱了該信號的 handle
first_handle = uv__signal_first_handle(handle->signum);
// 沒有訂閱者了,則告訴操作系統收到該信號時不需要通知 Libuv 了
if (first_handle == NULL) {
uv__signal_unregister_handler(handle->signum);
} else {
// 判斷是否設置了 UV_SIGNAL_ONE_SHOT
rem_oneshot = handle->flags & UV_SIGNAL_ONE_SHOT;
first_oneshot = first_handle->flags & UV_SIGNAL_ONE_SHOT;
// 如果剩下的 handle 設置了 UV_SIGNAL_ONE_SHOT,但是當前被刪除的 handle 沒有
// 設置 UV_SIGNAL_ONE_SHOT 則需要重新注冊信號處理函數為 oneshot
if (first_oneshot && !rem_oneshot) {
uv__signal_register_handler(handle->signum, 1);
}
}
uv__signal_unlock_and_unblock(&saved_sigmask);
handle->signum = 0;
uv__handle_stop(handle);
}
如果 first_oneshot 為 true 說明剩下的 handle 都是設置了 UV_SIGNAL_ONE_SHOT,如果 first_oneshot 為 true 并且 rem_oneshot 為 false 說明目前注冊到操作系統的信號函數沒有設置 oneshot,因為只要有一個 handle 沒有設置UV_SIGNAL_ONE_SHOT,那么注冊到操作系統的信號處理函數都不會設置 oneshot 標記,這時候需要修改重新更新信號處理函數為 oneshot。
信號的處理
從前面的分析可以看到,信號的處理函數統一設置為 uv__signal_handler,所以收到信號時,操作系統就會執行 uv__signal_handler。
// signum 為收到的信息
static void uv__signal_handler(int signum) {
uv__signal_msg_t msg;
uv_signal_t* handle;
int saved_errno;
saved_errno = errno;
memset(&msg, 0, sizeof msg);
// 需要加鎖,避免另一個線程在修改紅黑樹
uv__signal_lock();
// 找出 signum 對應的訂閱者
for (handle = uv__signal_first_handle(signum);
handle != NULL && handle->signum == signum;
handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
int r;
msg.signum = signum;
msg.handle = handle;
// 寫入消息通知事件循環
do {
r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
} while (r == -1 && errno == EINTR);
}
uv__signal_unlock();
errno = saved_errno;
}
收到信號時并不是直接通知訂閱者,而是通知事件循環,在事件循環的某個階段才會真正通知訂閱者。通知事件循環的方式是通過寫入多個消息到管道中,事件循環在 Poll IO 階段就會判斷這個管道可讀,從而讀出所有的消息進行處理。前面介紹初始化信號結構體時說過,第一次初始化時會 uv__signal_loop_once_init 往事件循環中注冊一個 IO 觀察者,對應的處理函數是 uv__signal_event。
static void uv__signal_event(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
size_t bytes, end, i;
int r;
bytes = 0;
end = 0;
do {
// 讀出消息
r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);
bytes += r;
end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);
// 逐個處理消息
for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
msg = (uv__signal_msg_t*) (buf + i);
handle = msg->handle;
// 執行回調
if (msg->signum == handle->signum) {
handle->signal_cb(handle, handle->signum);
}
// 設置了 UV_SIGNAL_ONE_SHOT,則解除訂閱關系
if (handle->flags & UV_SIGNAL_ONE_SHOT)
uv__signal_stop(handle);
}
bytes -= end;
if (bytes) {
memmove(buf, buf + end, bytes);
continue;
}
} while (end == sizeof buf);
}
信號的使用
信號的處理具有非常高的優先級,這個能力在很多場景下非常有用。下面看一個簡單的場景。
#include <signal.h>
#include <unistd.h>
#include <stdio.h>
void handler(int s) {
printf("receive signal\n");
}
int main () {
printf("%d\n", getpid());
fflush(stdout);
signal(SIGUSR1, handler);
while(1) {}
return 0;
}
通過 gcc main.c -luv && ./a.out 編譯執行上面代碼,然后再執行 kill -SIGUSR1 pid(執行 ./a.out 輸出的 pid),可以看到會輸出 receive signal,也就是說,盡管進程處于死循環,信號機制依然可以正常工作。下面繼續來看一下兩個具體的應用場景。
第一個是在 Node.js 中。假設業務中有以下一段代碼。
console.log(process.pid);
function a() {
while(1) {
b();
}
}
function b() {
while(1) {}
}
a();
有一天我們發現服務的某個進程處于 100% 了,那么我們應該如何排查呢?我們知道 Node.js 是單線程的,JS 線程處于死循環時,是無法處理外部進來的請求了,也就意味著我們不能手動采集 CPU Profile 了。這時候信號機制的作用就來了,我們找到這個進程的 pid,然后執行 kill -SIGUSR1 pid 會發現 Node.js 的調試器(本質上是創建了一個線程監聽了一個端口)被打開了,通過 Chrome Dev Tools 連接上調試器我們就可以采集 CPU Profile 了(重點是打開調試器,采集方式很多種)。結果如下。
可以看到通過 Profile 我們就可以輕松分析出是哪里的代碼導致了死循環,從而快速解決業務中的問題。
接著再看一個 GO 的例子。
package main
import (
"fmt"
"runtime"
)
func main() {
// 設置只有單個線程
runtime.GOMAXPROCS(1)
go func() {
for {
fmt.Println("worker goroutine")
}
}()
for {
}
}
在 go1.13 下執行上面的代碼,沒有任務輸出,然后切換到 go1.23 再試試(可以通過 gvm 管理 Go 版本),可以看到不斷輸出 worker goroutine。為什么會這樣呢?Go 雖然通過協程原生支持了并發,但是在單個線程中,如果一個 goroutine 正在執行時,其他 goroutine 是無法執行的,Go 為了避免 goroutine 饑餓問題,實現了搶占機制,但是早期實現的是基于協作式的搶占機制(比如 go1.13 版本),協作式搶占的原理是在函數中插入搶占代碼,goroutine 執行到函數時 Go runtime 會判斷 goroutine 的事件片是不是用完了,用完了則進行調度,這種搶占機制受限于函數,如果我們不執行函數的話就繞過了這個檢測,比如上面的例子,鑒于這個限制,Go 現在已經實現了基于信號的搶占式機制(比如 go1.23 版本),基于信號的搶占式機制正式使用了信號高優先級的能力,盡管 goroutine 處于死循環,Go runtime 依然有能力介入,從而實現 goroutine 的調度。