成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊一聊 Libuv 的信號機制

開發 前端
本文介紹 Libuv 是如何基于操作系統底層的能力實現信號模塊的,看一下如何在 Libuv 中使用信號模塊。

本文介紹 Libuv 是如何基于操作系統底層的能力實現信號模塊的,看一下如何在 Libuv 中使用信號模塊。

#include "uv.h"
#include "stdio.h"
#include <unistd.h>

void signal_cb(uv_signal_t* handleint sig) {
    printf("receive signal\n");
    uv_signal_stop(handle);
}

int main() {
    printf("%d\n", getpid());
    fflush(stdout);
    uv_loop_t loop;
    uv_signal_t signal;
    uv_loop_init(&loop);
    uv_signal_init(&loop, &signal);
    uv_signal_start(&signal, signal_cb, SIGUSR1);
    uv_run(&loop, UV_RUN_DEFAULT);
    return 0;
}

通過 gcc main.c -luv && ./a.out 編譯執行上面代碼,然后執行 kill -SIGUSR1 pid 給該進程發送信號,可以看到會輸出 receive signal。接著來分析具體的實現。

初始化

Libuv 在初始化第一個事件循環結構體時會初始化信號處理的相關結構體。

void uv__signal_global_once_init(void) {
  uv_once(&uv__signal_global_init_guard, uv__signal_global_init);
}

因為信號處理是支持多線程的,所以這里用了 uv_once 保證只執行一次 uv__signal_global_init。

static void uv__signal_global_init(void) {
  if (uv__signal_lock_pipefd[0] == -1)
    // 如果在子線程里調用了 fork,則需要在 fork 后的子進程調用 uv__signal_global_reinit 重新初始化相關數據結構
    if (pthread_atfork(NULL, NULL, &uv__signal_global_reinit))
      abort();

  uv__signal_global_reinit();
}

繼續看 uv__signal_global_reinit。

static void uv__signal_global_reinit(void) {
  // 清除之前的狀態
  uv__signal_cleanup();
  // 創建兩個 fd 用于加鎖 / 解鎖,工作方式是阻塞模式
  if (uv__make_pipe(uv__signal_lock_pipefd, 0))
    abort();
  // 修改鎖為解鎖狀態
  if (uv__signal_unlock())
    abort();
}

初始化部分沒有太多的邏輯,只是初始化一些數據結構。

加鎖 / 解鎖

因為 Libuv 用一棵全局的紅黑樹維護了信號和訂閱者的關系,而多個線程可以訪問這個全局的數據結構,所以需要加鎖訪問,接著看看 Libuv 的鎖是怎么實現的,下面是加鎖的實現。

