Golang Runtime

一、基础知识

在阅读源码前,我们先来熟悉一下相关的基础知识,这是理解runtime的基础。

  • 用户级线程:由用户运行时(runtime)来管理线程,操作系统不能感知其存在,所以也不会对其调度,一般会在用户空间提供一个线程库来操作它们。在进程中多个用户级线程对应一个内核级线程,当内核线程阻塞,进程中的所有线程都会被阻塞。Lua的协程(coroutine)、golang的goroutine都属于用户级线程。
  • 轻量级进程(LWP):Linux中没有线程概念,跟线程比较相似的是轻量级进程(LWP),一个进程中可以有多个LWT。LWP需要和内核线程绑定才能被执行,与内核线程是一对一的绑定关系。为了方便理解,我们直接把LWP看成是内核线程。
  • 混合模型:进程中可以同时有多个内核线程和多个用户线程,用户线程只有绑定内核线程才能被执行,多个用户线程可以进入同一个内核线程的执行队列,由runtime负责用户线程的调度以及用户线程与内核线程的绑定与解绑。golang就是采用此模型。
  • OS调度:线程是内核调度的基本单位,在windows和linux上,可以通过CreateThread和clone创建线程(Linux clone创建的是LWP),创建线程需要传递入口地址(通常是函数),线程从这个入口地址开始执行。因为OS中的线程数量大部分时候是>CPU核心数,OS为了确保每个线程都能得到公平的执行提出了时间片的概念,即每个线程只能持续执行一段时间,时间片到了后,保存上下文,切换到其它线程。上下文包含了2个重要的信息分别是PC和堆栈,记录它们可以确保线程被中断后,下次无论在哪个核上都能接着上次中断的现场继续执行。
  • CPU:CPU是真正的执行单位,线程是被CPU执行。在内核的调度下,符合条件的线程被CPU执行,正在被执行的其实是线程的局部指令,在这些局部指令可能是线程的某个函数片段或者协程。

二、GPM数据结构

golang runtime是基于GPM模型来实现高并发的,它能充分发挥多核CPU的优势, 让每个核的负载更均衡。GMP在源码里对应了各自的结构体,这里只列出几个理解runtime必要的字段。

1.1 G(goroutine)

runtime调度的基本单位。创建goroutine时会在函数前加go关键字,这个函数地址表示当前goroutine的入口地址。goroutine在执行过程中可能因为各种原因被暂停,这时需要保存PC和堆栈信息,以便恢复后时继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//源码文件 runtime/runtime2.go, runtime/proc.go, runtime/signal_unix.go, runtime/preempt.go
type g struct {
goid int64 // goroutine id
atomicstatus uint32 // 当前状态
stack stack // g栈区间
sched gobuf // 运行时信息,包含PC以及运行时的堆栈信息
stackguard0 uintptr // tackguard0 = stack.lo + StackGuard,如果要抢占当前g会把字段值设为stackPreempt
}
type stack struct { //栈从高地址往低地址增长
lo uintptr
hi uintptr
}
type gobuf struct {
sp uintptr
pc uintptr
bp uintptr
}
  • 状态:每个goroutine都包含如下状态,当goroutine被创建后就在下面这些状态间切换。
1
2
3
4
5
6
7
_Gidle  // 刚被创建,还没初始化
_Grunnable // 在运行队列中,还没被执行,也没分配栈
_Grunning // 运行中
_Gsyscall // 系统调用
_Gwaiting // 阻塞状态,比如:等待channel、i/o操作被gopark
_Gdead // goroutine执行结束,进入freelist中
_Gpreempted // g被通过信号方式抢占,此状态不能直接goready
1.2 P(Processor)

逻辑处理器,从功能角度来看它更像是个资源管理器,主要包含了goroutine队列以及当前P的内存分配信息,创建goroutine时,首先偿试放入当前P的Local队列,如果队列已满,则把本地队列中的一半g转移到全局队列中。P必须和M绑定才能工作,一个P任意时刻只能绑定一个M,所以在P上分配内存,取队列中的goroutine都不需要加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type p struct {
id int32 //P的id
status uint32 //当前状态
mcache *mcache //内存分配器,之前的文章已经介绍过
runqhead uint32 //等执行的goroutine列表,访问时不需要加锁
runqtail uint32
runq [256]guintptr
m muintptr //P绑定的m
gFree struct { //当G运行结束后,清除数据放入列表以便复用
gList
n int32
}
runnext guintptr //下个运行的g,如果不为nil, 则当前g执行完后,优先执行它
}

P的状态

1
2
3
4
5
_Pidle //当M没有G可执行时,P进入空闲列表
_Prunning //P和M绑定,正在执行G或在寻找可执行的G
_Psyscall //与之关联中的M进入系统调用
_Pgcstop //GC STW
_Pdead //调小GOMAXPROCS数量后多余的p置为此状态
1.3 M(Machine)

