成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

格物致知-記一次Nodejs源碼分析的經歷

開發 前端
昨天分析http模塊相關的代碼時,遇到了一個晦澀的邏輯,看了想,想了看還是沒看懂。百度、谷歌了很多帖子也沒看到合適的答案。

 [[353532]]

本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。 

昨天分析http模塊相關的代碼時,遇到了一個晦澀的邏輯,看了想,想了看還是沒看懂。百度、谷歌了很多帖子也沒看到合適的答案。突然看到一個題目有點相識的搜索結果,點進去是Stack Overflow上的帖子,但是已經404,最后還是通過快照功能成功看到內容。這個帖子[1]和我的疑惑不相關,但是突然給了我一些靈感。沿著這個靈感去看了代碼,最后下載nodejs源碼,加了一些log,編譯了一夜(太久了,等不及編譯完成,得睡覺了)。上午起來驗證,終于揭開了疑惑。這個問題源于下面這段代碼。

  1. function connectionListenerInternal(server, socket) { 
  2.   socket.server = server; 
  3.   // 分配一個http解析器 
  4.   const parser = parsers.alloc(); 
  5.   // 解析請求報文 
  6.   parser.initialize( 
  7.     HTTPParser.REQUEST, 
  8.     new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket), 
  9.     server.maxHeaderSize || 0, 
  10.     server.insecureHTTPParser === undefined ? 
  11.       isLenient() : server.insecureHTTPParser, 
  12.   ); 
  13.   parser.socket = socket; 
  14.   // 開始解析頭部的開始時間 
  15.   parser.parsingHeadersStart = nowDate(); 
  16.   socket.parser = parser; 
  17.   const state = { 
  18.     onData: null
  19.     onEnd: null
  20.     onClose: null
  21.     onDrain: null
  22.     // 同一tcp連接上,請求和響應的的隊列 
  23.     outgoing: [], 
  24.     incoming: [], 
  25.     outgoingData: 0, 
  26.     keepAliveTimeoutSet: false 
  27.   }; 
  28.   state.onData = socketOnData.bind(undefined, server, socket, parser, state); 
  29.   socket.on('data', state.onData); 
  30.  
  31.   if (socket._handle && socket._handle.isStreamBase && 
  32.       !socket._handle._consumed) { 
  33.     parser._consumed = true
  34.     socket._handle._consumed = true
  35.     parser.consume(socket._handle); 
  36.   } 
  37.   parser[kOnExecute] = 
  38.     onParserExecute.bind(undefined, server, socket, parser, state); 
  39.  
  40.   socket._paused = false

這段代碼看起來很多,這是啟動http服務器后,有新的tcp連接建立時執行的回調。問題在于tcp上有數據到來時,是怎么處理的,上面代碼中nodejs監聽了socket的data事件,同時注冊了鉤子kOnExecute。data事件我們都知道是流上有數據到來時觸發的事件。我們看一下socketOnData做了什么事情。

  1. function socketOnData(server, socket, parser, state, d) { 
  2.   // 交給http解析器處理,返回已經解析的字節數 
  3.   const ret = parser.execute(d); 
  4.   onParserExecuteCommon(server, socket, parser, state, ret, d); 

這看起來沒有問題,socket上有數據,然后交給http解析器處理。幾乎所有http模塊源碼解析的文章也是這樣分析的,我第一反應也覺得這個沒問題,那kOnExecute是做什么的呢?kOnExecute鉤子函數的值是onParserExecute,這個看起來也是解析tcp上的數據的,看起來和onSocketData是一樣的作用,難道tcp上的數據有兩個消費者?我們看一下kOnExecute什么時候被回調的。

  1. void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { 
  2.  
  3.     Local<Value> ret = Execute(buf.base, nread); 
  4.     Local<Value> cb = 
  5.         object()->Get(env()->context(), kOnExecute).ToLocalChecked(); 
  6.     MakeCallback(cb.As<Function>(), 1, &ret); 
  7.   } 

在node_http_parser.cc中的OnStreamRead中被回調,那么OnStreamRead又是什么時候被回調的呢?OnStreamRead是nodejs中c++層流操作的通用函數,當流有數據的時候就會執行該回調。而且OnStreamRead中也會把數據交給http解析器解析。這看起來真的有兩個消費者?這就很奇怪,為什么一份數據會交給http解析器處理兩次?這時候我的想法就是這兩個地方肯定是互斥的。但是我一直沒有找到是哪里做了處理。最后在connectionListenerInternal的一段代碼中找到了答案。

  1. if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { 
  2.     parser._consumed = true
  3.     socket._handle._consumed = true
  4.     parser.consume(socket._handle); 
  5.   } 

因為tcp流是繼承StreamBase類的,所以if成立(后面會具體分析)。我們看一下consume的實現。

  1. static void Consume(const FunctionCallbackInfo<Value>& args) { 
  2.     Parser* parser; 
  3.     ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); 
  4.     CHECK(args[0]->IsObject()); 
  5.     StreamBase* stream = StreamBase::FromObjject(args[0].As<Object>()); 
  6.     CHECK_NOT_NULL(stream); 
  7.     stream->PushStreamListener(parser); 
  8.   } 