static int uv__signal_lock(void) {
  int r;
  char data;

  do {
    r = read(uv__signal_lock_pipefd[0], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}

下面是解鎖的實現。

static int uv__signal_unlock(void) {
  int r;
  char data = 42;

  do {
    r = write(uv__signal_lock_pipefd[1], &data, sizeof data);
  } while (r < 0 && errno == EINTR);

  return (r < 0) ? -1 : 0;
}

剛才介紹初始化過程時說到了 uv__signal_lock_pipefd 是一個通信管道,Libuv 的加鎖解鎖正是通過 uv__signal_lock_pipefd 實現的,管道初始化時會先寫入一個字節表示處于解鎖狀態,加鎖時會讀出這一個字節,表示加鎖成功,然后解鎖時再次寫入一個字節。因為讀寫一個字節是原子的,所以這就實現了加鎖/解鎖的能力,保證多線程訪問時的安全問題。

那么為什么 Libuv 不使用傳統的 mutex 來實現多線程安全訪問呢?這里涉及到一個概念叫做異步信號安全,它表示一個函數可以安全地在信號處理函數中使用,因為信號是異步發生的并且信號處理函數具有非常高的優先級,假設進程正在執行 a 函數修改一些數據,突然收到信號然后執行信號處理函數,處理函數中又執行了 a 函數修改數據,這時候可能會導致問題。解決這個問題的方式通常有兩種:

  • 在信號處理函數里只調用異步信號安全的函數。
  • 在執行非異步信號安全的函數時屏蔽信號,避免在信號處理函數里再次執行該函數。

因為 Libuv 在信號處理函數里需要訪問全局數據結構,而 mutex 相關的函數不是異步信號安全的,所以不能使用 mutex 實現,而是通過 read / write 實現(它們是異步信號安全的函數)。

信號屏蔽

加鎖解鎖解決了多個線程訪問全局數據結構的問題,但是還有一個問題是同線程的數據競爭訪問問題?這里大家可能會好奇,單線程內的代碼是順序執行的,為什么會存在數據競爭訪問?原因是信號機制的存在,比如我們正在執行 a 函數修改數據結構,突然收到了一個信號,然后在信號處理函數里又執行 a 函數修改數據結構,這樣可能就會導致問題,所以在執行某些函數時需要先屏蔽信號,執行完后再允許接收信號。我們看看相關的處理邏輯。

static void uv__signal_block_and_lock(sigset_t* saved_sigmask) {
  sigset_t new_mask;
  // 把 new_mask 所有比特位設置為 1
  if (sigfillset(&new_mask))
    abort();
  // 屏蔽(當前線程的)所有信號
  if (pthread_sigmask(SIG_SETMASK, &new_mask, saved_sigmask))
    abort();
  // 加鎖
  if (uv__signal_lock())
    abort();
}

為什么需要屏蔽所有信號呢?因為執行 uv__signal_block_and_lock 后,需要往操作系統注冊信號處理函數,如果剛注冊完信號處理函數,還沒有執行 uv__signal_unlock 釋放鎖,這時候突然收到一個信號,然后在信號處理函數中又嘗試加鎖則會導致死鎖。過程大致如下:

  • 加鎖成功,注冊信號處理函數到操作系統。
  • 時鐘中斷觸發,觸發進程調度,當前進程事件片還沒到,繼續執行。
  • 進程調度完后,發現有信號需要處理,然后執行信號處理函數。
  • 信號處理函數嘗試加鎖,但是鎖已經被持有,然后進入等待狀態。
  • 因為信號處理函數沒有返回,導致后續的代碼無法執行,進程因為無法進行解鎖操作,最終陷入死鎖。

初始化信號結構體

int uv_signal_init(uv_loop_t* loop, uv_signal_t* handle) {
  int err;

  err = uv__signal_loop_once_init(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*) handle, UV_SIGNAL);
  handle->signum = 0;
  handle->caught_signals = 0;
  handle->dispatched_signals = 0;

  return 0;
}

初始化的邏輯很簡單,只是做一些字段的初始化,但是有一個比較重要的邏輯是 uv__signal_loop_once_init。

static int uv__signal_loop_once_init(uv_loop_t* loop) {
  int err;

  // 已經初始化過了,直接返回
  if (loop->signal_pipefd[0] != -1)
    return 0;
  // 創建一個非阻塞模式的通信管道
  err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
  if (err)
    return err;
  // 初始化 IO 觀察者
  uv__io_init(&loop->signal_io_watcher,
              uv__signal_event,
              loop->signal_pipefd[0]);
  // 注冊 IO 觀察者
  uv__io_start(loop, &loop->signal_io_watcher, POLLIN);

  return 0;
}

uv__signal_loop_once_init 的作用是創建一個通信管道,然后注冊一個 IO 觀察者到事件循環中,當收到信號時,信號處理函數就會通過這個管道通知事件循環,事件循環會在某個階段通知信號的訂閱者。

訂閱信號

訂閱信號可以通過下面兩個函數。

int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) {
  return uv__signal_start(handle, signal_cb, signum, 0);
}

