成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go netpoll 的實現和使用,你學會了嗎?

開發 前端
Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

IO 模型是軟件中非常重要的部分,軟件架構也隨著 IO 模型的變化而變化,比如服務器架構經歷了一個請求一個進程+阻塞式 IO,一個請求一個線程+阻塞式 IO,IO 多路復用+非阻塞 IO,異步 IO。現代軟件中,大多數軟件使用的 IO 模型是 IO 多路復用,因為它的平臺兼容性比較好一點,但是慢慢也有不少軟件支持了異步 IO,比如 Node.js 已經支持了 io_uring。Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。

  1. 當一個協程讀且不滿足條件時,Go 會把協程記錄到 pollDesc 中,接著把它改成等待狀態并觸發重新調度。
  2. Go 會定時或按需調用 netpoll 獲取就緒的事件。
  3. 通過 netpoll 返回的事件信息找到對應的 pollDesc,并根據 pollDesc 找到對應的協程,把協程改成就緒狀態等待調度執行。

本文介紹 netpoll 的實現以及它是如何和協程結合起來的。

netpool

核心數據結構

了解 netpoll 之前,首先需要先了解 netpoll 模塊的核心數據結構 pollDesc。

type pollDesc struct {
 fd    uintptr        // constant for pollDesc usage lifetime
 fdseq atomic.Uintptr // protects against stale pollDesc
  // 記錄一些信息,比如是否有錯誤,是否超時等
 atomicInfo atomic.Uint32 // atomic pollInfo

  /*
  核心字段
    Nil:初始化狀態
    pdReady:事件就緒,
    pdWait:準備進入阻塞狀態
    其他:協程結構體的地址
  */
 rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
 wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

 lock    mutex // protects the following fields
  // 只在某些系統使用,如 aix,記錄 pollDesc 在數組中的索引
 user    uint32    // user settable cookie
 closing bool
  // 超時管理
 rrun    bool      // whether rt is running
 wrun    bool      // whether wt is running
 rseq    uintptr   // protects from stale read timers
 rt      timer     // read deadline timer
 rd      int64     // read deadline (a nanotime in the future, -1 when expired)
 wseq    uintptr   // protects from stale write timers
 wt      timer     // write deadline timer
 wd      int64     // write deadline (a nanotime in the future, -1 when expired)
 self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

一個 fd 對應一個 pollDesc,pollDesc 負責管理 fd 的讀寫事件、等待事件的超時時間以及記錄阻塞等待 fd 事件的協程。netpoll 根據 pollDesc 的信息,通過操作系統的 IO 多路復用模塊實現對事件的感知,比如 Linux 的 epoll 和 MacOS 的 kqueue 的 IO 多路復用模塊。

IO 多路復用

IO 多路復用模塊需要實現下面幾個接口。

// 初始化,比如創建一個 epoll 實例,后面通過該實例監聽 fd 的事件
func netpollinit() {
}

// 判斷 fd 是不是 netpoll 內部的 fd,IO 多路復用實例的 fd 或內部創建的 pipe fd
func netpollIsPollDescriptor(fd uintptr) bool {
 return false
}


// 注冊 fd 事件
func netpollopen(fd uintptr, pd *pollDesc) int32 {
 return 0
}

// 取消 fd 的事件
func netpollclose(fd uintptr) int32 {
 return 0
}

// 特定平臺才實現,更新 fd 事件,喚醒 IO 多路復用模塊注冊 fd 事件
func netpollarm(pd *pollDesc, mode int) {
}

// 喚醒 IO 多路復用模塊
func netpollBreak() {
}

/*
  獲取就緒的 fd
  delay = 0: 非阻塞
  delay < 0: 阻塞直到有就緒 fd,阻塞期間可以通過 netpollBreak 喚醒
  delay > 0: 帶超時的阻塞
*/
func netpoll(delay int64) (gList, int32) {
 return gList{}, 0
}

接下來看一下 MacOS 系統下 kqueue 的實現。

package runtime

var (
 kq             int32         = -1
 netpollWakeSig atomic.Uint32 // 是否已經發送了喚醒 kqueue
)

func netpollinit() {
  // 創建 IO 多路復用模塊的實例
 kq = kqueue()
  // 設置 _FD_CLOEXEC 標記,fork + execve 時自動關閉該 fd,避免子進程繼承
 closeonexec(kq)
  // 注冊 EVFILT_USER,后續可以手動喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ADD,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
 var ev [2]keventt
  // 記錄 fd
 *(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
  // 注冊讀寫事件,工作方式是邊緣觸發
 ev[0].filter = _EVFILT_READ
 ev[0].flags = _EV_ADD | _EV_CLEAR
 ev[0].fflags = 0
 ev[0].data = 0
  // 記錄對應的 pollDesc,就緒處理時使用
  ev[0].udata = (*byte)(unsafe.Pointer(pd))
  // 復制結構體
 ev[1] = ev[0]
 ev[1].filter = _EVFILT_WRITE
 kevent(kq, &ev[0], 2, nil, 0, nil)
}

func netpollclose(fd uintptr) int32 {
 // Don't need to unregister because calling close()
 // on fd will remove any kevents that reference the descriptor.
 return 0
}

func netpollarm(pd *pollDesc, mode int) {
 throw("runtime: unused")
}

func netpollBreak() {
 // Failing to cas indicates there is an in-flight wakeup, so we're done here.
 if !netpollWakeSig.CompareAndSwap(0, 1) {
  return
 }
  // 喚醒 IO 多路復用模塊
 ev := keventt{
  ident:  kqIdent,
  filter: _EVFILT_USER,
  flags:  _EV_ENABLE,
  fflags: _NOTE_TRIGGER,
 }
 kevent(kq, &ev, 1, nil, 0, nil)
}

// delay < 0: 一直阻塞,直到有就緒事件或被主動喚醒
// delay == 0: 非阻塞
// delay > 0: 帶超時的阻塞
// 返回就緒的協程和數量
func netpoll(delay int64) (gList, int32) {
 var tp *timespec
 var ts timespec
  
 if delay < 0 {
  tp = nil
 } else if delay == 0 {
  tp = &ts
 } else {
  ts.setNsec(delay)
  if ts.tv_sec > 1e6 {
   // Darwin returns EINVAL if the sleep time is too long.
   ts.tv_sec = 1e6
  }
  tp = &ts
 }
 var events [64]keventt
retry:
  // 獲取就緒事件
 n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
 var toRun gList
 delta := int32(0)
 for i := 0; i < int(n); i++ {
  ev := &events[i]
    // 是否是手動喚醒事件
  if isWakeup(ev) {
      // 是阻塞式調用時才處理
   if delay != 0 {
    // netpollBreak could be picked up by a nonblocking poll.
    // Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
    ev := keventt{
          ident:  kqIdent,
          filter: _EVFILT_USER,
          flags:  _EV_DISABLE,
        }
        // 禁用,下次手動觸發時再開啟
        kevent(kq, &ev, 1, nil, 0, nil)
        // 恢復標記
    netpollWakeSig.Store(0)
   }
   continue
  }

  var mode int32
  switch ev.filter {
  case _EVFILT_READ:
   mode += 'r'
  case _EVFILT_WRITE:
   mode += 'w'
  }
  if mode != 0 {
   var pd *pollDesc
      // 獲取 pollDesc
   pd = (*pollDesc)(unsafe.Pointer(ev.udata))
      // 記錄錯誤
   pd.setEventErr(ev.flags == _EV_ERROR, tag)
      // 修改協程為就緒狀態,返回喚醒的協程數
   delta += netpollready(&toRun, pd, mode)
  }
 }
  // 返回就緒的協程和數量
 return toRun, delta
}

IO 多路復用的工作方式有兩種,Go 使用的是邊緣觸發。

  1. 邊緣觸發:邊緣觸發是從無到有數據時才會通知,需要通過非阻塞方式(避免讀到沒數據時阻塞)不斷讀取數據,否則不會收到新通知。
  2. 水平觸發:水平觸發就是有數據時會一直通知用戶,用戶可以選擇什么時候讀,讀取時依然需要使用非阻塞的方式。比如驚群場景下,一個連接的到來導致多個進程被喚醒收到可讀事件,但是先被調度的進程會消費這個連接,導致其他進程讀取時已經沒有連接了,如果以阻塞方式調用會引起進程阻塞。

netpoll 模塊除了提供常用的 fd 事件訂閱發布外,還有一個重要的能力就是可喚醒,比如當前定時器最早超時時間是 5s,然后 IO 多路復用模塊阻塞等待 5s,過去 1s 時突然新增了一個超時時間為 1s 的定時器,這時候需要提前喚醒 IO 多路復用模塊,從而及時處理定時器。在某些系統的中可以通過特殊的事件直接喚醒,比如上面的 EVFILT_USER 類型的事件,而比較常用實現方式是通過 pipe 創建兩個 fd,分別為讀 fd 和 寫 fd,把讀 fd 注冊到 IO 多路復用模塊中,然后通過往寫 fd 寫入數據來喚醒 IO 多路復用模塊。

// 創建兩個 fd,一個注冊到 kqueue
func addWakeupEvent(kq int32) {
 r, w, errno := nonblockingPipe()
 ev := keventt{
  filter: _EVFILT_READ,
  flags:  _EV_ADD,
 }
 *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
 n := kevent(kq, &ev, 1, nil, 0, nil)
 netpollBreakRd = uintptr(r)
 netpollBreakWr = uintptr(w)
}

// 通過往 fd 寫喚醒 kqueue
func wakeNetpoll(_ int32) {
 for {
  var b byte
  n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
  if n == 1 || n == -_EAGAIN {
   break
  }
  if n == -_EINTR {
   continue
  }
 }
}

// 判斷是不是內部 fd,即上面創建的 fd
func isWakeup(ev *keventt) bool {
 if uintptr(ev.ident) == netpollBreakRd {
  if ev.filter == _EVFILT_READ {
   return true
  }
  println("runtime: netpoll: break fd ready for", ev.filter)
  throw("runtime: netpoll: break fd ready for something unexpected")
 }
 return false
}

// 消費喚醒時寫入的數據,一個喚醒結束
func drainWakeupEvent(_ int32) {
 var buf [16]byte
 read(int32(netpollBreakRd), noescape(unsafe.Pointer(&buf[0])), int32(len(buf)))
}

超時管理

netpoll 支持等待事件就緒的超時時間,其原理是設置一個定時器,然后超時時把協程改成就緒狀態等待調度執行。

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
 lock(&pd.lock)
  // 舊值
 rd0, wd0 := pd.rd, pd.wd
  // 讀寫超時
 combo0 := rd0 > 0 && rd0 == wd0
 if d > 0 {
    // 絕對超時時間
  d += nanotime()
 }
  // 記錄設置了讀或寫超時
 if mode == 'r' || mode == 'r'+'w' {
  pd.rd = d
 }
 if mode == 'w' || mode == 'r'+'w' {
  pd.wd = d
 }
 pd.publishInfo()
  // 設置了超時并且讀寫一樣
 combo := pd.rd > 0 && pd.rd == pd.wd
  // 設置定時器回調,處理讀寫超時
 rtf := netpollReadDeadline
 if combo {
  rtf = netpollDeadline
 }
  // 第一次設置
 if !pd.rrun {
    // 大于 0,修改定時器時間為 rd,處理函數為 rtf
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
   pd.rrun = true
  }
 } else if pd.rd != rd0 || combo != combo0 { // 新舊的超時時間不一樣
  pd.rseq++ // invalidate current timers
    // 修改
  if pd.rd > 0 {
   pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
  } else {
      // 停止定時器
   pd.rt.stop()
   pd.rrun = false
  }
 }
  // 寫也差不多,忽略
  
  // 超時時間小于 0 才會執行下面的邏輯
 delta := int32(0)
 var rg, wg *g
  // 新的超時時間小于 0,則修改 pollDesc 狀態為 Nil 并喚醒阻塞的協程
 if pd.rd < 0 {
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 if pd.wd < 0 {
  wg = netpollunblock(pd, 'w', false, &delta)
 }
 unlock(&pd.lock)
  // 把協程改成就緒狀態,等待調度
 if rg != nil {
  netpollgoready(rg, 3)
 }
 if wg != nil {
  netpollgoready(wg, 3)
 }
  // 等待者數量減去 delta
 netpollAdjustWaiters(delta)
}

func netpollgoready(gp *g, traceskip int) {
 goready(gp, traceskip+1)
}

func goready(gp *g, traceskip int) {
 systemstack(func() {
  ready(gp, traceskip, true)
 })
}

func ready(gp *g, traceskip int, next bool) {
 status := readgstatus(gp)
 mp := acquirem() 
  // 改成就緒狀態
 casgstatus(gp, _Gwaiting, _Grunnable)
  // 加入 p 的隊列等待調度執行
 runqput(mp.p.ptr(), gp, next)
 wakep()
 releasem(mp)
}

poll_runtime_pollSetDeadline 用于設置一個協程超時等待某個 fd 事件,支持取消超時設置。當超時時執行 netpolldeadlineimpl 把協程改成就緒狀態等待調度執行。

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
 lock(&pd.lock)
 delta := int32(0)
 var rg *g
  // 修改 pollDesc 的狀態
 if read {
  pd.rd = -1
  pd.publishInfo()
  rg = netpollunblock(pd, 'r', false, &delta)
 }
 // write 寫同上
 unlock(&pd.lock)
  // 把協程改成就緒狀態
 if rg != nil {
  netpollgoready(rg, 0)
 }
 netpollAdjustWaiters(delta)
}

