Golang Netpoll 实现原理

golang netpoller封装了不同平台的网络模型,由于我们的游戏服务器程几乎只运行在linux上,这里介绍epoll。epoll是linux下非阻塞的高效的网络模型,对大量文件描述符读写拥有优异的性能。下面先简单介绍epoll的原理,再从源码角度来看golang是如何封装epoll的。

一、epoll实现

1.1 例子

先通过一个简单的例子来看一下epoll是如何使用的,流程如下:

  1. 创建套接字、绑定、监听
  2. epoll_create接口创建epoll对象
  3. epoll_ctl接口注册套接字的事件
  4. epoll_wait接口轮询是否有事件发生,并通过events参数返回就绪(触发)的事件列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int s = socket(AF_INET, SOCK_STREAM, 0);   
bind(s, , ,)
listen(s, ,)

int epfd = epoll_create(128);//创建eventpoll对象

ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
epoll_ctl(epfd, EPOLL_CTL_ADD, s, &ev);//注册事件

while(true){//轮询就绪事件
int n = epoll_wait(epfd, &events[0], len(events), 1000)//返回值n为就绪的事件数,events为事件列表
for( i := 0; i < n; i++ ) {
ev := &events[i]
//处理事件
}
}
1.2 结构体

这里只例出了介绍epoll原理必要的字段

1
2
3
4
5
6
struct eventpoll {
wait_queue_head_t wq; //调用epoll_wait时,被阻塞的线程队列(链表)
struct list_head rdllist; //所有就绪的fd列表(链表)
struct rb_root_cached rbr; //所有通过epoll_ctl添加,需要监控的fd(红黑树)
...
};
1.3 原理
  1. 当调用epoll_create,其实是创建了一个eventpoll结构体对象,在epoll运行期间的相关数据都存在此结构里面。
  2. 接着是通过epoll_ctl注册socket s 感兴趣的事件,结构中的rbr就是用来存放所有注册的socket。同时epoll_ctl接口还会注册回调函数ep_poll_callback
  3. 网卡收到数据后,会把数据复制到内核空间,并触发回调函数ep_poll_callbackep_poll_callback会把就绪的fd指针放入rdllist,并检查wq中是否有阻塞的线程,如果有则唤醒它们。
  4. 调用epoll_wait函数检查是否有事件触发(就绪),如果有,则通过参数2返回(这里其实就是检查rdllist是否为空,如果不为空则返回事件列表)。参数4为阻塞时间,若不为0,在rdllist为空时,调用epoll_wait的线程会被阻塞,并放到wq中,如果阻塞时间结束,仍然没有事件发生,则被唤醒;如果等待期间有事件发生内核触发ep_poll_callback回调并唤醒这个fd上阻塞的线程。
1.4 事件

EPOLLIN :fd可读
EPOLLOUT:fd可写
EPOLLPRI:fd有紧急事件数据到达
EPOLLERR:fd发生错误
EPOLLHUP:fd被挂断
EPOLLET: 设置epoll为边沿触发,默认为水平触发
EPOLLONESHOT:只监听一次事件

二、Golang Netpoll 实现

使用golang可以快速开发一个网络应用,通常不需要借助第三方网络库就能满足需求。那么它是如何做到简单高效的呢?理解了epoll的原理后,下面介绍golang netpoll是如何实现的。

1.1 例子

还是先通过一个简单的例子回顾一下golang网络模块基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
listen, err := net.Listen("tcp", ":8080")
if err != nil {
fmt.Println("listen error: ", err)
return
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}

go HandleConn(conn)
}
}
func HandleConn(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
for {
_, _ = conn.Read(buf)
_, _ = conn.Write(buf)
}
}
1.2 结构体

TCP网络listener, 当我们调用net.Listen时,会返回一个TCPListener对象

1
2
3
4
type TCPListener struct {
fd *netFD
lc ListenConfig
}

网络文件描述符,其实是对FD的封装