int uv_signal_start_oneshot(uv_signal_t* handle,
                            uv_signal_cb signal_cb,
                            int signum) {
  return uv__signal_start(handle, signal_cb, signum, 1);
}

最終是由 uv__signal_start 實現的,其實 oneshot 表示最多只執行一次信號處理函數。

static int uv__signal_start(uv_signal_t* handle,
                            uv_signal_cb signal_cb,
                            int signum,
                            int oneshot) {
  sigset_t saved_sigmask;
  int err;
  uv_signal_t* first_handle;
  // 之前已經監聽過這個信號,這里只需要更新下回調就行
  if (signum == handle->signum) {
    handle->signal_cb = signal_cb;
    return 0;
  }

  // 如果之前監聽過了,先刪除,比如同一個 handle 監聽了另一個信號
  if (handle->signum != 0) {
    uv__signal_stop(handle);
  }

  uv__signal_block_and_lock(&saved_sigmask);

  // 注冊信號,待會分析

  uv__signal_unlock_and_unblock(&saved_sigmask);

  return 0;
}

uv__signal_start 首先做了一些前置判斷,然后調 uv__signal_block_and_lock 加鎖和屏蔽所有的信號,加鎖主要是準備要修改共享的數據結構,避免多線程引起的問題,屏蔽所有的信號則是因為信號處理函數也會訪問這個數據結構,所以需要避免它的執行,否則會引起死鎖問題。接著分析信號注冊的邏輯。

// 查找這個信號對應的 handle
  first_handle = uv__signal_first_handle(signum);
  // 還沒注冊過則直接注冊
  // 之前注冊過且設置了 UV_SIGNAL_ONE_SHOT 標記,但是當前注冊的還沒有設置 UV_SIGNAL_ONE_SHOT 則注冊
  if (first_handle == NULL ||
      (!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT))) {
    uv__signal_register_handler(signum, oneshot);
  }
  // oneshot 表示訂閱者最多只被通知一次
  if (oneshot)
    handle->flags |= UV_SIGNAL_ONE_SHOT;
  // 插入紅黑樹
  RB_INSERT(uv__signal_tree_s, &uv__signal_tree, handle);
  handle->signum = signum;
  handle->signal_cb = signal_cb;
  uv__handle_start(handle);

注冊信號包括兩步。

第一步是注冊到操作系統。

static int uv__signal_register_handler(int signum, int oneshot) {
  struct sigaction sa;

  memset(&sa, 0, sizeof(sa));
  if (sigfillset(&sa.sa_mask))
    abort();
  // 設置信號處理函數
  sa.sa_handler = uv__signal_handler;
  sa.sa_flags = SA_RESTART;
  // oneshot 則設置 SA_RESETHAND,操作系統執行完信號處理函數后會重置為默認處理行為
  if (oneshot)
    sa.sa_flags |= SA_RESETHAND;

  // 注冊到操作系統
  if (sigaction(signum, &sa, NULL))
    return UV__ERR(errno);

  return 0;
}

uv__signal_register_handler 實現了信號的注冊,Libuv 并不是每次注冊信號時都會執行 uv__signal_register_handler,而是做了一個優化,只有滿足兩個條件才會注冊信號到操作系統。

1.還沒注冊過信號:這個是很自然的邏輯,不需要分析。

2.!oneshot && (first_handle->flags & UV_SIGNAL_ONE_SHOT)):

a.oneshot 為 true:則不論之前的 handle 是否設置了 UV_SIGNAL_ONE_SHOT 都不需要調操作系統進行注冊了,因為之前已經注冊過了,并且保證設置了 UV_SIGNAL_ONE_SHOT 的 handle 可以被執行。

b.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 false:之前的 handle 沒有設置 UV_SIGNAL_ONE_SHOT,則也不需要調操作系統注冊信號了,因為之前已經注冊過了,并且保證所有的 handle 可以觸發多次。