netpool 在 Go 中的使用

IO 多路復用模塊只是提供了一些基礎的能力,那么這些能力是如何和 Go 結合起來的呢?下面從啟動一個 TCP 服務器為例,看看 netpoll 模塊是如何和 Go 結合起來的。

func Listen(network, address string) (Listener, error) {
 var lc ListenConfig
 return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
 l, err = sl.listenTCP(ctx, la)
 return l, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
 return sl.listenTCPProto(ctx, laddr, 0)
}

func (sl *sysListener) listenTCPProto(ctx context.Context, laddr *TCPAddr, proto int) (*TCPListener, error) {
 fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, proto, "listen", ctrlCtxFn)
 return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
 return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
 s, err := sysSocket(family, sotype, proto)
 fd, err = newFD(s, family, sotype, net)
  fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn)
 return fd, nil
}

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
  return os.NewSyscallError("bind", err)
 }
 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
  return os.NewSyscallError("listen", err)
 }
 if err = fd.init(); err != nil {
  return err
 }
 return nil
}

創建一個 TCP 服務器首先創建了一個 socket,接著綁定到監聽到地址和把 socket 改成 listen 狀態,這樣就完成了服務器的啟動,啟動成功后執行 fd.init() 把 socket 對應的 fd 注冊到 IO 多路復用模塊。