http解析器把自己注冊為tcp stream的一個listener。這里涉及到了c++層對流的設計。我們從頭開始。看一下PushStreamListener做了什么事情。c++層中,流的操作由類StreamResource進行了封裝。

  1. class StreamResource { 
  2.  public
  3.   virtual ~StreamResource(); 
  4.   virtual int ReadStart() = 0; 
  5.   virtual int ReadStop() = 0; 
  6.   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; 
  7.   virtual int DoTryWrite(uv_buf_t** bufs, size_t* count); 
  8.   virtual int DoWrite(WriteWrap* w, 
  9.                       uv_buf_t* bufs, 
  10.                       size_t count
  11.                       uv_stream_t* send_handle) = 0; 
  12.   void PushStreamListener(StreamListener* listener); 
  13.   void RemoveStreamListener(StreamListener* listener); 
  14.  
  15.  protected: 
  16.   uv_buf_t EmitAlloc(size_t suggested_size); 
  17.   void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); 
  18.  
  19.   StreamListener* listener_ = nullptr; 
  20.   uint64_t bytes_read_ = 0; 
  21.   uint64_t bytes_written_ = 0; 
  22.   friend class StreamListener; 
  23. }; 

我們看到StreamResource是一個基類,定義了操作流的公共方法。其中有一個成員是StreamListener類的實例。我們看看StreamListener的實現。

  1. class StreamListener { 
  2.  public
  3.   virtual ~StreamListener(); 
  4.   virtual uv_buf_t OnStreamAlloc(size_t suggested_size) = 0; 
  5.   virtual void OnStreamRead(ssize_t nread, 
  6.                             const uv_buf_t& buf) = 0; 
  7.   virtual void OnStreamDestroy() {} 
  8.   inline StreamResource* stream() { return stream_; } 
  9.  
  10.  protected: 
  11.   void PassReadErrorToPreviousListener(ssize_t nread); 
  12.  
  13.   StreamResource* stream_ = nullptr; 
  14.   StreamListener* previous_listener_ = nullptr; 
  15.   friend class StreamResource; 
  16. }; 

StreamListener是一個負責消費流數據的類。StreamListener 和StreamResource類的關系如下。

null我們看到一個流可以注冊多個listener,多個listener形成一個鏈表。接著我們看一下創建一個c++層的tcp對象是怎樣的。下面是TCPWrap的繼承關系。

  1. class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{} 
  2. class ConnectionWrap : public LibuvStreamWrap{} 
  3. class LibuvStreamWrap : public HandleWrap, public StreamBase{} 
  4. class StreamBase : public StreamResource {} 