c.oneshot 為 false,first_handle->flags & UV_SIGNAL_ONE_SHOT 為 true:如果之前注冊的信號設置了 UV_SIGNAL_ONE_SHOT 但是本次需要注冊的沒有設置該 flag,則需要調用 uv__signal_register_handler 重新進行注冊,因為設置了 UV_SIGNAL_ONE_SHOT 的話操作系統執行完一次自定義的信號處理函數后就不會再執行了,這樣會導致沒有設置 UV_SIGNAL_ONE_SHOT 的訂閱者得不到通知。

大家可能會疑惑,這里為什么只需要判斷第一個 handle,因為紅黑樹的查找時會先找沒有設置 UV_SIGNAL_ONE_SHOT 的 handle,然后再找設置了 UV_SIGNAL_ONE_SHOT 的 handle,所以如果找到的第一個 handle 設置了 UV_SIGNAL_ONE_SHOT,那說明所有 handle 都設置了 UV_SIGNAL_ONE_SHOT。

第一步注冊完信號后,第二步是注冊到 Libuv 維護的紅黑樹,因為一個信號最多只能注冊一個處理函數,為了支持一個信號可以有多個訂閱者,Libuv 自己維護了訂閱者,然后把信號處理函數統一注冊為 uv__signal_handler,然后在收到信號時再由 uv__signal_handler 進行處理和分發信號。

停止訂閱信號

停止訂閱信號的最終實現函數是 uv__signal_stop。

static void uv__signal_stop(uv_signal_t* handle) {
  sigset_t saved_sigmask;
  uv_signal_t* first_handle;
  int rem_oneshot;
  int first_oneshot;

  uv__signal_block_and_lock(&saved_sigmask);
  // 從紅黑樹中刪除該 handle
  RB_REMOVE(uv__signal_tree_s, &uv__signal_tree, handle);
  // 找到第一個訂閱了該信號的 handle
  first_handle = uv__signal_first_handle(handle->signum);
  // 沒有訂閱者了,則告訴操作系統收到該信號時不需要通知 Libuv 了
  if (first_handle == NULL) {
    uv__signal_unregister_handler(handle->signum);
  } else {
    // 判斷是否設置了 UV_SIGNAL_ONE_SHOT
    rem_oneshot = handle->flags & UV_SIGNAL_ONE_SHOT;
    first_oneshot = first_handle->flags & UV_SIGNAL_ONE_SHOT;
    // 如果剩下的 handle 設置了 UV_SIGNAL_ONE_SHOT,但是當前被刪除的 handle 沒有
    // 設置 UV_SIGNAL_ONE_SHOT 則需要重新注冊信號處理函數為 oneshot
    if (first_oneshot && !rem_oneshot) {
      uv__signal_register_handler(handle->signum, 1);
    }
  }

  uv__signal_unlock_and_unblock(&saved_sigmask);

  handle->signum = 0;
  uv__handle_stop(handle);
}

如果 first_oneshot 為 true 說明剩下的 handle 都是設置了 UV_SIGNAL_ONE_SHOT,如果 first_oneshot 為 true 并且 rem_oneshot 為 false 說明目前注冊到操作系統的信號函數沒有設置 oneshot,因為只要有一個 handle 沒有設置UV_SIGNAL_ONE_SHOT,那么注冊到操作系統的信號處理函數都不會設置 oneshot 標記,這時候需要修改重新更新信號處理函數為 oneshot。

信號的處理

從前面的分析可以看到,信號的處理函數統一設置為 uv__signal_handler,所以收到信號時,操作系統就會執行 uv__signal_handler。