func (fd *netFD) init() error {
 return fd.pfd.Init(fd.net, true)
}

func (fd *FD) Init(net string, pollable bool) error {
 err := fd.pd.init(fd)
 return err
}

func (pd *pollDesc) init(fd *FD) error {
  // 懶初始化 IO 多路復用模塊
 serverInit.Do(runtime_pollServerInit)
  // 注冊 fd 讀寫事件
 ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
  // 記錄這個上下文,對應一個 pollDesc 結構體,后續用到
 pd.runtimeCtx = ctx
 return nil
}

接著就調 netpoll 模塊的 poll_runtime_pollOpen。

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
  // 從 cache 里分配一個 pollDesc
 pd := pollcache.alloc()
 lock(&pd.lock)
 wg := pd.wg.Load()
 rg := pd.rg.Load()
 pd.fd = fd
 pd.closing = false
 pd.setEventErr(false, 0)
 pd.rseq++
 pd.rg.Store(pdNil)
 pd.rd = 0
 pd.wseq++
 pd.wg.Store(pdNil)
 pd.wd = 0
 pd.self = pd
 pd.publishInfo()
 unlock(&pd.lock)
  // 注冊 fd 
 errno := netpollopen(fd, pd)
 return pd, 0
}

這樣就完成了服務器的啟動和 socket fd 事件的注冊,接著就調 accept 等待連接的到來。

