Go netpoll 的實現和使用,你學會了嗎?
IO 模型是軟件中非常重要的部分,軟件架構也隨著 IO 模型的變化而變化,比如服務器架構經歷了一個請求一個進程+阻塞式 IO,一個請求一個線程+阻塞式 IO,IO 多路復用+非阻塞 IO,異步 IO。現代軟件中,大多數軟件使用的 IO 模型是 IO 多路復用,因為它的平臺兼容性比較好一點,但是慢慢也有不少軟件支持了異步 IO,比如 Node.js 已經支持了 io_uring。Go 目前使用的 IO 模型是 IO 多路復用,由 netpoll 模塊實現,這部分沒有特別的地方,重點是 Go 把 IO 模型和協程結合一起,大概的流程如下。
- 當一個協程讀且不滿足條件時,Go 會把協程記錄到 pollDesc 中,接著把它改成等待狀態并觸發重新調度。
- Go 會定時或按需調用 netpoll 獲取就緒的事件。
- 通過 netpoll 返回的事件信息找到對應的 pollDesc,并根據 pollDesc 找到對應的協程,把協程改成就緒狀態等待調度執行。
本文介紹 netpoll 的實現以及它是如何和協程結合起來的。
netpool
核心數據結構
了解 netpoll 之前,首先需要先了解 netpoll 模塊的核心數據結構 pollDesc。
type pollDesc struct {
fd uintptr // constant for pollDesc usage lifetime
fdseq atomic.Uintptr // protects against stale pollDesc
// 記錄一些信息,比如是否有錯誤,是否超時等
atomicInfo atomic.Uint32 // atomic pollInfo
/*
核心字段
Nil:初始化狀態
pdReady:事件就緒,
pdWait:準備進入阻塞狀態
其他:協程結構體的地址
*/
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
lock mutex // protects the following fields
// 只在某些系統使用,如 aix,記錄 pollDesc 在數組中的索引
user uint32 // user settable cookie
closing bool
// 超時管理
rrun bool // whether rt is running
wrun bool // whether wt is running
rseq uintptr // protects from stale read timers
rt timer // read deadline timer
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
一個 fd 對應一個 pollDesc,pollDesc 負責管理 fd 的讀寫事件、等待事件的超時時間以及記錄阻塞等待 fd 事件的協程。netpoll 根據 pollDesc 的信息,通過操作系統的 IO 多路復用模塊實現對事件的感知,比如 Linux 的 epoll 和 MacOS 的 kqueue 的 IO 多路復用模塊。
IO 多路復用
IO 多路復用模塊需要實現下面幾個接口。
// 初始化,比如創建一個 epoll 實例,后面通過該實例監聽 fd 的事件
func netpollinit() {
}
// 判斷 fd 是不是 netpoll 內部的 fd,IO 多路復用實例的 fd 或內部創建的 pipe fd
func netpollIsPollDescriptor(fd uintptr) bool {
return false
}
// 注冊 fd 事件
func netpollopen(fd uintptr, pd *pollDesc) int32 {
return 0
}
// 取消 fd 的事件
func netpollclose(fd uintptr) int32 {
return 0
}
// 特定平臺才實現,更新 fd 事件,喚醒 IO 多路復用模塊注冊 fd 事件
func netpollarm(pd *pollDesc, mode int) {
}
// 喚醒 IO 多路復用模塊
func netpollBreak() {
}
/*
獲取就緒的 fd
delay = 0: 非阻塞
delay < 0: 阻塞直到有就緒 fd,阻塞期間可以通過 netpollBreak 喚醒
delay > 0: 帶超時的阻塞
*/
func netpoll(delay int64) (gList, int32) {
return gList{}, 0
}
接下來看一下 MacOS 系統下 kqueue 的實現。
package runtime
var (
kq int32 = -1
netpollWakeSig atomic.Uint32 // 是否已經發送了喚醒 kqueue
)
func netpollinit() {
// 創建 IO 多路復用模塊的實例
kq = kqueue()
// 設置 _FD_CLOEXEC 標記,fork + execve 時自動關閉該 fd,避免子進程繼承
closeonexec(kq)
// 注冊 EVFILT_USER,后續可以手動喚醒 IO 多路復用模塊
ev := keventt{
ident: kqIdent,
filter: _EVFILT_USER,
flags: _EV_ADD,
}
kevent(kq, &ev, 1, nil, 0, nil)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev [2]keventt
// 記錄 fd
*(*uintptr)(unsafe.Pointer(&ev[0].ident)) = fd
// 注冊讀寫事件,工作方式是邊緣觸發
ev[0].filter = _EVFILT_READ
ev[0].flags = _EV_ADD | _EV_CLEAR
ev[0].fflags = 0
ev[0].data = 0
// 記錄對應的 pollDesc,就緒處理時使用
ev[0].udata = (*byte)(unsafe.Pointer(pd))
// 復制結構體
ev[1] = ev[0]
ev[1].filter = _EVFILT_WRITE
kevent(kq, &ev[0], 2, nil, 0, nil)
}
func netpollclose(fd uintptr) int32 {
// Don't need to unregister because calling close()
// on fd will remove any kevents that reference the descriptor.
return 0
}
func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused")
}
func netpollBreak() {
// Failing to cas indicates there is an in-flight wakeup, so we're done here.
if !netpollWakeSig.CompareAndSwap(0, 1) {
return
}
// 喚醒 IO 多路復用模塊
ev := keventt{
ident: kqIdent,
filter: _EVFILT_USER,
flags: _EV_ENABLE,
fflags: _NOTE_TRIGGER,
}
kevent(kq, &ev, 1, nil, 0, nil)
}
// delay < 0: 一直阻塞,直到有就緒事件或被主動喚醒
// delay == 0: 非阻塞
// delay > 0: 帶超時的阻塞
// 返回就緒的協程和數量
func netpoll(delay int64) (gList, int32) {
var tp *timespec
var ts timespec
if delay < 0 {
tp = nil
} else if delay == 0 {
tp = &ts
} else {
ts.setNsec(delay)
if ts.tv_sec > 1e6 {
// Darwin returns EINVAL if the sleep time is too long.
ts.tv_sec = 1e6
}
tp = &ts
}
var events [64]keventt
retry:
// 獲取就緒事件
n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
var toRun gList
delta := int32(0)
for i := 0; i < int(n); i++ {
ev := &events[i]
// 是否是手動喚醒事件
if isWakeup(ev) {
// 是阻塞式調用時才處理
if delay != 0 {
// netpollBreak could be picked up by a nonblocking poll.
// Only call drainWakeupEvent and reset the netpollWakeSig if blocking.
ev := keventt{
ident: kqIdent,
filter: _EVFILT_USER,
flags: _EV_DISABLE,
}
// 禁用,下次手動觸發時再開啟
kevent(kq, &ev, 1, nil, 0, nil)
// 恢復標記
netpollWakeSig.Store(0)
}
continue
}
var mode int32
switch ev.filter {
case _EVFILT_READ:
mode += 'r'
case _EVFILT_WRITE:
mode += 'w'
}
if mode != 0 {
var pd *pollDesc
// 獲取 pollDesc
pd = (*pollDesc)(unsafe.Pointer(ev.udata))
// 記錄錯誤
pd.setEventErr(ev.flags == _EV_ERROR, tag)
// 修改協程為就緒狀態,返回喚醒的協程數
delta += netpollready(&toRun, pd, mode)
}
}
// 返回就緒的協程和數量
return toRun, delta
}
IO 多路復用的工作方式有兩種,Go 使用的是邊緣觸發。
- 邊緣觸發:邊緣觸發是從無到有數據時才會通知,需要通過非阻塞方式(避免讀到沒數據時阻塞)不斷讀取數據,否則不會收到新通知。
- 水平觸發:水平觸發就是有數據時會一直通知用戶,用戶可以選擇什么時候讀,讀取時依然需要使用非阻塞的方式。比如驚群場景下,一個連接的到來導致多個進程被喚醒收到可讀事件,但是先被調度的進程會消費這個連接,導致其他進程讀取時已經沒有連接了,如果以阻塞方式調用會引起進程阻塞。
netpoll 模塊除了提供常用的 fd 事件訂閱發布外,還有一個重要的能力就是可喚醒,比如當前定時器最早超時時間是 5s,然后 IO 多路復用模塊阻塞等待 5s,過去 1s 時突然新增了一個超時時間為 1s 的定時器,這時候需要提前喚醒 IO 多路復用模塊,從而及時處理定時器。在某些系統的中可以通過特殊的事件直接喚醒,比如上面的 EVFILT_USER 類型的事件,而比較常用實現方式是通過 pipe 創建兩個 fd,分別為讀 fd 和 寫 fd,把讀 fd 注冊到 IO 多路復用模塊中,然后通過往寫 fd 寫入數據來喚醒 IO 多路復用模塊。
// 創建兩個 fd,一個注冊到 kqueue
func addWakeupEvent(kq int32) {
r, w, errno := nonblockingPipe()
ev := keventt{
filter: _EVFILT_READ,
flags: _EV_ADD,
}
*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
n := kevent(kq, &ev, 1, nil, 0, nil)
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
// 通過往 fd 寫喚醒 kqueue
func wakeNetpoll(_ int32) {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 || n == -_EAGAIN {
break
}
if n == -_EINTR {
continue
}
}
}
// 判斷是不是內部 fd,即上面創建的 fd
func isWakeup(ev *keventt) bool {
if uintptr(ev.ident) == netpollBreakRd {
if ev.filter == _EVFILT_READ {
return true
}
println("runtime: netpoll: break fd ready for", ev.filter)
throw("runtime: netpoll: break fd ready for something unexpected")
}
return false
}
// 消費喚醒時寫入的數據,一個喚醒結束
func drainWakeupEvent(_ int32) {
var buf [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&buf[0])), int32(len(buf)))
}
超時管理
netpoll 支持等待事件就緒的超時時間,其原理是設置一個定時器,然后超時時把協程改成就緒狀態等待調度執行。
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
lock(&pd.lock)
// 舊值
rd0, wd0 := pd.rd, pd.wd
// 讀寫超時
combo0 := rd0 > 0 && rd0 == wd0
if d > 0 {
// 絕對超時時間
d += nanotime()
}
// 記錄設置了讀或寫超時
if mode == 'r' || mode == 'r'+'w' {
pd.rd = d
}
if mode == 'w' || mode == 'r'+'w' {
pd.wd = d
}
pd.publishInfo()
// 設置了超時并且讀寫一樣
combo := pd.rd > 0 && pd.rd == pd.wd
// 設置定時器回調,處理讀寫超時
rtf := netpollReadDeadline
if combo {
rtf = netpollDeadline
}
// 第一次設置
if !pd.rrun {
// 大于 0,修改定時器時間為 rd,處理函數為 rtf
if pd.rd > 0 {
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
pd.rrun = true
}
} else if pd.rd != rd0 || combo != combo0 { // 新舊的超時時間不一樣
pd.rseq++ // invalidate current timers
// 修改
if pd.rd > 0 {
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
} else {
// 停止定時器
pd.rt.stop()
pd.rrun = false
}
}
// 寫也差不多,忽略
// 超時時間小于 0 才會執行下面的邏輯
delta := int32(0)
var rg, wg *g
// 新的超時時間小于 0,則修改 pollDesc 狀態為 Nil 并喚醒阻塞的協程
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false, &delta)
}
if pd.wd < 0 {
wg = netpollunblock(pd, 'w', false, &delta)
}
unlock(&pd.lock)
// 把協程改成就緒狀態,等待調度
if rg != nil {
netpollgoready(rg, 3)
}
if wg != nil {
netpollgoready(wg, 3)
}
// 等待者數量減去 delta
netpollAdjustWaiters(delta)
}
func netpollgoready(gp *g, traceskip int) {
goready(gp, traceskip+1)
}
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)
mp := acquirem()
// 改成就緒狀態
casgstatus(gp, _Gwaiting, _Grunnable)
// 加入 p 的隊列等待調度執行
runqput(mp.p.ptr(), gp, next)
wakep()
releasem(mp)
}
poll_runtime_pollSetDeadline 用于設置一個協程超時等待某個 fd 事件,支持取消超時設置。當超時時執行 netpolldeadlineimpl 把協程改成就緒狀態等待調度執行。
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
lock(&pd.lock)
delta := int32(0)
var rg *g
// 修改 pollDesc 的狀態
if read {
pd.rd = -1
pd.publishInfo()
rg = netpollunblock(pd, 'r', false, &delta)
}
// write 寫同上
unlock(&pd.lock)
// 把協程改成就緒狀態
if rg != nil {
netpollgoready(rg, 0)
}
netpollAdjustWaiters(delta)
}
netpool 在 Go 中的使用
IO 多路復用模塊只是提供了一些基礎的能力,那么這些能力是如何和 Go 結合起來的呢?下面從啟動一個 TCP 服務器為例,看看 netpoll 模塊是如何和 Go 結合起來的。
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
l, err = sl.listenTCP(ctx, la)
return l, nil
}
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
return sl.listenTCPProto(ctx, laddr, 0)
}
func (sl *sysListener) listenTCPProto(ctx context.Context, laddr *TCPAddr, proto int) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, proto, "listen", ctrlCtxFn)
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlCtxFn)
}
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)
fd, err = newFD(s, family, sotype, net)
fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn)
return fd, nil
}
func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {
return err
}
return nil
}
創建一個 TCP 服務器首先創建了一個 socket,接著綁定到監聽到地址和把 socket 改成 listen 狀態,這樣就完成了服務器的啟動,啟動成功后執行 fd.init() 把 socket 對應的 fd 注冊到 IO 多路復用模塊。
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
func (fd *FD) Init(net string, pollable bool) error {
err := fd.pd.init(fd)
return err
}
func (pd *pollDesc) init(fd *FD) error {
// 懶初始化 IO 多路復用模塊
serverInit.Do(runtime_pollServerInit)
// 注冊 fd 讀寫事件
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
// 記錄這個上下文,對應一個 pollDesc 結構體,后續用到
pd.runtimeCtx = ctx
return nil
}
接著就調 netpoll 模塊的 poll_runtime_pollOpen。
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 從 cache 里分配一個 pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
rg := pd.rg.Load()
pd.fd = fd
pd.closing = false
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 注冊 fd
errno := netpollopen(fd, pd)
return pd, 0
}
這樣就完成了服務器的啟動和 socket fd 事件的注冊,接著就調 accept 等待連接的到來。
func (l *TCPListener) Accept() (Conn, error) {
c, err := l.accept()
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
return newTCPConn(fd, ...), nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
// 拿到 fd,創建一個 netFD
netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
// 再次注冊到 IO 多路復用模塊,進行數據通信
netfd.init()
return netfd, nil
}
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// 加鎖
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
// 讀判斷是否有錯誤,有則返回
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 阻塞等待,重新 accept
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
return -1, nil, errcall, err
}
}
這里有兩個地方涉及到 netpoll,分別是 prepareRead 和 accept。
func (pd *pollDesc) prepareRead(isFile bool) error {
return pd.prepare('r', isFile)
}
func convertErr(res int, isFile bool) error {
switch res {
case pollNoError:
return nil
case pollErrClosing:
return errClosing(isFile)
case pollErrTimeout:
return ErrDeadlineExceeded
case pollErrNotPollable:
return ErrNotPollable
}
}
func (pd *pollDesc) prepare(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return nil
}
res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
// 判斷是否有錯誤
errcode := netpollcheckerr(pd, int32(mode))
// 有則返回
if errcode != pollNoError {
return errcode
}
if mode == 'r' {
pd.rg.Store(pdNil)
} else if mode == 'w' {
pd.wg.Store(pdNil)
}
return pollNoError
}
func netpollcheckerr(pd *pollDesc, mode int32) int {
info := pd.info()
if info.closing() {
return pollErrClosing
}
// 超時
if (mode == 'r' && info.expiredReadDeadline()) || (mode == 'w' && info.expiredWriteDeadline()) {
return pollErrTimeout
}
if mode == 'r' && info.eventErr() {
return pollErrNotPollable
}
return pollNoError
}
如果沒有發生錯誤則繼續調 accept,當沒有連接時,accept 會返回 EAGAIN(非阻塞調用條件不滿足時的錯誤碼),接著執行 waitRead。
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// 阻塞當前協程
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
}
return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 執行到這時,可能已經就緒了,則返回,因為 Go 是多線程的,一個線程讀的時候,另一個線程可能寫
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
// 一般情況,設置為 pdWait 狀態,表示準備進入阻塞狀態
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
}
// waitio 是 false,但一般沒有 error,執行 gopark 阻塞協程
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
}
// 被喚醒,重置為 pdNil
old := gpp.Swap(pdNil)
return old == pdReady
}
netpollblock 最終調 gopark 阻塞協程。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, ...) {
mp.waitlock = lock
mp.waitunlockf = unlockf
releasem(mp)
mcall(park_m)
}
func park_m(gp *g) {
mp := getg().m
// 修改協程為 _Gwaiting 狀態
casgstatus(gp, _Grunning, _Gwaiting)
// 執行 waitunlockf,即 netpollblockcommit
if fn := mp.waitunlockf; fn != nil {
ok := fn(gp, mp.waitlock)
mp.waitunlockf = nil
mp.waitlock = nil
// 如果返回 false,則說明就緒了,修改協程為 _Grunnable,繼續執行它
if !ok {
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// 否則重新調度其他協程
schedule()
}
正常情況下,gopark 設置協程為 Gwaiting 狀態,然后重新調度,相當于該協程就暫停執行了。但是從開始執行 gopark 到現在可能情況已經發生了變化,所以 Go 還會執行一下鉤子函數 waitunlockf,這里是 netpollblockcommit。
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// 如果當前還是 pdWait 狀態則修改 pollDesc 的 rg 或 wg 字段為當前協程結構體
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
// 修改成功則等待者加 1
if r {
netpollAdjustWaiters(1)
}
// 返回是否修改成功
return r
}
正常情況下,這個過程是 pollDesc 的 rg 或 wg 字段從 Nil 到 pdWait 再到協程結構體地址,除非執行過程中事件已經就緒。從前面的分析可以看到啟動一個服務器并調用 Accept 后為什么協程會阻塞了,從中也可以看到 Go 中以同步方式寫異步代碼的底層實現。
那么協程阻塞后,什么時候才會喚醒呢?又是怎么喚醒的?Go 會在某些時機調用 IO 多路復用模塊的 netpoll 獲取就緒的事件,從而喚醒關聯的協程。這種時機有幾個,比如在 sysmon 線程中定時獲取,或者調度時獲取,我們搜索 netpoll 函數的調用就可以看到,下面以 sysmon 線程的處理為例。
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
// 更新上次 poll 的時間
sched.lastpoll.CompareAndSwap(lastpoll, now)
// 通過 IO 多路復用模塊獲取就緒的事件(所以關聯的 g)列表
list, delta := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// 把g 改成就緒并放入隊列等待調度
injectglist(&list)
incidlelocked(1)
// 等待者減 delta(負數)
netpollAdjustWaiters(delta)
}
}
前面介紹過 netpoll 最終會調 netpollready。
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
// 喚醒的協程個數
delta := int32(0)
var rg, wg *g
// 喚醒對應的等待者,讀或寫
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}
// 喚醒成功插入隊列
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// 當前狀態
old := gpp.Load()
// 之前就是 pdReady 則不需要再處理,等待上層消費后改成 Nil
if old == pdReady {
return nil
}
// 新狀態
new := pdNil
// 這里是 true,設置超時并且超時時 ioready 是 false
if ioready {
new = pdReady
}
// 改成 pdReady 狀態
if gpp.CompareAndSwap(old, new) {
// 當前狀態如果是 pdWait 則改成 old,即不需要處理該協程,
// 前面介紹阻塞協程時講到,pdWait 是準備進入阻塞狀態,然后在 netpollblockcommit 會再進一步判斷,如果這里改成 pdReady 狀態,則協程不會阻塞。
if old == pdWait {
old = pdNil
} else if old != pdNil { // old 是等待讀寫事件的協程
*delta -= 1 // 等待者減一
}
// 返回待喚醒的協程
return (*g)(unsafe.Pointer(old))
}
}
}
netpollunblock 第一種處理場景是事件就緒時把狀態改成 pdReady,第二種處理場景是事件沒有就緒,但設置了超時時間并且已經超時,則把狀態改成 Nil,把該協程改成就緒狀態,然后返回阻塞在該事件的協程并把它加入到隊列中等待調度執行。
func injectglist(glist *gList) {
head := glist.head.ptr()
var tail *g
qsize := 0
// 改成就緒狀態
for gp := head; gp != nil; gp = gp.schedlink.ptr() {
tail = gp
qsize++
casgstatus(gp, _Gwaiting, _Grunnable)
}
// Turn the gList into a gQueue.
var q gQueue
q.head.set(head)
q.tail.set(tail)
*glist = gList{}
// 啟動新的線程處理
startIdle := func(n int) {
for i := 0; i < n; i++ {
mp := acquirem() // See comment in startm.
lock(&sched.lock)
pp, _ := pidlegetSpinning(0)
if pp == nil {
unlock(&sched.lock)
releasem(mp)
break
}
startm(pp, false, true)
unlock(&sched.lock)
releasem(mp)
}
}
pp := getg().m.p.ptr()
if pp == nil {
lock(&sched.lock)
// 放入全局隊列
globrunqputbatch(&q, int32(qsize))
unlock(&sched.lock)
startIdle(qsize)
return
}
// 放到 p 本地隊列
if !q.empty() {
runqputbatch(pp, &q, qsize)
}
}
協程和 IO 多路復用的結合思路也可以參考 Russ Cox 寫的 libtask 和這個分析文章 https://zhuanlan.zhihu.com/p/360477474。