// signum 為收到的信息
static void uv__signal_handler(int signum) {
  uv__signal_msg_t msg;
  uv_signal_t* handle;
  int saved_errno;

  saved_errno = errno;
  memset(&msg, 0, sizeof msg);
  // 需要加鎖,避免另一個線程在修改紅黑樹
  uv__signal_lock();
  // 找出 signum 對應的訂閱者
  for (handle = uv__signal_first_handle(signum);
       handle != NULL && handle->signum == signum;
       handle = RB_NEXT(uv__signal_tree_s, &uv__signal_tree, handle)) {
    int r;

    msg.signum = signum;
    msg.handle = handle;

    // 寫入消息通知事件循環
    do {
      r = write(handle->loop->signal_pipefd[1], &msg, sizeof msg);
    } while (r == -1 && errno == EINTR);
  }
  uv__signal_unlock();
  errno = saved_errno;
}

收到信號時并不是直接通知訂閱者,而是通知事件循環,在事件循環的某個階段才會真正通知訂閱者。通知事件循環的方式是通過寫入多個消息到管道中,事件循環在 Poll IO 階段就會判斷這個管道可讀,從而讀出所有的消息進行處理。前面介紹初始化信號結構體時說過,第一次初始化時會 uv__signal_loop_once_init 往事件循環中注冊一個 IO 觀察者,對應的處理函數是 uv__signal_event。

static void uv__signal_event(uv_loop_t* loop,
                             uv__io_t* w,
                             unsigned int events) {
  uv__signal_msg_t* msg;
  uv_signal_t* handle;
  char buf[sizeof(uv__signal_msg_t) * 32];
  size_t bytes, end, i;
  int r;

  bytes = 0;
  end = 0;

  do {
    // 讀出消息
    r = read(loop->signal_pipefd[0], buf + bytes, sizeof(buf) - bytes);
    bytes += r;
    end = (bytes / sizeof(uv__signal_msg_t)) * sizeof(uv__signal_msg_t);
    // 逐個處理消息
    for (i = 0; i < end; i += sizeof(uv__signal_msg_t)) {
      msg = (uv__signal_msg_t*) (buf + i);
      handle = msg->handle;
      // 執行回調
      if (msg->signum == handle->signum) {
        handle->signal_cb(handle, handle->signum);
      }
      // 設置了 UV_SIGNAL_ONE_SHOT,則解除訂閱關系
      if (handle->flags & UV_SIGNAL_ONE_SHOT)
        uv__signal_stop(handle);
    }

    bytes -= end;

    if (bytes) {
      memmove(buf, buf + end, bytes);
      continue;
    }
  } while (end == sizeof buf);
}

信號的使用

信號的處理具有非常高的優先級,這個能力在很多場景下非常有用。下面看一個簡單的場景。

#include <signal.h>
#include <unistd.h>
#include <stdio.h>

void handler(int s) {
    printf("receive signal\n");
}

int main () {
    printf("%d\n", getpid());
    fflush(stdout);
    signal(SIGUSR1, handler);
    while(1) {}
    return 0;
}

通過 gcc main.c -luv && ./a.out 編譯執行上面代碼,然后再執行 kill -SIGUSR1 pid(執行 ./a.out 輸出的 pid),可以看到會輸出 receive signal,也就是說,盡管進程處于死循環,信號機制依然可以正常工作。下面繼續來看一下兩個具體的應用場景。

第一個是在 Node.js 中。假設業務中有以下一段代碼。

console.log(process.pid);

function a() {
    while(1) {
        b();
    }
}

function b() {
    while(1) {}
}
a();

有一天我們發現服務的某個進程處于 100% 了,那么我們應該如何排查呢?我們知道 Node.js 是單線程的,JS 線程處于死循環時,是無法處理外部進來的請求了,也就意味著我們不能手動采集 CPU Profile 了。這時候信號機制的作用就來了,我們找到這個進程的 pid,然后執行 kill -SIGUSR1 pid 會發現 Node.js 的調試器(本質上是創建了一個線程監聽了一個端口)被打開了,通過 Chrome Dev Tools 連接上調試器我們就可以采集 CPU Profile 了(重點是打開調試器,采集方式很多種)。結果如下。