func (l *TCPListener) Accept() (Conn, error) {
 c, err := l.accept()
 return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
 fd, err := ln.fd.accept()
 return newTCPConn(fd, ...), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
 d, rsa, errcall, err := fd.pfd.Accept()
  // 拿到 fd,創建一個 netFD
 netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
  // 再次注冊到 IO 多路復用模塊,進行數據通信
 netfd.init()
 return netfd, nil
}

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
  // 加鎖
 if err := fd.readLock(); err != nil {
  return -1, nil, "", err
 }
 defer fd.readUnlock()
  // 讀判斷是否有錯誤,有則返回
 if err := fd.pd.prepareRead(fd.isFile); err != nil {
  return -1, nil, "", err
 }
  
 for {
  s, rsa, errcall, err := accept(fd.Sysfd)
  if err == nil {
   return s, rsa, "", err
  }
  switch err {
  case syscall.EAGAIN:
   if fd.pd.pollable() {
        // 阻塞等待,重新 accept
    if err = fd.pd.waitRead(fd.isFile); err == nil {
     continue
    }
   }
  return -1, nil, errcall, err
 }
}

這里有兩個地方涉及到 netpoll,分別是 prepareRead 和 accept。

func (pd *pollDesc) prepareRead(isFile bool) error {
 return pd.prepare('r', isFile)
}

