成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Node.js子線程調試和診斷指南

開發 前端
本文介紹另外一種對子線程代碼無侵入的調試方式,另外也介紹一下通過子線程調試主線程的方式。

[[419114]]

調試、診斷子線程最直接的方式就是像調試、診斷主線程一樣,但是無論是動態開啟還是靜態開啟,子線程都不可避免地需要內置一些相關的非業務代碼,本文介紹另外一種對子線程代碼無侵入的調試方式,另外也介紹一下通過子線程調試主線程的方式。

1.初始化子線程的Inspector

在Node.js啟動子線程的時候,會初始化Inspector。

  1. env_->InitializeInspector(std::move(inspector_parent_handle_)); 

在分析InitializeInspector之前,我們先看一下inspector_parent_handle_。

  1. std::unique_ptr<inspector::ParentInspectorHandle> inspector_parent_handle_; 

inspector_parent_handle_是一個ParentInspectorHandle對象,這個對象是子線程和主線程通信的橋梁。我們看一下他的初始化邏輯(在主線程里執行)。

  1. inspector_parent_handle_ = env->inspector_agent()->GetParentHandle(thread_id_, url); 

調用agent的GetParentHandle獲取一個ParentInspectorHandle對象。

  1. std::unique_ptr<ParentInspectorHandle> Agent::GetParentHandle(int thread_id, const std::string& url) { 
  2.  return client_->getWorkerManager()->NewParentHandle(thread_id, url); 

內部其實是通過client_->getWorkerManager()對象的NewParentHandle方法獲取ParentInspectorHandle對象,接下來我們看一下WorkerManager的NewParentHandle。

  1. std::unique_ptr<ParentInspectorHandle> WorkerManager::NewParentHandle(int thread_id, const std::string& url) { 
  2.   bool wait = !delegates_waiting_on_start_.empty(); 
  3.   return std::make_unique<ParentInspectorHandle>(thread_id, url, thread_, wait); 
  4.  
  5. ParentInspectorHandle::ParentInspectorHandle( 
  6.     int id, const std::string& url, 
  7.     std::shared_ptr<MainThreadHandle> parent_thread,  
  8.     bool wait_for_connect 
  9.     : id_(id),  
  10.       url_(url),  
  11.       parent_thread_(parent_thread), 
  12.       wait_(wait_for_connect) {} 

最終的架構圖如下入所示。

分析完ParentInspectorHandle后繼續看一下env_->InitializeInspector(std::move(inspector_parent_handle_))的邏輯(在子線程里執行)。

  1. int Environment::InitializeInspector( 
  2.     std::unique_ptr<inspector::ParentInspectorHandle> parent_handle) { 
  3.  
  4.   std::string inspector_path; 
  5.   inspector_path = parent_handle->url(); 
  6.   inspector_agent_->SetParentHandle(std::move(parent_handle)); 
  7.   inspector_agent_->Start(inspector_path, 
  8.                           options_->debug_options(), 
  9.                           inspector_host_port(), 
  10.                           is_main_thread()); 

首先把ParentInspectorHandle對象保存到agent中,然后調用agent的Start方法。

  1. bool Agent::Start(...) { 
  2.     // 新建client對象 
  3.    client_ = std::make_shared<NodeInspectorClient>(parent_env_, is_main); 
  4.    // 調用agent中保存的ParentInspectorHandle對象的WorkerStarted 
  5.    parent_handle_->WorkerStarted(client_->getThreadHandle(), ...); 

Agent::Start創建了一個client對象,然后調用ParentInspectorHandle對象的WorkerStarted方法(剛才SetParentHandle的時候保存的),我們看一下這時候的架構圖。

接著看parent_handle_->WorkerStarted。

  1. void ParentInspectorHandle::WorkerStarted( 
  2.     std::shared_ptr<MainThreadHandle> worker_thread, bool waiting) { 
  3.   std::unique_ptr<Request> request( 
  4.       new WorkerStartedRequest(id_, url_, worker_thread, waiting)); 
  5.   parent_thread_->Post(std::move(request)); 

WorkerStarted創建了一個WorkerStartedRequest請求,然后通過parent_thread_->Post提交,parent_thread_是MainThreadInterface對象。

  1. void MainThreadInterface::Post(std::unique_ptr<Request> request) { 
  2.   Mutex::ScopedLock scoped_lock(requests_lock_); 
  3.   // 之前是空則需要喚醒消費者 
  4.   bool needs_notify = requests_.empty(); 
  5.   // 消息入隊 
  6.   requests_.push_back(std::move(request)); 
  7.   if (needs_notify) { 
  8.        // 獲取當前對象的一個弱引用 
  9.        std::weak_ptr<MainThreadInterface>* interface_ptr = new std::weak_ptr<MainThreadInterface>(shared_from_this()); 
  10.       // 請求V8執行RequestInterrupt入參對應的回調 
  11.       isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) { 
  12.         // 把執行時傳入的參數轉成MainThreadInterface 
  13.         std::unique_ptr<std::weak_ptr<MainThreadInterface>> interface_ptr { 
  14.           static_cast<std::weak_ptr<MainThreadInterface>*>(opaque)  
  15.         }; 
  16.         // 判斷對象是否還有效,是則調用DispatchMessages 
  17.         if (auto iface = interface_ptr->lock()) iface->DispatchMessages(); 
  18.  
  19.       }, static_cast<void*>(interface_ptr)); 
  20.   } 
  21.   // 喚醒消費者 
  22.   incoming_message_cond_.Broadcast(scoped_lock); 

我們看看這時候的架構圖。

接著看回調里執行MainThreadInterface對象DispatchMessages方法的邏輯。

  1. void MainThreadInterface::DispatchMessages() { 
  2.   // 遍歷請求隊列 
  3.   requests_.swap(dispatching_message_queue_); 
  4.   while (!dispatching_message_queue_.empty()) { 
  5.     MessageQueue::value_type task; 
  6.     std::swap(dispatching_message_queue_.front(), task); 
  7.     dispatching_message_queue_.pop_front(); 
  8.     // 執行任務函數 
  9.     task->Call(this); 
  10.   } 

task是WorkerStartedRequest對象,看一下Call方法的代碼。

  1. void Call(MainThreadInterface* thread) override { 
  2.   auto manager = thread->inspector_agent()->GetWorkerManager(); 
  3.   manager->WorkerStarted(id_, info_, waiting_); 

接著調用agent的WorkerManager的WorkerStarted。

  1. void WorkerManager::WorkerStarted(int session_id, 
  2.                                   const WorkerInfo& info, 
  3.                                   bool waiting) { 
  4.   children_.emplace(session_id, info); 
  5.   for (const auto& delegate : delegates_) { 
  6.     Report(delegate.second, info, waiting); 
  7.   } 

WorkerStarted記錄了一個id和上下文,因為delegates_初始化的時候是空的,所以不會執行。至此,子線程Inspector初始化的邏輯就分析完了,結構圖如下。

我們發現,和主線程不一樣,主線程會啟動一個WebSocket服務器接收客戶端的連接請求,而子線程只是初始化了一些數據結構。下面我們看一下基于這些數據結構,主線程是如何動態開啟調試子線程的。

2.主線程開啟調試子線程的能力

我們可以以以下方式開啟對子線程的調試。

  1. const { Worker, workerData } = require('worker_threads'); 
  2. const { Session } = require('inspector'); 
  3. // 新建一個新的通信通道 
  4. const session = new Session(); 
  5. session.connect(); 
  6. // 創建子線程 
  7. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
  8. // 子線程啟動成功后開啟調試子線程的能力 
  9. worker.on('online', () => { 
  10.     session.post("NodeWorker.enable"
  11.                  {waitForDebuggerOnStart: false},   
  12.                  (err) => {   
  13.                     err && console.log("NodeWorker.enable", err); 
  14.                  }); 
  15.     }); 
  16. // 防止主線程退出 
  17. setInterval(() => {}, 100000); 

我們先來分析一下connect函數的邏輯。

  1. connect() { 
  2.     this[connectionSymbol] = new Connection((message) => this[onMessageSymbol](message)); 

新建了一個Connection對象并傳入一個回調函數,該回調函數在收到消息時被回調。Connection是C++層導出的對象,由模版類JSBindingsConnection實現。

  1. template <typename ConnectionType> 
  2. class JSBindingsConnection {} 

我們看看導出的路邏輯。

  1. JSBindingsConnection<Connection>::Bind(env, target); 

接著看Bind。

  1. static void Bind(Environment* env, Local<Object> target) { 
  2.     // class_name是Connection 
  3.     Local<String> class_name = ConnectionType::GetClassName(env); 
  4.     Local<FunctionTemplate> tmpl = env->NewFunctionTemplate(JSBindingsConnection::New); 
  5.     tmpl->InstanceTemplate()->SetInternalFieldCount(1); 
  6.     tmpl->SetClassName(class_name); 
  7.     tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); 
  8.     env->SetProtoMethod(tmpl, "dispatch", JSBindingsConnection::Dispatch); 
  9.     env->SetProtoMethod(tmpl, "disconnect", JSBindingsConnection::Disconnect); 
  10.     target->Set(env->context(), 
  11.                 class_name, 
  12.                 tmpl->GetFunction(env->context()).ToLocalChecked()) 
  13.         .ToChecked(); 
  14.   } 

當我們在JS層執行new Connection的時候,就會執行JSBindingsConnection::New。

  1. static void New(const FunctionCallbackInfo<Value>& info) { 
  2.    Environment* env = Environment::GetCurrent(info); 
  3.    Local<Function> callback = info[0].As<Function>(); 
  4.    new JSBindingsConnection(env, info.This(), callback); 

我們看看新建一個JSBindingsConnection對象時的邏輯。

  1. JSBindingsConnection(Environment* env, 
  2.                        Local<Object> wrap, 
  3.                        Local<Function> callback) 
  4.                        : AsyncWrap(env, wrap, PROVIDER_INSPECTORJSBINDING), 
  5.                          callback_(env->isolate(), callback) { 
  6.     Agent* inspector = env->inspector_agent(); 
  7.     session_ = LocalConnection::Connect
  8.         inspector, std::make_unique<JSBindingsSessionDelegate>(env, this) 
  9.     );}static std::unique_ptr<InspectorSession> Connect
  10.       Agent* inspector,  
  11.       std::unique_ptr<InspectorSessionDelegate> delegate 
  12. ) { 
  13.     return inspector->Connect(std::move(delegate), false); 

最終是傳入了一個JSBindingsSessionDelegate對象調用Agent的Connect方法。

  1. std::unique_ptr<InspectorSession> Agent::Connect
  2.     std::unique_ptr<InspectorSessionDelegate> delegate, 
  3.     bool prevent_shutdown) { 
  4.   int session_id = client_->connectFrontend(std::move(delegate), 
  5.                                             prevent_shutdown); 
  6.   // JSBindingsConnection對象的session_字段指向的對象                                          
  7.   return std::unique_ptr<InspectorSession>( 
  8.       new SameThreadInspectorSession(session_id, client_) 
  9.   ); 

Agent的Connect方法繼續調用client_->connectFrontend。

  1. int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate, 
  2.                       bool prevent_shutdown) { 
  3.     int session_id = next_session_id_++; 
  4.     channels_[session_id] = std::make_unique<ChannelImpl>(env_, 
  5.                                                           client_, 
  6.                                                           getWorkerManager(), 
  7.                                                           std::move(delegate), 
  8.                                                           getThreadHandle(), 
  9.                                                           prevent_shutdown); 
  10.     return session_id; 

connectFrontend新建了一個ChannelImpl對象,在新建ChannelImpl時,會初始化子線程處理的邏輯。

  1. explicit ChannelImpl(Environment* env, 
  2.                        const std::unique_ptr<V8Inspector>& inspector, 
  3.                        std::shared_ptr<WorkerManager> worker_manager, 
  4.                        std::unique_ptr<InspectorSessionDelegate> delegate, 
  5.                        std::shared_ptr<MainThreadHandle> main_thread_, 
  6.                        bool prevent_shutdown) 
  7.       : delegate_(std::move(delegate)), prevent_shutdown_(prevent_shutdown), 
  8.         retaining_context_(false) { 
  9.     session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView()); 
  10.     // Node.js拓展命令的處理分發器 
  11.     node_dispatcher_ = std::make_unique<protocol::UberDispatcher>(this); 
  12.     // trace相關 
  13.     tracing_agent_ = std::make_unique<protocol::TracingAgent>(env, main_thread_); 
  14.     tracing_agent_->Wire(node_dispatcher_.get()); 
  15.     // 處理子線程相關 
  16.     if (worker_manager) { 
  17.       worker_agent_ = std::make_unique<protocol::WorkerAgent>(worker_manager); 
  18.       worker_agent_->Wire(node_dispatcher_.get()); 
  19.     } 
  20.     // 處理runtime 
  21.     runtime_agent_ = std::make_unique<protocol::RuntimeAgent>(); 
  22.     runtime_agent_->Wire(node_dispatcher_.get()); 

我們這里只關注處理子線程相關的邏輯。看一下 worker_agent_->Wire。

  1. void WorkerAgent::Wire(UberDispatcher* dispatcher) { 
  2.   frontend_.reset(new NodeWorker::Frontend(dispatcher->channel())); 
  3.   NodeWorker::Dispatcher::wire(dispatcher, this); 
  4.   auto manager = manager_.lock(); 
  5.   workers_ = std::make_shared<NodeWorkers>(frontend_, manager->MainThread()); 

這時候的架構圖如下

接著看一下NodeWorker::Dispatcher::wire(dispatcher, this)的邏輯。

  1. void Dispatcher::wire(UberDispatcher* uber, Backend* backend){ 
  2.     std::unique_ptr<DispatcherImpl> dispatcher(new DispatcherImpl(uber->channel(), backend)); 
  3.     uber->setupRedirects(dispatcher->redirects()); 
  4.     uber->registerBackend("NodeWorker", std::move(dispatcher)); 

首先新建了一個DispatcherImpl對象。

  1. DispatcherImpl(FrontendChannel* frontendChannel, Backend* backend) 
  2.         : DispatcherBase(frontendChannel) 
  3.         , m_backend(backend) { 
  4.         m_dispatchMap["NodeWorker.sendMessageToWorker"] = &DispatcherImpl::sendMessageToWorker; 
  5.         m_dispatchMap["NodeWorker.enable"] = &DispatcherImpl::enable; 
  6.         m_dispatchMap["NodeWorker.disable"] = &DispatcherImpl::disable; 
  7.         m_dispatchMap["NodeWorker.detach"] = &DispatcherImpl::detach; 

除了初始化一些字段,另外了一個kv數據結構,這個是一個路由配置,后面我們會看到它的作用。新建完DispatcherImpl后又調用了uber->registerBackend("NodeWorker", std::move(dispatcher))注冊該對象。

  1. void UberDispatcher::registerBackend(const String& name, std::unique_ptr<protocol::DispatcherBase> dispatcher){ 
  2.     m_dispatchers[name] = std::move(dispatcher); 

這時候的架構圖如下。

我們看到這里其實是建立了一個路由體系,后面收到命令時就會根據這些路由配置進行轉發,類似Node.js Express框架路由機制。這時候可以通過session的post給主線程發送NodeWorker.enable命令來開啟子線程的調試。我們分析這個過程。

  1. post(method, params, callback) { 
  2.     // 忽略參數處理 
  3.     // 保存請求對應的回調 
  4.     if (callback) { 
  5.       this[messageCallbacksSymbol].set(id, callback); 
  6.     } 
  7.     // 調用C++的dispatch 
  8.     this[connectionSymbol].dispatch(JSONStringify(message)); 

this[connectionSymbol]對應的是JSBindingsConnection對象。

  1. static void Dispatch(const FunctionCallbackInfo<Value>& info) { 
  2.     Environment* env = Environment::GetCurrent(info); 
  3.     JSBindingsConnection* session; 
  4.     ASSIGN_OR_RETURN_UNWRAP(&session, info.Holder()); 
  5.     if (session->session_) { 
  6.       session->session_->Dispatch( 
  7.           ToProtocolString(env->isolate(), info[0])->string()); 
  8.     } 

session_是一個SameThreadInspectorSession對象。

  1. void SameThreadInspectorSession::Dispatch( 
  2.     const v8_inspector::StringView& message) { 
  3.   auto client = client_.lock(); 
  4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) { 
  5.     channels_[session_id]->dispatchProtocolMessage(message); 

最終調用了ChannelImpl的dispatchProtocolMessage。

  1. void dispatchProtocolMessage(const StringView& message) { 
  2.     std::string raw_message = protocol::StringUtil::StringViewToUtf8(message); 
  3.     std::unique_ptr<protocol::DictionaryValue> value = 
  4.         protocol::DictionaryValue::cast(protocol::StringUtil::parseMessage( 
  5.             raw_message, false)); 
  6.     int call_id; 
  7.     std::string method; 
  8.     // 解析命令 
  9.     node_dispatcher_->parseCommand(value.get(), &call_id, &method); 
  10.     // 判斷命令是V8內置命令還是Node.js拓展的命令 
  11.     if (v8_inspector::V8InspectorSession::canDispatchMethod( 
  12.             Utf8ToStringView(method)->string())) { 
  13.       session_->dispatchProtocolMessage(message); 
  14.     } else { 
  15.       node_dispatcher_->dispatch(call_id, method, std::move(value), 
  16.                                  raw_message); 
  17.     } 

因為NodeWorker.enable是Node.js拓展的命令,所以會走到else里面的邏輯。根據路由配置找到該命令對應的處理邏輯(NodeWorker.enable以.切分,對應兩級路由)。

  1. void UberDispatcher::dispatch(int callId, const String& in_method, std::unique_ptr<Value> parsedMessage, const ProtocolMessage& rawMessage){ 
  2.     // 找到一級路由配置 
  3.     protocol::DispatcherBase* dispatcher = findDispatcher(method); 
  4.     std::unique_ptr<protocol::DictionaryValue> messageObject = DictionaryValue::cast(std::move(parsedMessage)); 
  5.     // 交給一級路由處理器處理 
  6.     dispatcher->dispatch(callId, method, rawMessage, std::move(messageObject)); 

NodeWorker.enable對應的路由處理器代碼如下

  1. void DispatcherImpl::dispatch(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr<protocol::DictionaryValue> messageObject){ 
  2.     // 查找二級路由 
  3.     std::unordered_map<String, CallHandler>::iterator it = m_dispatchMap.find(method); 
  4.     protocol::ErrorSupport errors; 
  5.     // 找到處理函數 
  6.     (this->*(it->second))(callId, method, message, std::move(messageObject), &errors); 

dispatch繼續尋找命令對應的處理函數,最終找到NodeWorker.enable命令的處理函數為DispatcherImpl::enable。

  1. void DispatcherImpl::enable(...){ 
  2.     std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr(); 
  3.     DispatchResponse response = m_backend->enable(...); 
  4.     // 返回響應給命令(類似請求/響應模式) 
  5.     weak->get()->sendResponse(callId, response); 

根據架構圖可以知道m_backend是WorkerAgent對象。

  1. DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) { 
  2.   auto manager = manager_.lock(); 
  3.   std::unique_ptr<AgentWorkerInspectorDelegate> delegate(new AgentWorkerInspectorDelegate(workers_)); 
  4.   event_handle_ = manager->SetAutoAttach(std::move(delegate)); 
  5.   return DispatchResponse::OK(); 

繼續調用WorkerManager的SetAutoAttach方法。

  1. std::unique_ptr<WorkerManagerEventHandle> WorkerManager::SetAutoAttach( 
  2.     std::unique_ptr<WorkerDelegate> attach_delegate) { 
  3.   int id = ++next_delegate_id_; 
  4.   // 保存delegate 
  5.   delegates_[id] = std::move(attach_delegate); 
  6.   const auto& delegate = delegates_[id]; 
  7.   // 通知子線程 
  8.   for (const auto& worker : children_) { 
  9.     Report(delegate, worker.secondfalse); 
  10.   } 
  11.   ... 

SetAutoAttach遍歷子線程。

  1. void Report(const std::unique_ptr<WorkerDelegate>& delegate, 
  2.             const WorkerInfo& info, bool waiting) { 
  3.   if (info.worker_thread) 
  4.     delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread); 

info是一個WorkerInfo對象,該對象是子線程初始化和主線程建立關系的數據結構。delegate是AgentWorkerInspectorDelegate對象。

  1. void WorkerCreated(const std::string& title, 
  2.                      const std::string& url, 
  3.                      bool waiting, 
  4.                      std::shared_ptr<MainThreadHandle> target) override { 
  5.     workers_->WorkerCreated(title, url, waiting, target); 

workers_是一個NodeWorkers對象。

  1. void NodeWorkers::WorkerCreated(const std::string& title, 
  2.                                 const std::string& url, 
  3.                                 bool waiting, 
  4.                                 std::shared_ptr<MainThreadHandle> target) { 
  5.   auto frontend = frontend_.lock(); 
  6.   std::string id = std::to_string(++next_target_id_); 
  7.   // 處理數據通信的delegate 
  8.   auto delegate = thread_->MakeDelegateThreadSafe( 
  9.       std::unique_ptr<InspectorSessionDelegate>( 
  10.           new ParentInspectorSessionDelegate(id, shared_from_this()) 
  11.       ) 
  12.   ); 
  13.   // 建立和子線程V8 Inspector的通信通道 
  14.   sessions_[id] = target->Connect(std::move(delegate), true); 
  15.   frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting); 

WorkerCreated建立了一條和子線程通信的通道,然后通知命令的發送方通道建立成功。這時候架構圖如下。

接著看attachedToWorker。

  1. void Frontend::attachedToWorker(const String& sessionId, std::unique_ptr<protocol::NodeWorker::WorkerInfo> workerInfo, bool waitingForDebugger){ 
  2.     std::unique_ptr<AttachedToWorkerNotification> messageData = AttachedToWorkerNotification::create() 
  3.         .setSessionId(sessionId) 
  4.         .setWorkerInfo(std::move(workerInfo)) 
  5.         .setWaitingForDebugger(waitingForDebugger) 
  6.         .build(); 
  7.     // 觸發NodeWorker.attachedToWorker 
  8.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.attachedToWorker", std::move(messageData))); 

繼續看sendProtocolNotification

  1. void sendProtocolNotification( 
  2.       std::unique_ptr<Serializable> message) override { 
  3.     sendMessageToFrontend(message->serializeToJSON()); 
  4.  } 
  5.  
  6.  void sendMessageToFrontend(const StringView& message) { 
  7.     delegate_->SendMessageToFrontend(message); 
  8.  } 

這里的delegate_是一個JSBindingsSessionDelegate對象。

  1. void SendMessageToFrontend(const v8_inspector::StringView& message) 
  2.         override { 
  3.       Isolate* isolate = env_->isolate(); 
  4.       HandleScope handle_scope(isolate); 
  5.       Context::Scope context_scope(env_->context()); 
  6.       MaybeLocal<String> v8string = String::NewFromTwoByte(isolate, 
  7.                                                            message.characters16(), 
  8.                                                            NewStringType::kNormal, message.length() 
  9.       ); 
  10.       Local<Value> argument = v8string.ToLocalChecked().As<Value>(); 
  11.       // 收到消息執行回調 
  12.       connection_->OnMessage(argument); 
  13.  
  14. // 執行JS層回調 
  15. void OnMessage(Local<Value> value) { 
  16.    MakeCallback(callback_.Get(env()->isolate()), 1, &value); 

JS層回調邏輯如下。

  1. [onMessageSymbol](message) { 
  2.     const parsed = JSONParse(message); 
  3.     // 收到的消息如果是某個請求的響應,則有個id字段記錄了請求對應的id,否則則觸發事件 
  4.     if (parsed.id) { 
  5.        const callback = this[messageCallbacksSymbol].get(parsed.id); 
  6.        this[messageCallbacksSymbol].delete(parsed.id); 
  7.        if (callback) { 
  8.          callback(null, parsed.result); 
  9.        } 
  10.      } else { 
  11.        this.emit(parsed.method, parsed); 
  12.        this.emit('inspectorNotification', parsed); 
  13.      } 
  14.   } 

主線程拿到Worker Session對一個的id,后續就可以通過命令NodeWorker.sendMessageToWorker加上該id和子線程通信。大致原理如下,主線程通過自己的channel和子線程的channel進行通信,從而達到控制子線程的目的。

我們分析一下NodeWorker.sendMessageToWorker命令的邏輯,對應處理函數為DispatcherImpl::sendMessageToWorker。

  1. void DispatcherImpl::sendMessageToWorker(...){ 
  2.     std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr(); 
  3.     DispatchResponse response = m_backend->sendMessageToWorker(in_message, in_sessionId); 
  4.     // 響應 
  5.     weak->get()->sendResponse(callId, response); 
  6.     return

繼續分析m_backend->sendMessageToWorker。

  1. DispatchResponse WorkerAgent::sendMessageToWorker(const String& message, 
  2.                                                   const String& sessionId) { 
  3.   workers_->Receive(sessionId, message); 
  4.   return DispatchResponse::OK(); 
  5.  
  6. void NodeWorkers::Receive(const std::string& id, const std::string& message) { 
  7.   auto it = sessions_.find(id); 
  8.   it->second->Dispatch(Utf8ToStringView(message)->string()); 

sessions_對應的是和子線程的通信的數據結構CrossThreadInspectorSession。看一下該對象的Dispatch方法。

  1. void Dispatch(const StringView& message) override { 
  2.     state_.Call(&MainThreadSessionState::Dispatch, 
  3.                 StringBuffer::create(message)); 

再次調了MainThreadSessionState::Dispatch

  1. void Dispatch(std::unique_ptr<StringBuffer> message) { 
  2.     session_->Dispatch(message->string()); 

session_是SameThreadInspectorSession對象。繼續看它的Dispatch方法。

  1. void SameThreadInspectorSession::Dispatch( 
  2.     const v8_inspector::StringView& message) { 
  3.   auto client = client_.lock(); 
  4.   client->dispatchMessageFromFrontend(session_id_, message);}void dispatchMessageFromFrontend(int session_id, const StringView& message) { 
  5.     channels_[session_id]->dispatchProtocolMessage(message); 

通過層層調用,最終拿到了一個合子線程通信的channel,dispatchProtocolMessage方法剛才已經分析過,該方法會根據命令做不同的處理,因為我們這里發送的是V8內置的命令,所以會交給V8 Inspector處理。當V8 Inspector處理完后,會通過ChannelImpl的sendResponse返回結果。

  1. void sendResponse( 
  2.       int callId, 
  3.       std::unique_ptr<v8_inspector::StringBuffer> message) override { 
  4.     sendMessageToFrontend(message->string()); 
  5.  
  6.  void sendMessageToFrontend(const StringView& message) { 
  7.     delegate_->SendMessageToFrontend(message); 
  8.  } 

這里的delegate_是ParentInspectorSessionDelegate對象。

  1. void SendMessageToFrontend(const v8_inspector::StringView& msg) override { 
  2.   std::string message = protocol::StringUtil::StringViewToUtf8(msg); 
  3.   workers_->Send(id_, message); 
  4.  
  5. void NodeWorkers::Send(const std::string& id, const std::string& message) { 
  6.   auto frontend = frontend_.lock(); 
  7.   if (frontend) 
  8.     frontend->receivedMessageFromWorker(id, message); 
  9.  
  10. void Frontend::receivedMessageFromWorker(const String& sessionId, const String& message){ 
  11.     std::unique_ptr<ReceivedMessageFromWorkerNotification> messageData = ReceivedMessageFromWorkerNotification::create() 
  12.         .setSessionId(sessionId) 
  13.         .setMessage(message) 
  14.         .build(); 
  15.     // 觸發NodeWorker.receivedMessageFromWorker        
  16.     m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.receivedMessageFromWorker", std::move(messageData))); 

m_frontendChannel是主線程的ChannelImpl對象。

  1. void sendProtocolNotification( 
  2.     std::unique_ptr<Serializable> message) override { 
  3.     sendMessageToFrontend(message->serializeToJSON()); 
  4.  
  5. void sendMessageToFrontend(const StringView& message) { 
  6.     delegate_->SendMessageToFrontend(message); 

delegate_是C++層傳入的JSBindingsSessionDelegate對象。最終通過JSBindingsSessionDelegate對象回調JS層,之前已經分析過就不再贅述。至此,主線程就具備了控制子線程的能力,但是控制方式有很多種。

2.1 使用通用的V8命令

通過下面代碼收集子線程的CPU Profile信息。

  1. const { Worker, workerData } = require('worker_threads'); 
  2. const { Session } = require('inspector'); 
  3. const session = new Session(); 
  4. session.connect(); 
  5. let id = 1; 
  6. function post(sessionId, method, params, callback) { 
  7.     session.post('NodeWorker.sendMessageToWorker', { 
  8.         sessionId, 
  9.         message: JSON.stringify({ id: id++, method, params }) 
  10.     }, callback); 
  11.  
  12. session.on('NodeWorker.attachedToWorker', (data) => { 
  13.     post(data.params.sessionId, 'Profiler.enable'); 
  14.     post(data.params.sessionId, 'Profiler.start'); 
  15.     // 收集一段時間后提交停止收集命令 
  16.     setTimeout(() => { 
  17.         post(data.params.sessionId, 'Profiler.stop'); 
  18.     }, 10000) 
  19. }); 
  20.  
  21. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {  
  22.     const data = JSON.parse(message); 
  23.     console.log(data); 
  24. }); 
  25.  
  26. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
  27. worker.on('online', () => { 
  28.     session.post("NodeWorker.enable",{waitForDebuggerOnStart: false},  (err) => {  console.log(err, "NodeWorker.enable");}); 
  29. }); 
  30. setInterval(() => {}, 100000); 

通過這種方式可以通過命令控制子線程的調試和數據收集。

2.2 在子線程中動態執行腳本

可以通過執行腳本開啟子線程的WebSocket服務,像調試主線程一樣。

  1. const { Worker, workerData } = require('worker_threads'); 
  2. const { Session } = require('inspector'); 
  3. const session = new Session(); 
  4. session.connect(); 
  5. let workerSessionId; 
  6. let id = 1; 
  7. function post(method, params) { 
  8.     session.post('NodeWorker.sendMessageToWorker', { 
  9.         sessionId: workerSessionId, 
  10.         message: JSON.stringify({ id: id++, method, params }) 
  11.     }); 
  12.  
  13. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {  
  14.     const data = JSON.parse(message); 
  15.     console.log(data); 
  16. }); 
  17.  
  18. session.on('NodeWorker.attachedToWorker', (data) => { 
  19.     workerSessionId = data.params.sessionId; 
  20.     post("Runtime.evaluate", { 
  21.         includeCommandLineAPI: true,  
  22.         expression: `const inspector = process.binding('inspector'); 
  23.                     inspector.open(); 
  24.                     inspector.url(); 
  25.                     ` 
  26.         }  
  27.     ); 
  28. }); 
  29.  
  30. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});  
  31. worker.on('online', () => { 
  32.     session.post("NodeWorker.enable",{waitForDebuggerOnStart: false},  (err) => {  err && console.log("NodeWorker.enable", err);}); 
  33. }); 
  34. setInterval(() => {}, 100000); 

執行上面的代碼就拿到以下輸出

  1.   id: 1, 
  2.   result: { 
  3.     result: { 
  4.       type: 'string'
  5.       value: 'ws://127.0.0.1:9229/c0ca16c8-55aa-4651-9776-fca1b27fc718' 
  6.     } 
  7.   } 

通過該地址,客戶端就可以對子線程進行調試了。上面代碼里使用process.binding而不是require加載inspector,因為剛才通過NodeWorker.enable命令為子線程創建了一個到子線程Inspector的channel,而JS模塊里判斷如果channel非空則報錯Inspector已經打開。所以這里需要繞過這個限制,直接加載C++模塊開啟WebSocket服務器。

3.子線程調試主線程

不僅可以通過主線程調試子線程,還可以通過子線程調試主線程。Node.js在子線程暴露了connectToMainThread方法連接到主線程的Inspector(只能在work_threads中使用),實現的原理和之前分析的類似,主要是子線程連接到主線程的V8 Inspector,通過和該Inspector完成對主線程的控制。看下面一個例子。主線程代碼

  1. const { Worker, workerData } = require('worker_threads');const http = require('http');const worker = new Worker('./worker.js', {workerData: {port: 80}}); 
  2.  
  3. http.createServer((_, res) => { 
  4.     res.end('main'); 
  5. }).listen(8000); 

worker.js代碼如下:

  1. const fs = require('fs'); 
  2. const { workerData: { port } } = require('worker_threads'); 
  3. const { Session } = require('inspector'); 
  4. const session = new Session(); 
  5. session.connectToMainThread(); 
  6. session.post('Profiler.enable'); 
  7. session.post('Profiler.start'); 
  8. setTimeout(() => { 
  9.     session.post('Profiler.stop', (err, data) => { 
  10.         if (data.profile) { 
  11.             fs.writeFileSync('./profile.cpuprofile', JSON.stringify(data.profile)); 
  12.         } 
  13.     }); 
  14. }, 5000) 

 

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2019-03-29 16:40:02

Node.js多線程前端

2011-11-10 08:55:00

Node.js

2022-06-23 06:34:56

Node.js子線程

2015-03-10 10:59:18

Node.js開發指南基礎介紹

2021-02-01 15:42:45

Node.jsSQL應用程序

2022-01-29 22:27:31

內核子線程應用

2021-08-25 06:33:52

Node.jsVscode調試工具

2012-02-02 15:14:29

Node.js

2021-08-04 23:30:28

Node.js開發線程

2021-04-20 12:39:52

Node.js多線程多進程

2021-01-18 08:06:38

Node.js 追蹤JSON

2021-03-09 08:03:21

Node.js 線程JavaScript

2021-05-21 09:36:42

開發技能代碼

2014-08-01 09:57:52

Node.jsNode.js插件

2013-11-01 09:34:56

Node.js技術

2020-09-28 06:57:39

Node.jsGraphQLAPI

2012-10-24 14:56:30

IBMdw

2011-09-08 13:46:14

node.js

2011-11-01 10:30:36

Node.js

2011-09-02 14:47:48

Node
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩久久成人 | 一区二区成人 | 91精品国产麻豆 | 毛片免费观看 | 欧美成年人网站 | 日本污视频 | 亚洲一区二区三区四区视频 | 亚洲人成网亚洲欧洲无码 | 午夜影院网站 | 亚洲国产第一页 | 欧美一区二区三区国产精品 | 视频精品一区二区三区 | 91av在线不卡 | 国产一区二区三区 | 亚洲91av| 密色视频 | 欧美特级黄色 | 亚洲精品一区二区另类图片 | 欧美日韩一区二区电影 | 国产精品亚洲片在线播放 | 国产一级一级毛片 | 福利精品 | 成人福利在线观看 | 亚洲精品在线播放 | 亚洲精品中文在线观看 | 精品毛片视频 | 午夜一区| 91不卡在线 | 中文字幕免费视频 | 欧美日韩在线一区二区 | 国产97在线看 | 99精品国产一区二区三区 | 中文字幕第一页在线 | 在线观看第一页 | 国内精品视频在线 | 国产高清一区二区三区 | 影音先锋久久 | 精品在线一区二区 | 欧美精品一区二区三区在线 | 久久一二| 夏同学福利网 |