可以看到通過 Profile 我們就可以輕松分析出是哪里的代碼導致了死循環,從而快速解決業務中的問題。

接著再看一個 GO 的例子。

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // 設置只有單個線程
    runtime.GOMAXPROCS(1)
    go func() {
        for {
            fmt.Println("worker goroutine")
        }
    }()
    for {

    }
}

在 go1.13 下執行上面的代碼,沒有任務輸出,然后切換到 go1.23 再試試(可以通過 gvm 管理 Go 版本),可以看到不斷輸出 worker goroutine。為什么會這樣呢?Go 雖然通過協程原生支持了并發,但是在單個線程中,如果一個 goroutine 正在執行時,其他 goroutine 是無法執行的,Go 為了避免 goroutine 饑餓問題,實現了搶占機制,但是早期實現的是基于協作式的搶占機制(比如 go1.13 版本),協作式搶占的原理是在函數中插入搶占代碼,goroutine 執行到函數時 Go runtime 會判斷 goroutine 的事件片是不是用完了,用完了則進行調度,這種搶占機制受限于函數,如果我們不執行函數的話就繞過了這個檢測,比如上面的例子,鑒于這個限制,Go 現在已經實現了基于信號的搶占式機制(比如 go1.23 版本),基于信號的搶占式機制正式使用了信號高優先級的能力,盡管 goroutine 處于死循環,Go runtime 依然有能力介入,從而實現 goroutine 的調度。

責任編輯:姜華 來源: 編程雜技
相關推薦

2023-07-06 13:56:14

微軟Skype

2020-09-08 06:54:29

Java Gradle語言

2023-09-22 17:36:37

2021-01-28 22:31:33

分組密碼算法

2020-05-22 08:16:07

PONGPONXG-PON

2018-06-07 13:17:12

契約測試單元測試API測試

2021-08-01 09:55:57

Netty時間輪中間件

2023-09-27 16:39:38

2024-10-28 21:02:36

消息框應用程序

2011-07-05 18:40:19

QT 信號 機制

2021-12-06 09:43:01

鏈表節點函數

2021-07-16 11:48:26

模型 .NET微軟

2023-09-20 23:01:03

Twitter算法

2021-03-01 18:37:15

MySQL存儲數據

2024-07-16 10:52:09

2011-07-05 18:32:52

QT 信號 機制

2019-02-13 14:15:59

Linux版本Fedora

2021-08-04 09:32:05

Typescript 技巧Partial

2021-01-29 08:32:21

數據結構數組

2021-02-06 08:34:49

函數memoize文檔
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久成人国产精品 | 99精品网 | 午夜电影网 | 视频一区二区在线观看 | 欧美国产亚洲一区二区 | 国产精品一区二区av | 81精品国产乱码久久久久久 | 久久亚洲精品久久国产一区二区 | 国产精品日本一区二区在线播放 | 精品国产久 | 又黑又粗又长的欧美一区 | 噜噜噜噜狠狠狠7777视频 | 日本一区二区视频 | 国产精品一区二区三 | 欧美一级做性受免费大片免费 | 久久黄网 | 日韩欧美一区二区三区免费观看 | 第四色影音先锋 | 粉嫩粉嫩芽的虎白女18在线视频 | 亚洲视频免费在线播放 | 91性高湖久久久久久久久_久久99 | 欧美日韩综合 | 色综合99| 一区二区三区亚洲精品国 | 国产午夜精品视频 | 四虎永久免费在线 | 精品国产欧美一区二区三区成人 | 亚洲大片一区 | 色视频成人在线观看免 | 亚洲资源站 | 日本不卡一区二区 | 欧洲一区二区视频 | 久久久久久免费毛片精品 | 日本在线播放一区二区 | 亚洲伊人精品酒店 | 久久久久无码国产精品一区 | 亚洲免费影院 | 久久草在线视频 | 天堂亚洲 | 999视频在线播放 | 中文字幕高清av |