1
2
3
4
5
6
7
8
9
type netFD struct {
pfd poll.FD // 文件描述符
family int // 地址族标识
sotype int
isConnected bool // 是已经建立连接
net string
laddr Addr
raddr Addr
}

FD是文件描述符。netos包使用此类型表示一个网络连接或os文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// socket的系统文件描述符,直到fd被关闭都不会变化
Sysfd int

// I/O poller.
pd pollDesc

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool //TCP时为true

// true表示普通文件描述符(fd),false表示网络连接描述符(nfd)
isFile bool
}

type pollDesc struct {
runtimeCtx uintptr
}

对连接的读写都是通过此结构的方法实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // 指向读数据被gopark的g
rt timer // 读超时timer
rd int64 // 读超时时间
wseq uintptr // protects from stale write timers
wg uintptr // 指向写数据被gopark的g
wt timer // 写超时timer
wd int64 // 写超时时间
}
1.3 源码

上面的例子主要涉及了接口ListenAcceptReadWriteClose,我们通过源码来看下这几个接口是怎么实现的,以及在上层无感知的情况下如何跟epoll完美绑定到一起的。

1.1 Listen

当我们在应用层调用net.Listen时,Listen接口会依次调用:

  1. sysListener.listenTCP:创建TCPListener对象
  2. socket:返回一个使用network poller异步I/O的网络文件描述符(在socket函数中会创建netFD对象)。
  3. netFD.listenStream:设置套接字参数,绑定,监听。
  4. ollDesc.init:init函数中会调用runtime_pollServerInitruntime_pollOpen
  5. runtime_pollServerInit:调用epoll_create创建epoll
  6. runtime_pollOpen:调用epoll_ctl添加监听事件。
    就这样Listen接口成功和epollepoll_createepoll_ctl接口关联起来了,下面是详细过程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)//解析地址
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
}
return l, nil
}

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)//根据网络类型(TCP或其它)返回适当的地址族
return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

//返回一个使用 network poller异步I/O的网络文件描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
s, err := sysSocket(family, sotype, proto)//创建一个socket,底层调用socket接口
if err != nil {
return nil, err
}
//设置socket参数setsockopt
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
poll.CloseFunc(s)
return nil, err
}
//创建netFD并初始化
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}

if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET://SOCK_STREAM表示TCP,基于连接的字节洗
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM://SOCK_DGRAM表示UDP,无连接的数据报
if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
}
}
return fd, nil
}

//创建netFD对象并初始化
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
//设置setsockopt为syscall.SOL_SOCKET, syscall.SO_REUSEADDR
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
c, err := newRawConn(fd)
if err != nil {
return err
}
if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
return err
}
}
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {//绑定
return os.NewSyscallError("bind", err)
}
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {//监听
return os.NewSyscallError("listen", err)
}
if err = fd.init(); err != nil {//初始化FD,并创建epoll,注册epoll事件
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}

//初始化FD,Sysfd必须在调用此函数之Sysfd被设置(socket的fd)
//这个在同一个fd上可能会被调用多次
//net参数是网络名(例如:tcp)或者 "file"
//pollable为true表示fd由runtime netpoll管理
func (fd *FD) Init(net string, pollable bool) error {
//这里不关心网络类型,但需要标识出是文件fd还是网络fd
if net == "file" {
fd.isFile = true
}
if !pollable {//如果不交给netpoll管理,就使用阻塞模式
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd) //初始化runtime poller
if err != nil {
//如果初始化runtime poller失败,则使用阻塞模式
fd.isBlocking = 1
}
return err
}

var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
//一个进程只会调用一次runtime_pollServerInit,意味着一个进程只会有一个epoll对象
serverInit.Do(runtime_pollServerInit)
//注册epoll事件
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}

//在linux平台上,调用runtime_pollServerInit 会被映射到poll_runtime_pollServerInit
//runtime_pollOpen也同理
func poll_runtime_pollServerInit() {
netpollGenericInit()
}

func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}

func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) //创建epoll
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()//创建pollDesc对象
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
unlock(&pd.lock)