func convertErr(res int, isFile bool) error {
 switch res {
 case pollNoError:
  return nil
 case pollErrClosing:
  return errClosing(isFile)
 case pollErrTimeout:
  return ErrDeadlineExceeded
 case pollErrNotPollable:
  return ErrNotPollable
 }
}

func (pd *pollDesc) prepare(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return nil
 }
 res := runtime_pollReset(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollReset(pd *pollDesc, mode int) int {
  // 判斷是否有錯誤
 errcode := netpollcheckerr(pd, int32(mode))
  // 有則返回
 if errcode != pollNoError {
  return errcode
 }
 if mode == 'r' {
  pd.rg.Store(pdNil)
 } else if mode == 'w' {
  pd.wg.Store(pdNil)
 }
 return pollNoError
}

func netpollcheckerr(pd *pollDesc, mode int32) int {
 info := pd.info()
 if info.closing() {
  return pollErrClosing
 }
  // 超時
 if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
  return pollErrTimeout
 }
 if mode == 'r' && info.eventErr() {
  return pollErrNotPollable
 }
 return pollNoError
}

如果沒有發生錯誤則繼續調 accept,當沒有連接時,accept 會返回 EAGAIN(非阻塞調用條件不滿足時的錯誤碼),接著執行 waitRead。

if fd.pd.pollable() {
  if err = fd.pd.waitRead(fd.isFile); err == nil {
    continue
  }
}

func (pd *pollDesc) waitRead(isFile bool) error {
 return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
 if pd.runtimeCtx == 0 {
  return errors.New("waiting for unsupported file type")
 }
 res := runtime_pollWait(pd.runtimeCtx, mode)
 return convertErr(res, isFile)
}

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
 errcode := netpollcheckerr(pd, int32(mode))
 if errcode != pollNoError {
  return errcode
 }
  // 阻塞當前協程
 for !netpollblock(pd, int32(mode), false) {
  errcode = netpollcheckerr(pd, int32(mode))
  if errcode != pollNoError {
   return errcode
  }
 }
 return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
  // 執行到這時,可能已經就緒了,則返回,因為 Go 是多線程的,一個線程讀的時候,另一個線程可能寫
  if gpp.CompareAndSwap(pdReady, pdNil) {
   return true
  }
    // 一般情況,設置為 pdWait 狀態,表示準備進入阻塞狀態
  if gpp.CompareAndSwap(pdNil, pdWait) {
   break
  }
 }
  // waitio 是 false,但一般沒有 error,執行 gopark 阻塞協程
 if waitio || netpollcheckerr(pd, mode) == pollNoError {
  gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
 }
  // 被喚醒,重置為 pdNil
 old := gpp.Swap(pdNil)
 return old == pdReady
}

netpollblock 最終調 gopark 阻塞協程。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
 mp.waitlock = lock
 mp.waitunlockf = unlockf
 releasem(mp)
 mcall(park_m)
}