系统线程抽象。M会从P的队列中取G来执行,当G被暂停M会把上下文信息写回G,并取下一个G继续执行。

1
2
3
4
5
6
7
8
9
10
type m struct {
id int64
g0 *g // 用于执行调度任务的g,使用系统栈,不受gc影响
tls [6]uintptr // 线程本地存储空间
curg *g // 当前正在被执行的goroutine
nextp puintptr // M被唤醒需要立即绑定的P
p puintptr // 与M绑定的P
spinning bool // true表示M处于自旋转状态(当前没有g执行,正在寻找可执行的g)
mcache *mcache //当M与P绑定后,跟P的mcache指向同一个内存分配器
}

三、调度流程

1.1 相关概念
  • M0:进程的主线程,启动进程时默认启动M0。
  • G0:每个M创建的第一个g叫g0,创建M时为g0分配固定栈空间不走普通g0的栈扩张流程,g0不受GC影响,主要工作包括:goroutine的创建、调度等。(newproc、schedue、netpoll运行在g0上)
  • 自旋转:进入调度流程后findrunable函数会一直寻找可执行的g(本地队列,全局队列,netpoll),如果处于自旋转状态,还会偿试去其它p的队列中偷g,直到找到为止。当自旋转中的m数量 < runing中p数量的1/2时,才会有m进入自旋转,这样保证了新到的g能够快速被执行,也确保了runtime中不会同时存在大量不必要自旋转的M消耗cpu
1.2 主要流程
  • 当启动m0时,会创建GOMAXPROCS个p, 状态置为_Pidle
  • go关键字创建协程时,首先偿试把g放入P本地队列,如果本地队列已满,则把本地队列中的g迁移一半到全局队列。并检查如果存在idle的P且没有M处于自旋转,则获取一个idle的P跟idle的M绑定,如果没有idle的M,则新建一个M,并让M进入自旋转。g运行结束后g0会调用schedue继续寻找可执行的g。
  • schedue调用findrunable函数寻找可执行的g,首先从p的本地队列(runq)中寻找可执行(状态为_Grunnable)g;如果没找到则从全局队列中寻找,若全局队列中有则拿一批(sched.runqsize/gomaxprocs + 1个,最多拿一半)到本地队列;如果全局队列也没有,则查看netpoll中是否有阻塞的g,如果有则检查epoll是否有fd就绪,有则寻找成功,把这些g的状态从_Grunnable改为_Grunning,并把多余的g放入全局队列;如果netpoll中也没有,则从其它p中偷。
  • g的调度没有时间片概念,sysmongc抢占g做法是把stackguard0stackPreempt,当g调用函数触发newstack时根据stackPreempt标记抢占。在golang1.14以前如果g中没有函数调用,比如是一个纯数值计算的for循环,那g永远也不会被抢占,当然gc也无法成功执行。

四、Runtime源码

1.1 main函数启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//1. osinit 初始化cpu数量和页大小
func osinit() {
ncpu = getproccount()
physHugePageSize = getHugePageSize()
}

// 2. call schedinit
func schedinit() {
sched.maxmcount = 10000 //m的最大数量
mallocinit() //内存分配相关
//根据环境变量,创建GOMAXPROCS个G
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
// 3. 启动 main goroutine
func main() {
g := getg()
// 标记main已经启动
mainStarted = true
//在系统栈上(也就是通过g0)创建m,并执行sysmon,所以symon是在单独的m上运行,不受gc影响
if GOARCH != "wasm" {
systemstack(func() {
newm(sysmon, nil)
})
}
//如果main goroutine不在m0上运行,肯定bug了
if g.m != &m0 {
throw("runtime.main not on m0")
}
//执行runtime的init和用户包中init函数
doInit(&runtime_inittask)
doInit(&main_inittask)
//调用用户自定义的main函数
//从这里可以看出在golang中,init函数先于main函数执行
fn := main_main
fn()
//退出主进程
exit(0)
for {
var x *int32
*x = 0
}
}
//4. call runtime·mstart
//M0在这里调用的https://github.com/golang/go/blob/master/src/runtime/asm_amd64.s#L225
// mstart 是一个新M的入口函数.
func mstart() {
_g_ := getg()
osStack := _g_.stack.lo == 0
if osStack {
// 初始化g0栈大小
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
osStack = true
}
// m退出
mexit(osStack)
}
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {//调用这个函数只,m上只可能有g0
throw("bad runtime·mstart")
}
//初始化m
asminit()
minit()
//执行mspinning、sysmon等
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
//_g_所在的m不是m0,则关联p和m
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()//开始调度
}
1.2 创建协程

