通過N-API使用Libuv線程池
本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。
Node.js不適合處理耗時操作是一直存在的問題,為此Node.js提供了三種解決方案。
1 子進程
2 子線程
3 Libuv線程池
前兩種是開發效率比較高的,因為我們只需要寫js。但是也有些缺點
1 執行js的成本
2 雖然可以間接使用Libuv線程池,但是受限于Node.js提供的API。
3 無法利用c/c++層提供的解決方案(內置或業界的)。
這時候我們可以嘗試第三種解決方案。直接通過N-API使用Libuv線程池。下面我們看看這么做。N-API提供了幾個API。
- napi_create_async_work // 創建一個worr,但是還沒有執行
- napi_delete_async_work // 釋放上面創建的work的內存
- napi_queue_async_work // 往Libuv提交一個work
- napi_cancel_async_work // 取消Libuv中的任務,如果已經在執行則無法取消
接下來我們看看如何通過N-API使用Libuv線程池。首先看看js層。
- const { submitWork } = require('./build/Release/test.node');
- submitWork((sum) => {
- console.log(sum)
- })
js提交一個任務,然后傳入一個回調。接著看看N-API的代碼。
- napi_value Init(napi_env env, napi_value exports) {
- napi_value func;
- napi_create_function(env,
- NULL,
- NAPI_AUTO_LENGTH,
- submitWork,
- NULL,
- &func);
- napi_set_named_property(env, exports, "submitWork", func);
- return exports;
- }
- NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
首先定義導出的函數,接著看核心邏輯。
1 定義一個結構體保存上下文
- struct info
- {
- int sum; // 保存計算結果
- napi_ref func; // 保存回調
- napi_async_work worker; // 保存work對象
- };
2 提交任務到Libuv
- static napi_value submitWork(napi_env env, napi_callback_info info) {
- napi_value resource_name;
- napi_status status;
- size_t argc = 1;
- napi_value args[1];
- struct info data = {0, nullptr, nullptr};
- struct info * ptr = &data;
- status = napi_get_cb_info(env, info, &argc, args, NULL, NULL);
- if (status != napi_ok) {
- goto done;
- }
- napi_create_reference(env, args[0], 1, &ptr->func);
- status = napi_create_string_utf8(env,"test", NAPI_AUTO_LENGTH, &resource_name);
- if (status != napi_ok) {
- goto done;
- }
- // 創建一個work,ptr保存的上下文會在work函數和done函數里使用
- status = napi_create_async_work(env, nullptr, resource_name, work, done, (void *) ptr, &ptr->worker);
- if (status != napi_ok) {
- goto done;
- }
- // 提及work到Libuv
- status = napi_queue_async_work(env, ptr->worker);
- done:
- napi_value ret;
- napi_create_int32(env, status == napi_ok ? 0 : -1, &ret);
- return ret;
- }
執行上面的函數,任務就會被提交到Libuv線程池了。
3 Libuv子線程執行任務
- void work(napi_env env, void* data) {
- struct info *arg = (struct info *)data;
- printf("doing...\n");
- int sum = 0;
- for (int i = 0; i < 10; i++) {
- sum += i;
- }
- arg->sum = sum;
- }
很簡單,計算幾個數。并且保存結果。
4 回調js
- void done(napi_env env, napi_status status, void* data) {
- struct info *arg = (struct info *)data;
- if (status == napi_cancelled) {
- printf("cancel...");
- } else if (status == napi_ok) {
- printf("done...\n");
- napi_value callback;
- napi_value global;
- napi_value result;
- napi_value sum;
- // 拿到結果
- napi_create_int32(env, arg->sum, &sum);
- napi_get_reference_value(env, arg->func, &callback);
- napi_get_global(env, &global);
- // 回調js
- napi_call_function(env, global, callback, 1, &sum, &result);
- // 清理
- napi_delete_reference(env, arg->func);
- napi_delete_async_work(env, arg->worker);
- }
- }
并且執行后,我們看到輸出了45。接下來我們分析大致的過程。首先我呢看看ThreadPoolWork,ThreadPoolWork是對Libuv work的封裝。
- class ThreadPoolWork {
- public:
- explicit inline ThreadPoolWork(Environment* env) : env_(env) {
- CHECK_NOT_NULL(env);
- }
- inline virtual ~ThreadPoolWork() = default;
- inline void ScheduleWork();
- inline int CancelWork();
- virtual void DoThreadPoolWork() = 0;
- virtual void AfterThreadPoolWork(int status) = 0;
- Environment* env() const { return env_; }
- private:
- Environment* env_;
- uv_work_t work_req_;
- };
類的定義很簡單,主要是封裝了uv_work_t。我們看看每個函數的意義。DoThreadPoolWork和AfterThreadPoolWork是虛函數,由子類實現,我們一會看子類的時候再分析。我們看看ScheduleWork
- void ThreadPoolWork::ScheduleWork() {
- env_->IncreaseWaitingRequestCounter();
- int status = uv_queue_work(
- env_->event_loop(),
- &work_req_,
- // Libuv子線程里執行的任務函數
- [](uv_work_t* req) {
- ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
- self->DoThreadPoolWork();
- },
- // 任務處理完后的回調
- [](uv_work_t* req, int status) {
- ThreadPoolWork* self = ContainerOf(&ThreadPoolWork::work_req_, req);
- self->env_->DecreaseWaitingRequestCounter();
- self->AfterThreadPoolWork(status);
- });
- CHECK_EQ(status, 0);
- }
ScheduleWork是負責給Libuv提交任務的函數。接著看看CancelWork。
- int ThreadPoolWork::CancelWork() {
- return uv_cancel(reinterpret_cast<uv_req_t*>(&work_req_));
- }
直接調用Libuv的函數取消任務。看完父類,我們看看子類的定義,子類在N-API里實現。
- class Work : public node::AsyncResource, public node::ThreadPoolWork {
- private:
- explicit Work(node_napi_env env,
- v8::Local<v8::Object> async_resource,
- v8::Local<v8::String> async_resource_name,
- napi_async_execute_callback execute,
- napi_async_complete_callback complete = nullptr,
- void* data = nullptr)
- : AsyncResource(env->isolate,
- async_resource,
- *v8::String::Utf8Value(env->isolate, async_resource_name)),
- ThreadPoolWork(env->node_env()),
- _env(env),
- _data(data),
- _execute(execute),
- _complete(complete) {
- }
- ~Work() override = default;
- public:
- static Work* New(node_napi_env env,
- v8::Local<v8::Object> async_resource,
- v8::Local<v8::String> async_resource_name,
- napi_async_execute_callback execute,
- napi_async_complete_callback complete,
- void* data) {
- return new Work(env, async_resource, async_resource_name,
- execute, complete, data);
- }
- // 釋放該類對象的內存
- static void Delete(Work* work) {
- delete work;
- }
- // 執行用戶設置的函數
- void DoThreadPoolWork() override {
- _execute(_env, _data);
- }
- void AfterThreadPoolWork(int status) override {
- // 執行用戶設置的回調
- _complete(env, ConvertUVErrorCode(status), _data);
- }
- private:
- node_napi_env _env;
- // 用戶設置的數據,用于保存執行結果等
- void* _data;
- // 執行任務的函數
- napi_async_execute_callback _execute;
- // 任務處理完的回調
- napi_async_complete_callback _complete;
- };
在Work類我們看到了虛函數DoThreadPoolWork和AfterThreadPoolWork的實現,沒有太多邏輯。最后我們看看N-API提供的API的實現。
- napi_status napi_create_async_work(napi_env env,
- napi_value async_resource,
- napi_value async_resource_name,
- napi_async_execute_callback execute,
- napi_async_complete_callback complete,
- void* data,
- napi_async_work* result) {
- v8::Local<v8::Context> context = env->context();
- v8::Local<v8::Object> resource;
- if (async_resource != nullptr) {
- CHECK_TO_OBJECT(env, context, resource, async_resource);
- } else {
- resource = v8::Object::New(env->isolate);
- }
- v8::Local<v8::String> resource_name;
- CHECK_TO_STRING(env, context, resource_name, async_resource_name);
- uvimpl::Work* work = uvimpl::Work::New(reinterpret_cast<node_napi_env>(env),
- resource,
- resource_name,
- execute,
- complete,
- data);
- *result = reinterpret_cast<napi_async_work>(work);
- return napi_clear_last_error(env);
- }
napi_create_async_work本質上是對Work的簡單封裝,創建一個Work并返回給用戶。
2 napi_delete_async_work
- napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
- uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work));
- return napi_clear_last_error(env);
- }
napi_delete_async_work用于任務執行完后釋放Work對應的內存。
3 napi_queue_async_work
- napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
- napi_status status;
- uv_loop_t* event_loop = nullptr;
- status = napi_get_uv_event_loop(env, &event_loop);
- if (status != napi_ok)
- return napi_set_last_error(env, status);
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- w->ScheduleWork();
- return napi_clear_last_error(env);
- }
napi_queue_async_work是對ScheduleWork的封裝,作用是給Libuv線程池提交任務。
4 napi_cancel_async_work
- napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- CALL_UV(env, w->CancelWork());
- return napi_clear_last_error(env);
- }
napi_cancel_async_work是對CancelWork的封裝,即取消Libuv線程池的任務。我們看到一層層套,沒有太多邏輯,主要是要符合N-API的規范。
總結:通過N-API提供的API,使得我們不再受限于Nod.js本身提供的一些異步接口(使用Libuv線程池的接口),而是直接使用Libuv線程池,這樣我們不僅可以自己寫c/c++,還可以復用業界的一些解決方案解決Node.js里的一些耗時任務。
倉庫:https://github.com/theanarkh/learn-to-write-nodejs-addons