var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}

//注册事件,可读,可写,挂断,边沿触发
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
1.2 Accept

Accept的调用流程相对简单,当例子中调用listen.Accept时,会依次调用TCPListener.accept(), netFD.accept(), FD.accept()FD.accept()会重置pollDesc中的rg,并调用原始套接字的accept接口,直到有连接到来或发生错误返回,如果返回EGAIN,则当前ggorpark。若成功等到连接则创建netFD对象,再调用netFD.init()进入跟ListennetFD.init()一样的流程。这里需要注意的是同一进程epoll只会被创建一次(runtime_pollServerInit用的sync.One不管调用多少次,只有第一次会被执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
tc := newTCPConn(fd) //
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}

//创建TCPConn对象,并设置setsockopt参数
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}

//设置setsockopt的参数TCP_NODELAY, 禁用Nagle算法
func setNoDelay(fd *netFD, noDelay bool) error {
err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(noDelay))
runtime.KeepAlive(fd)
return wrapSyscallError("setsockopt", err)
}

func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}

//创建netFD对象
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}

//走Listen一样的流程,最终在epoll中注册当前套接字(fd)的事件
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()

//重置pollDesc中的rg为0
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
for {
//轮训是否有connect来到,并返回非阻塞的fd
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN://如果暂没有connect,则gopark当前g
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
1.3 Read/Write

ReadWrite的流程非常相似,这里只介绍Read。在调用例子中conn.Read(buf)时,调用流程为conn.Read->netFD.Read->FD.ReadFD.Read首先重置pollDesc中的rg为0,检查是否有可读数据,有则读取返回;若返回EAGAINgopark当前g

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
return 0, nil
}
//把pollDesc.rg置为0
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
//最多只能读maxRW
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
//尝试从fd.Sysfd中读取数据,或读取成功,则直接返回。或返回errcode为EAGAIN,则gopark当前g
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}

// On MacOS we can see EINTR here if the user
// pressed ^Z. See issue #22838.
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
1.4 轮询epoll

在1.1~1.3的源码中,我们已经看到了golangepollepoll_createepoll_ctrl的封装,但没看到epoll_wait。而且在1.2,1.3的源码中,accept, Read如果返回EAGAIN时,当前g会被gopark。下面我们就来看下epoll_wait是何时被调用的,以及由于EAGAINgoparkg是何时被唤醒的。在sysmonfindrunnable中会调用netpoll函数,返回所有就绪fd上的g,并加入到全局队列中。acceptReadWritegoparkg的指针都保存在pollDesc中,所以fd一旦就绪,我们可通过wgrg找到g并交runtimeruntime会把它从_Gwaiting状态置为_Grunnable,此时就绪的g就可以接着gopark时的状态继续执行。需要注意的是ep_poll_callback只会唤醒内核线程,被goparkg则在runtime中唤醒的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func netpoll(block bool) gList {
if epfd == -1 {
return gList{}
}
waitms := int32(-1)
if !block {
waitms = 0
}
var events [128]epollevent
retry:
//下面的代码跟我们前面介绍epoll使用时的代码非常相似
n := epollwait(epfd, &events[0], int32(len(events)), waitms)//轮询是否有事件发生,并通过events参数返回就绪(触发)的事件列表
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {//取出并处理事件
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32//取出事件类型,读、写
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode)//取出gopark的g并重置rg,wg
}
}
if block && toRun.empty() {
goto retry
}
return toRun
}

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg) //取出gopark的g加入toRun列表,交给runtime处理
}
if wg != nil {
toRun.push(wg)
}
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
if atomic.Casuintptr(gpp, old, new) {//重置rg或wg
if old == pdReady || old == pdWait {
old = 0
}
return (*g)(unsafe.Pointer(old))//取出被gopark的g
}
}
}

参考
https://taohuawu.club/go-netpoll-io-multiplexing-reactor
http://gityuan.com/2019/01/06/linux-epoll/
https://zhuanlan.zhihu.com/p/64746509