在函数前加go关键字创建一个协程,其实是调用newproc函数,fn就是go关键字后面函数地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {//通过g0调用newproc1
newproc1(fn, (*uint8)(argp), siz, gp, pc)
})
}
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
acquirem()
siz := narg
siz = (siz + 7) &^ 7

_p_ := _g_.m.p.ptr()
newg := gfget(_p_) //从p的gFree中获取一个g(g运行结束后,会清理数据,放入p的gFree队列中以便复用)
if newg == nil {
newg = malg(_StackMin) //new一个栈大小为2k的g,并把状态设为_Gdead
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) //把g加入到allgs列表,因为状态为Gdead,所以GC不会扫描
}
//计算参数栈的起始位置
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize
totalSize += -totalSize & (sys.SpAlign - 1)
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
if narg > 0 { //把参数拷贝到g的栈上
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
}
//清理运行时信息(g.sched)
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
//g成员初始值,
newg.sched.sp = sp
newg.stktopsp = sp
//设置g的退出函数为goexit,g结束后m会调用goexit清理g,并重新调用schedule
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
//g状态改为 _Grunnable
casgstatus(newg, _Gdead, _Grunnable)
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
//runqput会优先放入p本地队列,如果本地队列已满,则把本地队列挪一半到全局队列
runqput(_p_, newg, true)
//如果有p处于idle状态、没有m处于自旋转、主线程已启动则偿试唤醒一个p
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
releasem(_g_.m)
}
1.3 调度流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
//需要注意的是:schedule函数及子函数中调用的getg()返回的都是g0,因为schedule是运行在g0上的
// 取得并执行goroutine
func schedule() {
_g_ := getg() //获取当前g
var gp *g
if gp == nil {
// 为了保证公平调度,schedule每执行61次就会去全局队列拿一批g到p的本地队列
// 否则可能出现2个g永久占用本地队列(因为被暂停的goroutine唤醒后优先放入本地队列)
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
//从本地队列中取g
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
//如果本地队列中没有可执行的g,则调用findrunnable直到有可运行的g为止
if gp == nil {
gp, inheritTime = findrunnable()
}
// 到这里,说明已经找到可运行的g,如果m还处于自旋转状态,则置回正常状态
// 并唤醒p与之绑定
if _g_.m.spinning {
resetspinning()
}
//直接在当前m上执行g
execute(gp, inheritTime)
}

//阻塞获取可执行的G,findrunnable会从全局队列、其它P队列、netpoll中去轮询
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// 本地队列中有g,则直接返回
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局队列中如果有g,则从全局队列中取一批到本地队列
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从netpoll中获取g(非阻塞轮询已经完成的网络io)
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() {//从netpoll中取出i/o读写完成的g列表
gp := list.pop()//先取出一个让当前m执行,这样能提高响应速度
injectglist(&list)//再把剩余的g列表放入队列
casgstatus(gp, _Gwaiting, _Grunnable)
return gp, false
}
}
// 检查是否可以从其它p中偷一部分g
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
//如果其它p全部都是idle的,那肯定没地方偷
goto stop
}
//如果当前m没有处于自旋转且自旋转中的p数量 < running中的p数量/2,则让当前m进入自旋转
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}

//随机从一个p中偷,最多偿试4次
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
allpSnapshot := allp
//再次检查全局队列,如果有g,则取出执行,否则把p与当前m解绑
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ { //p与当前m解绑
throw("findrunnable: wrong p")
}
pidleput(_p_) //解绑的p放入idle队列
unlock(&sched.lock)

wasSpinning := _g_.m.spinning
if _g_.m.spinning {//如果当前m还处于自旋转状态,则取消,sched.nmspinning -1
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}

