Channel 工作原理及源码分析

golang提倡使用通信的方式来共享内存,而不是通过共享内存来通信。下面介绍channel是如何实这一思想的,以及使用channel时需要注意的地方。

一、基本原理

1.写入数据时

若channel recvq(读阻塞队列)上有被gopark的g,则直接把数据拷贝到队首g的栈空间,并唤醒g;若recvq中没有gopark的g,但缓冲区未满,则把数据放入缓冲区。

  • 阻塞方式:若缓冲区满,则直接gopark当前g,等待读g的唤醒。
  • 非阻塞方式:若缓冲区满,直接返回,不对数据做任何处理。
2.读取数据时

跟写数据有点不一样,当channel sendq(写阻塞队列)上有被gopark的g时分两种情况:若缓冲区大小为0则直接从g上读取数据;若缓冲区大小不为0,此时缓冲区必然已满(若不未满g也不会被gopark),则从缓冲区头部读取数据,把send g 的数据拷贝到缓冲区尾部,并goready g。

  • 阻塞方式:若缓冲区中没有数据,则当前g被gopark。
  • 非阻塞方式:若缓冲区中没有数据,则直接返回。

二、定义

channel可以简单的看成一个带锁的环形队列,结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//go version 1.17.1, 源文件:src\runtime\chan.go
type hchan struct {
qcount uint // 当前元素个数,len(channel)时返回dataqsiz
dataqsiz uint // 容量,cap(channel)时返回qcount
buf unsafe.Pointer // 指向环形队列数据元数
elemsize uint16 // 元素大小
closed uint32 // channel是否关闭
elemtype *_type // 元素类型
sendx uint // 发送元素索引
recvx uint // 接收元素索引,当sendx=recvx时,缓冲区为空
recvq waitq // 从channel中读取数据时,阻塞在channel上的g列表
sendq waitq // 往channel中写入数据时,阻塞在channel上的g列表
// lock保存hchan的所有字段和sudog阻塞在当前channel上的部分字段
// 持有此锁时不要改变另一个g的状态,尤其不要唤醒g(ready a G),不然在栈收缩时可能死锁
lock mutex
}

三、往Channel中写数据

1.阻塞与非阻塞

往channel写入数据分为阻塞模式和非阻塞模式:阻塞模式时若channel为nil或channel缓冲区满,当前g会被gopark。非阻塞模式遇到这两种情况则直接返回fase。

1.1阻塞

当以如下方式向channel中写入数据时,会被编译器翻译成阻塞模式,block为true

1
2
3
4
select {
case c <- v:
...
}
1.2非阻塞

若在case后加上default则会翻译成非阻塞模式,block为false

1
2
3
4
5
6
select {
case c <- v:
...
default:
...
}
2.源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//若channel为nil且是非阻塞模式则返回fase,否则gopark当前g并抛异常
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//非阻塞模式channel缓冲区满(且channel未关闭)直接返回
if !block && c.closed == 0 && full(c) {
return false
}
lock(&c.lock)
//往关闭的channel中写数据,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//若channel的recvq队列上有阻塞的g,则绕过channel buffer 直接把数据给队首的g
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//若buffer未满直把数据拷贝到缓冲区
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

//若channel buffer已经满且为非阻塞方式,则直接返回
if !block {
unlock(&c.lock)
return false
}

// 下面是g阻塞在channel上的实现(sudog 表示阻塞在sendq队列上g的封装)
gp := getg() //获取当前g
mysg := acquireSudog()
mysg.releasetime = 0

mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp //保存当前g的地址,以便于唤醒
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //把sudog放入sendq列队
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //gopark当前g

//保活,避免被gc
KeepAlive(ep)

// 被唤醒后清理sudog,并把sudog重新放入到当前p缓存中
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

//把ep指向的数据直接拷贝到sg的栈上,并唤醒g
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

四、从Channel中读数据

