成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Go BIO/NIO探討:Go netpoll 是如何工作的

開發 前端
一般我們聊到 netpoll 時,是指 Go runtime 中借助于epoll對套接字進行批量監聽、數據到來時喚醒特定goroutine的機制。

一般我們聊到 netpoll 時,是指 Go runtime 中借助于epoll對套接字進行批量監聽、數據到來時喚醒特定goroutine的機制。對應的代碼存放在runtime/netpoll.go 和 runtime/netpoll_epoll.go (只考慮linux) 中。為此 runtime 提供了兩大類函數:

第一類:調用方是 Go Runtime。

  1. netpoll: 檢查有事件發生的套接字,并返回處于pdReady狀態的goroutine列表,基于epoll_wait。
  2. netpollBreak: 向 netpollBreakWr 寫入一個字節數據,通過管道傳到 netpollBreakRd,epoll_wait 監聽到read pipe上的event,立即返回。

第二類:調用方是internal/poll、net、net/http等。

  1. poll_runtime_pollServerInit(netpollGenericInit): 初始化poller,基于epoll_create1。
  2. poll_runtime_pollOpen: 將套接字添加到監聽列表,基于 epoll_ctl。
  3. poll_runtime_pollWait: 等待套接字上的事件,可以休眠(gopark)當前goroutine, 借助于netpollblock函數。
  4. poll_runtime_pollUnblock: 使用Unblock模式進行poll。
  5. poll_runtime_pollClose: 將套接字從監聽列表刪除,基于 epoll_ctl。
  6. poll_runtime_pollReset: nonblock模式下 prepareRead/prepareWrite 使用。

這些函數都會被link到 internal/poll.runtime_xxx, xxx 可以是。
runtime_pollServerInit/runtime_pollOpen等。

后面我們挑一些主要的函數來說一下。

netpollGenericInit 初始化 poller

netpollGenericInit 保證 poller 被初始化,原子變量netpollInited保證其僅被初始化一次。

func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}

這個函數只是一個殼,初始化邏輯封裝在netpollinit函數中,依賴于平臺具體的實現。linux下,init的邏輯是:

  1. 通過epoll_create1系統調用創建 epoll fd。
  2. 創建一對 read/write pipe。pipe的一個特性是向 write pipe寫入數據,read pipe 就能收到同樣的數據。
  3. 通過epoll_ctl將 write pipe 對應的fd 加入到監聽列表。

單獨創建一對pipe后,runtime就能夠按需中斷epoll_wait,讓netpoll函數立即返回。

func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}

netpoll函數

netpoll函數的功能是檢查可用的網絡連接,它的工作流程是(happy path):

  1. 創建size=128的epollevent數組, 以接收事件。
  2. 調用epollwait等待事件: 依賴epoll_wait系統調用。
  3. 遍歷epoll events,對于每個event創建一個pollDesc對象調用netpollready,找到對應的goroutine,并將其狀態從pdWait修改為pdReady。
  4. 返回pdReady狀態的 goroutine列表 (gList)。

struct pollDesc中包含兩個信號量字段,可以表示四種狀態:

  1. pdReady: io ready信號等待被接收,goroutine可以消費這個信號,邏輯上是把信號量改成nil。
  2. pdWait: goroutine已經準備好在該信號量上阻塞,但還沒有阻塞;如果goroutine通過gopark阻塞,狀態會變成G pointer如果并發的io ready信號到達,狀態會改成pdReady如果并發的timeout/close信號到達,狀態會被改成nil。
  3. G pointer: goroutine被阻塞在信號量上,可以被下面兩類事件喚醒:io ready信號到來時,狀態被修改好pdReadytimeout/close信號到來時,狀態被修改為nil。
  4. nil: 不是上面三種狀態。

對應一些輔助函數:

  • netpollblock 函數將goroutine狀態從 pdReady 轉化成 pdWait,并gopark當前goroutine。
  • netpollunblock 函數將goroutine狀態從 pdWait 轉換為 pdReady 或 nil。

netpoll函數的代碼在runtime/netpoll_epoll.go中,部分代碼如下:

func netpoll(delay int64) gList {
// epoll fd 為-1,說明不需要poll
if epfd == -1 {
return gList{}
}
var waitms int32
// ...省略一段代碼
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}

if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
// ... read pipe 有數據
// 不需要喚醒任何goroutine
}

var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
// 將goroutine置為 pdReady
// 并添加到toRun *gList
netpollready(&toRun, pd, mode)
}
}
return toRun
}

備注: netpollready 函數借助于netpollunblock修改goroutine狀態,并將其加到 io ready 的 goroutine list。

runtime在調用 netpoll 時,通常采用的是 nonblock 模式(delay=0), 只有在 findrunnable 的最后一個環節,會檢查是否有單獨的M(GMP中的M)進行net polling,如果沒有,會block等待delay參數指定的時間。

netpollBreak 函數

netpollBreak函數的功能比較簡單,但實現比較有意思。它和netpoll函數通過變量netpollWakeSig進行交互,由于在不同的goroutine中,所以對于該變量的操作都是原子?操作。

// netpollBreak interrupts an epollwait.
func netpollBreak() {
if atomic.Cas(&netpollWakeSig, 0, 1) {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
if n == 1 {
break
}
if n == -_EINTR {
continue
}
if n == -_EAGAIN {
return
}
println("runtime: netpollBreak write failed with", -n)
throw("runtime: netpollBreak write failed")
}
}
}

poll_runtime_pollOpen 函數