func park_m(gp *g) {
 mp := getg().m
  // 修改協程為 _Gwaiting 狀態
 casgstatus(gp, _Grunning, _Gwaiting)
  // 執行 waitunlockf,即 netpollblockcommit
 if fn := mp.waitunlockf; fn != nil {
  ok := fn(gp, mp.waitlock)
  mp.waitunlockf = nil
  mp.waitlock = nil
    // 如果返回 false,則說明就緒了,修改協程為 _Grunnable,繼續執行它
  if !ok {
   casgstatus(gp, _Gwaiting, _Grunnable)
   execute(gp, true) // Schedule it back, never returns.
  }
 }
  // 否則重新調度其他協程
 schedule()
}

正常情況下,gopark 設置協程為 Gwaiting 狀態,然后重新調度,相當于該協程就暫停執行了。但是從開始執行 gopark 到現在可能情況已經發生了變化,所以 Go 還會執行一下鉤子函數 waitunlockf,這里是 netpollblockcommit。

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
  // 如果當前還是 pdWait 狀態則修改 pollDesc 的 rg 或 wg 字段為當前協程結構體
 r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
  // 修改成功則等待者加 1
 if r {
  netpollAdjustWaiters(1)
 }
  // 返回是否修改成功
 return r
}

正常情況下,這個過程是 pollDesc 的 rg 或 wg 字段從 Nil 到 pdWait 再到協程結構體地址,除非執行過程中事件已經就緒。從前面的分析可以看到啟動一個服務器并調用 Accept 后為什么協程會阻塞了,從中也可以看到 Go 中以同步方式寫異步代碼的底層實現。

那么協程阻塞后,什么時候才會喚醒呢?又是怎么喚醒的?Go 會在某些時機調用 IO 多路復用模塊的 netpoll 獲取就緒的事件,從而喚醒關聯的協程。這種時機有幾個,比如在 sysmon 線程中定時獲取,或者調度時獲取,我們搜索 netpoll 函數的調用就可以看到,下面以 sysmon 線程的處理為例。

if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
  // 更新上次 poll 的時間
  sched.lastpoll.CompareAndSwap(lastpoll, now)
  // 通過 IO 多路復用模塊獲取就緒的事件(所以關聯的 g)列表
  list, delta := netpoll(0) // non-blocking - returns list of goroutines
  if !list.empty() {
    // 把g 改成就緒并放入隊列等待調度
    injectglist(&list)
    incidlelocked(1)
    // 等待者減 delta(負數)
    netpollAdjustWaiters(delta)
  }
}

前面介紹過 netpoll 最終會調 netpollready。

func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
  // 喚醒的協程個數
 delta := int32(0)
 var rg, wg *g
  // 喚醒對應的等待者,讀或寫
 if mode == 'r' || mode == 'r'+'w' {
  rg = netpollunblock(pd, 'r', true, &delta)
 }
 if mode == 'w' || mode == 'r'+'w' {
  wg = netpollunblock(pd, 'w', true, &delta)
 }
  // 喚醒成功插入隊列
 if rg != nil {
  toRun.push(rg)
 }
 if wg != nil {
  toRun.push(wg)
 }
 return delta
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
 gpp := &pd.rg
 if mode == 'w' {
  gpp = &pd.wg
 }

 for {
    // 當前狀態
  old := gpp.Load()
    // 之前就是 pdReady 則不需要再處理,等待上層消費后改成 Nil
  if old == pdReady {
   return nil
  }
    // 新狀態
  new := pdNil
    // 這里是 true,設置超時并且超時時 ioready 是 false
  if ioready {
   new = pdReady
  }
    // 改成 pdReady 狀態
  if gpp.CompareAndSwap(old, new) {
      // 當前狀態如果是 pdWait 則改成 old,即不需要處理該協程,
      // 前面介紹阻塞協程時講到,pdWait 是準備進入阻塞狀態,然后在 netpollblockcommit 會再進一步判斷,如果這里改成 pdReady 狀態,則協程不會阻塞。
   if old == pdWait {
    old = pdNil
   } else if old != pdNil { // old 是等待讀寫事件的協程
    *delta -= 1 // 等待者減一
   }
      // 返回待喚醒的協程
   return (*g)(unsafe.Pointer(old))
  }
 }
}