// 再次检查所有p的队列
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget() //如果某个p的队列不为空,则从idle列表中取出一个p
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {//如果解绑以前m是自旋转的,则还是让它保持自旋转
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// 再次从netpoll中阻塞的取g
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
list := netpoll(true)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if !list.empty() {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(&list)
}
}
//如果始终找到不,就让m停止
stopm()
goto top
}
//把g列表放入全局队列,调用startm检测是否有idle的p,将idle的m与之绑定或new一个m
func injectglist(glist *gList) {
if glist.empty() {
return
}
if trace.enabled {
for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
for n = 0; !glist.empty(); n++ {
gp := glist.pop()
//队列中的g状态必须为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
startm(nil, false)
}
*glist = gList{}
}
//重置自旋转状态
func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
throw("resetspinning: not a spinning m")
}
_g_.m.spinning = false
nmspinning := atomic.Xadd(&sched.nmspinning, -1)
if int32(nmspinning) < 0 {
throw("findrunnable: negative nmspinning")
}
//除当前m外没有其它m处于自旋转状态且还有p处于idle,则唤醒一个p,这样能让等执行的g尽量早的被处理。
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
}
func wakep() {
// 如果已有m处于自旋转状态,则直接返回(g一定会被处于自旋转状态的m执行[结合findrunable函数看])
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
//startm检测是否有idle的p,并将idle的m与p绑定或new一个m
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget()//获取一个idle的p
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// spinning为true说明startm的调用方对nmspinning加了1,但是没发现idle的p,所以要回滚nmspinning
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
//成功获取到p, 再获取idle的m
mp := mget()
unlock(&sched.lock)
if mp == nil {//获取idle的m失败,则创建一个m
var fn func()
if spinning {
// 如果spinning为true, 则标记新创建的m为spinning
fn = mspinning
}
newm(fn, _p_)//创建新m
return
}
mp.spinning = spinning
mp.nextp.set(_p_) // 把p设置为即将与m绑定的p
notewakeup(&mp.park)
}
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn) //new一个m结构,并初始化
mp.nextp.set(_p_)//把p设置为即将与m绑定的p
mp.sigmask = initSigmask
newm1(mp)
}
func newm1(mp *m) {
execLock.rlock()
newosproc(mp)
execLock.runlock()
}
//newosproc创建OS线程,不同的OS接口不一样,linux用的clone, windows为_CreateThread
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
// clone期间禁用信号,clone完成再启用
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
//调用clone创建os线程, mstart为线程起始函数
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
}
1.4 sysmon

sysmon(system monitor简称)监控runtime,它的任务包括触发强制gc、netpool、暂停运行时间过长的goroutine。如果M陷入长时间的系统调用、获取锁,则会把当前P与M解绑,若P队列有G则会唤醒一个idle的M或创建一个新的M与之绑定;若P队列没有G,则把M放入idle队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func sysmon() {
lasttrace := int64(0)
idle := 0
delay := uint32(0)
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)//sysmon每个周期会睡眠20us~10ms
// 每10nm检查一次网络io中是否有准备好的g, 并把它们放入全局队列中
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(false) // non-blocking - returns list of goroutines
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
// 抢占阻塞在系统调用中的p和长时间运行的g
if retake(now) != 0 {
idle = 0
} else {
idle++
}
}
}
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// 如果g运行时间太长(10ms),则抢占
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)//告诉_p_的当前g停止运行
sysretake = true
}
}
if s == _Psyscall {
// 抢占阻塞在系统调用上的p,如果阻塞时间超过1个sysmon tick周期,则抢占
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
//一方面,如果没有其它的g可以执行,我们不希望抢占p
//另一方面,因为抢占能阻止sysmon线程进入深度睡眠,我们还是希望抢占它
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
}
unlock(&allpLock)
return uint32(n)
}

五、基于信号抢占调度

golang1.14以前采用的是基于协作式抢占调度,抢占g时把stackguard0设置为stackPreempt。在goroutine中调用函数时触发newstack并检测stackPreempt标记以实现抢占。假如g1是一个长时间纯数值计算的协程或无函数调用的for循环,在这期间,g1不会被抢占。如果此时触发了gc, 整个进程除了g1以外,其它goroutine都被gc停止, 直到g1执行完成。
golang1.14加入了基于信号抢占调度,通过向g所在的M发送信号来实现抢占,这可以避免g中无函数调用时不能抢占成功的情况,这对于gc来说尤其重要。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
...
// 请求异步抢占m的当前g,不是所有os都支持信号抢占,这里通过preemptMSupported标识
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
func preemptM(mp *m) {
if !pushCallSupported {
//如果当前架构不支持ctxt.pushCall,那么doSigPreempt也不可能被调用
return
}
if atomic.Cas(&mp.signalPending, 0, 1) {
//确保多线程调用只有一个能成功
//发送信号
signalM(mp, sigPreempt)
}
}
//收到信号时,OS会回调此函数
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
if sig == sigPreempt {
// 如果是抢占信号
doSigPreempt(gp, c)
}
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
//如果要抢占、检测是否安全
if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
//asyncPreempt会保存保存上下文,调用asyncPreempt2后,再恢复上下文
ctxt.pushCall(funcPC(asyncPreempt))
}
//记录抢占
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
}
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)//切换到g0栈执行暂停任务
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}
// 暂停gp并置为_Gpreempted.
func preemptPark(gp *g) {
if trace.enabled {
traceGoPark(traceEvGoBlock, 0)
}
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
gp.waitreason = waitReasonPreempted
//在dropg之前不能是runing状态
casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
dropg()//g与m解绑,g被暂停
casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
schedule()//重新进入调度流程为m寻找可执行的g
}

参考
https://mp.weixin.qq.com/s/gTb9p0WpJ37M5_k9e6xUiQ
https://wudaijun.com/2018/01/go-scheduler/
https://zboya.github.io/post/go_scheduler/
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/
https://changkun.de/golang/zh-cn/part2runtime/ch06sched/