poll_runtime_pollOpen 的邏輯分為三塊:

  1. 給 pollDesc 分配內存。
  2. 初始化 pollDesc 對象。
  3. 借助于 netpollopen 注冊epoll監聽(netpollopen在linux下是 epoll_ctl)。
  4. 返回 pollDesc 對象。

poll_runtime_pollOpen函數的實現位于 runtime/netpoll.go 中, 主要邏輯如下:

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != 0 && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != 0 && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
// ... 省略部分初始化邏輯
unlock(&pd.lock)

errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}

// 位于net/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

poll_runtime_pollWait 函數

poll_runtime_pollWait 函數只是對 netpollblock 函數的封裝,增加了容錯。值得注意的是,該函數不是runtime觸發的,而是用戶程序觸發的。

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}

下面我們看下用戶程序如何觸發 poll_runtime_xxx 系列的函數。首先,套接字分為兩類:LISTEN套接字(Server套接字) 和 ESTABLISHED套接字(TCPConn);

  • LISTEN 套接字通過系統調用 socket/bind/listen 去生成。
  • ESTABLISHED 套接字通過系統調用 accept 去生成。

LISTEN套接字(Server套接字)

從http server的角度來看,LISTEN套接字注冊epoll監聽的鏈路如下:

// net/http/server.go
func ListenAndServe(addr string, handler Handler) error

// net/http/server.go
func (srv *Server) ListenAndServe() error

// net/dial.go
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}

// net/dial.go
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error)

// net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error)

// net/ipsock_posix.go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error)

// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error)

// net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error)
if err = fd.init(); err != nil {
return err
}

// net/fd_unix.go
func (fd *netFD) init() error {
// fd.pfd 類型是 poll.FD
return fd.pfd.Init(fd.net, true)
}

// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error {
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}

// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}

ESTABLISHED套接字(TCPConn)

http server accept 新的tcp conn。

// net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
for {
rw, err := l.Accept()

// net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error)

func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()

// net/fd_posix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
// 省略部分代碼
if err = netfd.init(); err != nil
// 省略部分代碼


// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error

// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error

關于 net.netFD struct

netFD是對套接字(網絡文件描述符)的封裝。對于Server套接字而言,可以通過accept方法從Server套接字(LISTEN套接字)獲取新的TCP連接(或ESTABLISHED套接字)。Linux的accept系統調用返回的ESTABLISHED套接字是一個int值,通過 newFD 和 init 函數將其封裝為一個完整的 netFD結構,后面會被封裝為一個net.TCPConn。

對于操作系統而言,LISTEN套接字和ESTABLISHED套接字都只是一個int類型的文件描述符,沒有本質區別。系統調用accept和read都是從套接字讀取數據,所以epoll里會放到一個batch里去監聽。

這是 netFD 的定義和accept方法的實現:

// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}

if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

net.netFD 依賴 poll.FD 實現poll功能。區別正如名字所展示,net.netFD是封裝了網絡相關的功能,而 poll.FD是更為通用的FD,封裝了文件描述符上能進行的操作。其定義如下:

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}

poll.FD 依賴 poll.pollDesc 實現poll功能。poll.pollDesc 實現了 IO polling 的功能。poll.pollDesc 有一系列的方法,比如 init、wait、close、prepare 等都是對 runtime_pollXXX 函數系列的封裝,下面詩pollDesc的部分邏輯:

type pollDesc struct {
runtimeCtx uintptr
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}


責任編輯:姜華 來源: 今日頭條
相關推薦

2023-03-06 08:37:58

JavaNIO

2023-03-31 07:49:51

syscall庫Echo Serve

2023-03-09 08:22:57

Go net庫Socket

2025-06-20 09:57:42

2011-03-31 10:41:49

BIONIOIO

2024-08-20 16:27:54

2023-04-03 08:02:16

切片擴容GO

2014-11-10 10:52:33

Go語言

2020-04-16 15:20:43

PHP前端BIO

2022-04-16 16:52:24

Netty網絡服務器客戶端程序

2023-11-29 08:19:45

Go泛型缺陷

2023-04-03 06:53:04

Go開發架構

2023-09-26 01:21:34

2021-11-02 11:31:47

Go代碼模式

2018-09-19 14:53:02

NIOBIO運行

2023-08-07 08:01:15

2020-08-05 12:27:18

Go語言碼農

2023-08-10 08:00:42

令牌限流器計數器

2025-04-02 05:23:00

GoChannel數據

2022-05-19 08:56:13

Go提案賦值
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品欧美乱码久久久久久 | 精品影院 | 欧美又大粗又爽又黄大片视频 | 欧美黄色片| 午夜在线小视频 | 欧美精品一区二区三区蜜桃视频 | 午夜精品一区 | 国外成人在线视频网站 | 美女久久视频 | 午夜网址 | 久草99| 一区二区在线看 | 日韩欧美在线播放 | 欧美中文在线 | 国产99久久精品一区二区永久免费 | 中文字幕成人 | 香蕉视频久久久 | 在线日韩视频 | www.99re| 精品综合视频 | 视频在线一区二区 | 日韩一区二区在线观看视频 | 亚洲第一区国产精品 | 337p日本欧洲亚洲大胆鲁鲁 | 羞羞在线视频 | 日韩精品一区二区三区中文字幕 | 欧美一级特黄aaa大片在线观看 | 国产精品久久久久国产a级 欧美日本韩国一区二区 | 婷婷久久综合 | 亚洲精品免费视频 | 国产在线97 | 日本在线免费 | 久久久久香蕉视频 | 操久久| 一级高清视频 | 一区二区三区免费 | 国产一级免费视频 | 亚洲成人999| 美国一级片在线观看 | 久久一本| 色综合欧美 |