我們看到tcp流是繼承于StreamResource的。新建一個tcp的c++的對象時(tcp_wrap.cc),會不斷往上調用父類的構造函數,其中在StreamBase中有一個關鍵的操作。

  1. inline StreamBase::StreamBase(Environment* env) : env_(env) { 
  2.   PushStreamListener(&default_listener_); 
  3.  
  4. EmitToJSStreamListener default_listener_; 

StreamBase會默認給流注冊一個listener。我們看下EmitToJSStreamListener 具體的定義。

  1. class ReportWritesToJSStreamListener : public StreamListener { 
  2.  public
  3.   void OnStreamAfterWrite(WriteWrap* w, int status) override; 
  4.   void OnStreamAfterShutdown(ShutdownWrap* w, int status) override; 
  5.  
  6.  private: 
  7.   void OnStreamAfterReqFinished(StreamReq* req_wrap, int status); 
  8. }; 
  9.  
  10. class EmitToJSStreamListener : public ReportWritesToJSStreamListener { 
  11.  public
  12.   uv_buf_t OnStreamAlloc(size_t suggested_size) override; 
  13.   void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; 
  14. }; 

EmitToJSStreamListener繼承StreamListener ,定義了分配內存和讀取接收數據的函數。接著我們看一下PushStreamListener做了什么事情。

  1. inline void StreamResource::PushStreamListener(StreamListener* listener) { 
  2.   // 頭插法  
  3.   listener->previous_listener_ = listener_; 
  4.   listener->stream_ = this; 
  5.   listener_ = listener; 

PushStreamListener就是構造出上圖的結構。對應到創建一個c++層的tcp對象中,如下圖。

然后我們看一下對于流來說,讀取數據的整個鏈路。首先是js層調用readStart

  1. function tryReadStart(socket) { 
  2.   socket._handle.reading = true
  3.   const err = socket._handle.readStart(); 
  4.   if (err) 
  5.     socket.destroy(errnoException(err, 'read')); 
  6.  
  7. // 注冊等待讀事件 
  8. Socket.prototype._read = function(n) { 
  9.   tryReadStart(this); 
  10. }; 

我們看看readStart

  1. int LibuvStreamWrap::ReadStart() { 
  2.   return uv_read_start(stream(), [](uv_handle_t* handle, 
  3.                                     size_t suggested_size, 
  4.                                     uv_buf_t* buf) { 
  5.     static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf); 
  6.   }, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { 
  7.     static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf); 
  8.   }); 

ReadStart調用libuv的uv_read_start注冊等待可讀事件,并且注冊了兩個回調函數OnUvAlloc和OnUvRead。

  1. void LibuvStreamWrap::OnUvRead(ssize_t nread, const uv_buf_t* buf) { 
  2.    EmitRead(nread, *buf); 
  3.  
  4. inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) { 
  5.   // bytes_read_表示已讀的字節數 
  6.   if (nread > 0) 
  7.     bytes_read_ += static_cast<uint64_t>(nread); 
  8.   listener_->OnStreamRead(nread, buf); 

通過層層調用最后會調用listener_的OnStreamRead。我們看看tcp的OnStreamRead

  1. void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { 
  2.   StreamBase* stream = static_cast<StreamBase*>(stream_); 
  3.   Environment* env = stream->stream_env(); 
  4.   HandleScope handle_scope(env->isolate()); 
  5.   Context::Scope context_scope(env->context()); 
  6.   AllocatedBuffer buf(env, buf_); 
  7.   stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer()); 

繼續回調CallJSOnreadMethod

  1. MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, 
  2.                                                  Local<ArrayBuffer> ab, 
  3.                                                  size_t offset, 
  4.                                                  StreamBaseJSChecks checks) { 
  5.   Environment* env = env_; 
  6.   // ... 
  7.   AsyncWrap* wrap = GetAsyncWrap(); 
  8.   CHECK_NOT_NULL(wrap); 
  9.   Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField); 
  10.   CHECK(onread->IsFunction()); 
  11.   return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); 

CallJSOnreadMethod會回調js層的onread回調函數。onread會把數據push到流中,然后觸發data事件。這是tcp里默認的數據讀取過程。而文章開頭講到的parser.consume打破了這個默認行為。stream->PushStreamListener(parser);修改了tcp流的listener鏈,http parser把自己作為數據的接收者。所以這時候tcp流上的數據是直接由node_http_parser.cc的OnStreamRead消費的。而不是觸發socket的data事件,最后通過在nodejs源碼中加log,重新編譯驗證的確如文中所述。最后提一個這個過程中還有一個關鍵的地方是調用consume函數的前提是socket._handle.isStreamBase為true。isStreamBase是在StreamBase::AddMethods中定義為true的,而tcp對象創建的過程中,調用了這個方法,所以tcp的isStreamBase是true,才會執行consume,才會執行kOnExecute回調。

References

[1] 帖子: http://cache.baiducontent.com/c?m=rZy2XovtTdJJuXWLM-s8wgpaz8NFubewtolyiC19iAKFJrbGdx2EFnArzlAIDisNP70zWWsCPv-4jwMHTGNcLaUsMVr-lvLqYmmHD-w_fUYz6a5K6OQRC9kZmLYN5RXsb34OdINb8xHIJsdyClaEWOtCGKMQ2saYK7ed7OG8v0E1pRKR4K46phl0rCBrw6amXE3QpPo62dMhvu_VASYYqq&p=cb77c64ad49111a05bee9e264d5693&newp=882a9646dc9712a05ab7cc374f0ccc231615d70e3ad3d501298ffe0cc4241a1a1a3aecbf2d29170ed6c27f630bae4856ecf630723d0834f1f689df08d2ecce7e7b&s=cfcd208495d565ef&user=baidu&fm=sc&query=onParserExecute&qid=869f73bc002e44f5&p1=11

 

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2013-04-01 10:27:37

程序員失業

2013-01-17 10:31:13

JavaScriptWeb開發firebug

2021-01-22 05:35:19

Lvm模塊Multipath

2012-08-28 09:21:59

Ajax查錯經歷Web

2018-12-06 16:25:39

數據庫服務器線程池

2020-02-10 10:15:31

技術研發指標

2021-01-08 13:52:15

Consul微服務服務注冊中心

2023-03-31 09:22:40

Hi3861芯片Flash

2021-12-06 19:29:17

LRU內存算法

2023-03-29 09:36:32

2025-03-17 10:01:07

2014-08-06 11:24:24

Elasticsear劫持掛馬

2011-08-08 13:31:44

數據分析數據倉庫

2021-04-13 18:17:48

Hbase集群配置

2011-04-13 09:21:30

死鎖SQL Server

2016-12-06 09:34:33

線程框架經歷

2023-06-26 00:12:46

2024-12-27 13:31:18

.NETdump調試

2023-04-06 10:52:18

2024-03-28 12:56:36

點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩免费成人av | 日韩精品视频网 | 欧美精品a∨在线观看不卡 国产精品久久国产精品 | 久久久久久久一区 | 成在线人视频免费视频 | 日本一区二区三区四区 | 91欧美精品成人综合在线观看 | 久在线观看 | www国产精品| 一区在线播放 | 中文字幕av网 | 特级做a爱片免费69 精品国产鲁一鲁一区二区张丽 | 国产激情 | 成人午夜在线观看 | 四虎在线观看 | 一区网站 | 日韩毛片免费看 | 日韩精品在线播放 | 欧美激情一区二区 | www.狠狠干| 国产99精品 | 韩日在线观看视频 | 精品国产一区一区二区三亚瑟 | 精品日韩一区二区三区av动图 | 一区二区三区久久久 | 日韩中文字幕在线 | 久久精品国产亚洲一区二区三区 | 99精品一级欧美片免费播放 | 国产精品视频网站 | 精品自拍视频 | 成人免费网站视频 | 久久成人一区 | 久久久久久国产精品免费免费 | 91亚洲精品久久久电影 | 99pao成人国产永久免费视频 | 91大神在线资源观看无广告 | 欧美日韩国产综合在线 | 精品中文字幕在线观看 | 欧美三级久久久 | 成人精品鲁一区一区二区 | 日韩中文字幕网 |