与channel中写数据类似,从channel中读也分为阻塞和非阻塞方式。若以阻塞方式读时,当channel为nil、channel缓冲区为空时当前g被gopark;若以非阻塞方式读时,遇到这两种情况直接返回。

1.阻塞与非阻塞
1.1阻塞

当以如下方式从channel中读数据时,会被编译器翻译成阻塞模式,block为true

1
2
3
4
select {
case v, ok <- c:
...
}
1.2非阻塞

若在case后加上default则会翻译成非阻塞模式,block为false

1
2
3
4
5
6
select {
case v, ok <- c:
...
default:
...
}
2.源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// chanrecv在c上接收数据并写入ep, 如果case中的数据接收者为空,则ep为nil(例如 case _, ok <- c:).
// 如果block为false且channel中没有数据, 返回 (false, false).
// 否则, 如果c已经关闭, 则把*ep清0, 返回 (true, false).
// 如果缓冲区中有数据, 则把数据写入 *ep 返回 (true, true).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
//若channel为nil且是非阻塞模式则返回fase,否则gopark当前g并抛异常
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
//非阻塞状态且channel缓冲区中无数据
if !block && empty(c) {
//若channel未关闭则直接返回(false, false)
if atomic.Load(&c.closed) == 0 {
return
}
//若channel已关闭且缓冲区中无数据,则把*ep清0
//若channel已经关闭且缓冲区中有数据,则下面继续读取
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
lock(&c.lock)
//取得锁后再次检查,若channel已关闭且缓冲区中无数据,则把*ep清0 返回(true,false)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

//当sendq有阻塞的g时:
//1.如果缓冲区大小为0,则从发送g上读取数据
//2.如果缓冲区大小不为0,则从队首读取数据,并把g的数据拷贝到缓冲区,再goready g
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//缓冲区有数据,则直接从缓冲区中读取数据到ep
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//做一些清理、修改环形指针指向
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

//若是非阻塞模式、缓冲区中无数据则直接返回,否则阻塞等待
if !block {
unlock(&c.lock)
return false, false
}

// sendq中无阻塞的g且缓冲区中无数据,则阻塞当前g到channel的recvq上
//这里首先取得当前g,再new一个sudog, 跟发送阻塞类似
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0

mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
//设置parkingOnChan标记,告诉试图收缩栈当前g栈的地方,当前g马上要gopark了,收缩是不安全的。
atomic.Store8(&gp.parkingOnChan, 1)
//gopark当前g
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// 等待有g往channel中发送数据 当前g被唤醒了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//若缓冲区大小为0,则直接从阻塞的g(sg)中读取数据
//否则从队首读取数据,并把g的数据拷贝到缓冲区,再goready g
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) //唤醒阻塞在队列上的g(gp)
}

五、注意事项

1.向channel中写数据时len(c)检查缓冲区是否满有隐患
1
2
3
4
5
6
7
8
9
10
//code1
func send(){
//当channel的len==cap时,表明channel已满丢弃数据
if len(c.msgChan) >= cap(c.msgChan) {
logger.LogErr("SendMsg the msgChan is full")
return
}
//若缓冲区未满则向channel中阻塞的写入数据
c.msgChan <- &XXMsg{}
}
1
2
3
4
5
6
7
8
9
10
11
//code2
func loop(){
for {//从channel中阻塞的读数据
select {
case msg, ok := <-c.msgChan:
if ok {
c.handleMsg(msg)
}
}
}
}
  • 情况1:上面代码中,loop单独一个goroutine运行,send只在主线程中调用。
  • 情况2:不知什么时候开始,除主线程外,其它goroutine也在调用send。
    若情况1遇到handleMsg被阻塞时,return函数会直接返回,不会对主线程造成影响。 如情况2 send函数在多线程调用情况下是不能保证缓冲区满时第6行一定return的。若此时handleMsg被阻塞,那么有概率造成主线程的阻塞。所以在往channel中写数据时,建议默认使用非阻塞方式即:
1
2
3
4
5
6
select {
case c.msgChan <- &XXMsg{}:
...
default:
...
}