netpollunblock 第一種處理場景是事件就緒時把狀態改成 pdReady,第二種處理場景是事件沒有就緒,但設置了超時時間并且已經超時,則把狀態改成 Nil,把該協程改成就緒狀態,然后返回阻塞在該事件的協程并把它加入到隊列中等待調度執行。

func injectglist(glist *gList) {
 head := glist.head.ptr()
 var tail *g
 qsize := 0
  // 改成就緒狀態
 for gp := head; gp != nil; gp = gp.schedlink.ptr() {
  tail = gp
  qsize++
  casgstatus(gp, _Gwaiting, _Grunnable)
 }

 // Turn the gList into a gQueue.
 var q gQueue
 q.head.set(head)
 q.tail.set(tail)
 *glist = gList{}
  // 啟動新的線程處理
 startIdle := func(n int) {
  for i := 0; i < n; i++ {
   mp := acquirem() // See comment in startm.
   lock(&sched.lock)

   pp, _ := pidlegetSpinning(0)
   if pp == nil {
    unlock(&sched.lock)
    releasem(mp)
    break
   }

   startm(pp, false, true)
   unlock(&sched.lock)
   releasem(mp)
  }
 }

 pp := getg().m.p.ptr()
 if pp == nil {
  lock(&sched.lock)
    // 放入全局隊列
  globrunqputbatch(&q, int32(qsize))
  unlock(&sched.lock)
  startIdle(qsize)
  return
 }
  // 放到 p 本地隊列
 if !q.empty() {
  runqputbatch(pp, &q, qsize)
 }
}

協程和 IO 多路復用的結合思路也可以參考 Russ Cox 寫的 libtask 和這個分析文章 https://zhuanlan.zhihu.com/p/360477474。

責任編輯:武曉燕 來源: 編程雜技
相關推薦

2024-02-02 11:03:11

React數據Ref

2024-02-21 19:02:05

Go模板化方式

2022-01-17 07:50:37

Go代碼規范

2022-08-29 08:05:44

Go類型JSON

2022-11-08 08:45:30

Prettier代碼格式化工具

2022-03-05 23:29:18

LibuvwatchdogNode.js

2024-07-29 10:35:44

KubernetesCSI存儲

2024-08-19 10:24:14

2022-06-16 07:50:35

數據結構鏈表

2022-11-21 16:57:20

2023-12-27 07:31:45

json產品場景

2023-10-30 07:05:31

2024-01-18 09:38:00

Java注解JDK5

2024-04-28 08:24:27

分布式架構Istio

2022-07-08 09:27:48

CSSIFC模型

2024-01-19 08:25:38

死鎖Java通信

2024-02-04 00:00:00

Effect數據組件

2023-07-26 13:11:21

ChatGPT平臺工具

2023-01-10 08:43:15

定義DDD架構

2024-03-18 08:06:59

JavaGo開發
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲精品一二三 | 欧美五月婷婷 | 精品美女久久久久久免费 | 国产精品视频久久久久 | 怡红院怡春院一级毛片 | a久久 | 毛片区| 国产精品视频网 | 综合激情久久 | 久久国产精品-久久精品 | 成人在线一区二区三区 | 国产精品免费一区二区三区 | 久久五月婷 | 日韩在线免费播放 | 在线观看视频你懂得 | 国产一区二区三区在线 | 九九精品在线 | 国产精品国产 | 午夜免费精品视频 | 一区二区三区中文 | 在线精品亚洲欧美日韩国产 | 亚洲精品一区二三区不卡 | 日韩免费视频一区二区 | 亚洲一区导航 | 久久99精品久久久久久青青日本 | 91在线看| 欧美不卡在线 | 久久99蜜桃综合影院免费观看 | 久久免费视频观看 | 亚洲欧美成人影院 | japan25hdxxxx日本| 亚洲精品日韩在线 | 蜜桃视频在线观看免费视频网站www | 青青草视频免费观看 | 懂色av蜜桃av | 久久伊人在 | jizz亚洲人 | 国产一区二区免费在线 | 日本a v在线播放 | 一区二区免费 | 中午字幕在线观看 |