Golang Timer 源码探索及常见问题

最近使用golang timer时写出了bug, 为了避免再出现类似问题以及对goalng timer实现细节的好奇,阅读了一遍timer的源码。golang timer使用的是时间堆算法。

一、 时间堆算法原理

时间堆算法的底层数据结构是基于数组的小顶堆,任意一个节点的值比其孩子节点值小,堆中最小元素总是位于根节点。用于定时器的堆一般为2叉堆或4叉堆。Golang Timer使用的是桶(Bucket)+4叉堆,每个桶包含1个4叉小顶堆。

  • 节点访问
    • 堆是一棵完全二叉树(N叉树),因此可以很方便的用二维数组来表示,在数组起始位置为0的4叉堆中:
    • 任意节点i的父节点为:p := (i - 1) / 4
    • 任意节点i的左孩子点为:lc := i*4 + 1
  • tick创建
    • 通过NewTickerNewTimer 等函数创建timer
    • 根据当前G所在的P为timer分配一个桶(Bucket)
    • 把当前timer加到桶中4叉堆的最后位置
    • 向上调整timer在4叉堆的位置。(新timer加入后,可能会破坏小顶堆特性,所以需要调整。因为是加到的最后,所以只需要向上调整,直到当前堆满足小顶堆即可。)
  • tick删除
    • 取出timer所在的桶(Bucket) 。
    • 从桶(Bucket)的堆中删除timer,并调整堆。
    • 被删除的timer可能在堆的任意位置,删除后会破坏小顶堆的特性,因此需要向上和向下调整。
  • tick触发

    当前桶首次创建timer时,会创建一个goroutine(timerproc),用于检查堆顶的tick是否可触发。当堆中没有timer时, timerproc会一直睡眠,直到创建新timer时,调用了notewakeup函数唤醒。

golang timer的原理很简单,但因其并发特性,在使用上带来了很多问题,如果对其原理和接口理解不清晰,姿势稍有不当,就会写出bug。比如timerproc已经把timer从桶(Bucket)中删除但还没有调用sendTime函数,其它goroutine调用StopRest。在select中使直接或间接调用NewTimer 等。 在文章最后会介绍这些问题以及如何避免。

二、源码解读

源码版本(go version go1.12.7)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
//tick.go
func NewTicker(d Duration) *Ticker {
c := make(chan Time, 1) //当timer触发,会向c中写入当前时间(触发时间)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d), //触发时间
period: int64(d), //触发间隔
f: sendTime, //timer触发时调用此函数向c中发信号
arg: c, //sendtime的参数,触发时sendTime向c中发信号
},
}
startTimer(&t.r)
return t
}

//sleep.go
//和NewTicker相比,唯一不同的是,没有period,不会重复执行。

func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

//timer触发时,调用函数f
func AfterFunc(d Duration, f func()) *Timer {
t := &Timer{
r: runtimeTimer{
when: when(d),
f: goFunc, //触发时调用的函数
arg: f, //goFunc参数, 也就是用户指定调用的函数
},
}
startTimer(&t.r)
return t
}
func goFunc(arg interface{}, seq uintptr) {
go arg.(func())() //AfterFunc触发时,会调用此函数,并创建一个协程调用args(AfterFunc中的f)
}

//runtime\time.go
func startTimer(t *timer) {
addtimer(t)
}
func addtimer(t *timer) {
tb := t.assignBucket() //为新timer分配桶
lock(&tb.lock)
ok := tb.addtimerLocked(t) //加入时间堆中,并调整
unlock(&tb.lock)
if !ok {
badTimer()
}
}
func (tb *timersBucket) addtimerLocked(t *timer) bool {
t.i = len(tb.t)
tb.t = append(tb.t, t) //把新timer加入到堆末尾(堆是完全N叉树,通常用数组表示)
if !siftupTimer(tb.t, t.i) { //调整堆为小顶堆
return false
}
if t.i == 0 {
// 如果timerproc在sleep,且醒来时间>当前timer的执行时间,则唤醒
// 否则可以让它继续睡(立即醒来也没事做,让它睡到自然醒,再去轮询when是否到来)
if tb.sleeping && tb.sleepUntil > t.when {
tb.sleeping = false
notewakeup(&tb.waitnote)
}
if tb.rescheduling {
tb.rescheduling = false
goready(tb.gp, 0)
}
if !tb.created {//如果timerproc协程未被创建,则创建
tb.created = true
go timerproc(tb)
}
}
return true
}
//向堆顶调整
func siftupTimer(t []*timer, i int) bool {
if i >= len(t) {
return false
}
when := t[i].when
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
t[i].i = i
i = p
}
if tmp != t[i] {
t[i] = tmp
t[i].i = i
}
return true
}
//此函数是timer的核心,检查堆顶timer是否到期,如果到期timer是个ticker则计算下次到期时间,向下调整堆。
//如果不是ticker则从堆中删除当前timer,并通过sendTime或goFunc,向timer的channel写入当前时间或调用用户指定函数
func timerproc(tb *timersBucket) {
tb.gp = getg()
for {
lock(&tb.lock)
tb.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(tb.t) == 0 {
delta = -1
break
}
t := tb.t[0]
delta = t.when - now
if delta > 0 { // delta > 0 堆顶元素还不能执行,<=0 堆顶元素可执行
break
}
ok := true
if t.period > 0 { //period>0说明是ticker,计算下次执行时间,并调整堆(下次执行的when一定>当前时间,所以只需要向下调整siftdownTimer)
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
if !siftdownTimer(tb.t, 0) {
ok = false
}
} else {
// 把当前timer从堆顶移除(与最后一个timer交换后再删除),并调整堆(因为是跟最后一个交换,所以只需要调用siftdownTimer向下调整即可)
last := len(tb.t) - 1
if last > 0 {
tb.t[0] = tb.t[last]
tb.t[0].i = 0
}
tb.t[last] = nil
tb.t = tb.t[:last]
if last > 0 {
if !siftdownTimer(tb.t, 0) {
ok = false
}
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock)
if !ok {
badTimer()
}
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
tb.rescheduling = true
goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
continue
}
//如果堆为空(没有timer),则让当前goroutine睡眠,直到加入新timer addtimerLocked函数中唤醒
// At least one timer pending. Sleep until then.
tb.sleeping = true
tb.sleepUntil = now + delta
noteclear(&tb.waitnote)
unlock(&tb.lock)
notetsleepg(&tb.waitnote, delta)
}
}

//runtime\time.go
//删除定时器
func stopTimer(t *timer) bool {
return deltimer(t)
}
func deltimer(t *timer) bool {
if t.tb == nil {
return false
}
tb := t.tb //定时器所在的桶
lock(&tb.lock)
removed, ok := tb.deltimerLocked(t) //从堆中删除定时器
unlock(&tb.lock)
if !ok {
badTimer()
}
return removed
}
func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {
i := t.i
last := len(tb.t) - 1
if i < 0 || i > last || tb.t[i] != t {
return false, true
}
if i != last {
tb.t[i] = tb.t[last]
tb.t[i].i = i
}
tb.t[last] = nil
tb.t = tb.t[:last]
ok = true
//直接删除定时器和定时到期时删除略有不同,到期时删除,把堆顶元素和最交换后,再删除,只需要向下调整。直接删除的元素可能是中间元素,所以要检查两个方向的调整。
if i != last {
if !siftupTimer(tb.t, i) {
ok = false
}
if !siftdownTimer(tb.t, i) {
ok = false
}
}
return true, ok
}
//重置定时器,把之前的timer从老的堆上删除,重置when,加入到新堆中。若之前的timer已经过期返回false, 没过期返回true。
//Reset只能在已经停止或已经过期且channel值已经排掉(drained)的timer上使用。
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(&t.r)
t.r.when = w
startTimer(&t.r)
return active
}

三、使用Timer时常见的错误

1.Rest、Stop问题
1.1 timer过期,channel值没排掉

如果t已经过期,但channel值没排掉(drained),那么t.Reset(d)后,channel中的值依然存在。<-t.C取出的值是上个timer过期值,显然不符合我们的期待。

1
2
t.Reset(d)
<-t.C

如果Reset前判断timer是否已经stop,并手动排掉,问题是不是就解决了呢? 值没排掉的问题解决了,但会有新问题。

1
2
3
4
if !t.Stop() { //code1
<-t.C
}
t.Reset(d)

如果多个goroutine中调用<-t.C时,上面那段代码会有问题,可能会一直阻塞在<-t.C。 如果加个判断len(t.C),channel中有值才执行<-t.C,还会有问题吗?(一样会有多线程的问题,if语句和<-t.C并非原子操作)

1
2
3
4
if !t.Stop() && len(t.C) > 0 {//code2
<-t.C
}
t.Reset(d)
1.2 并发带来的问题

上面的例子还隐藏着一个巨大的问题。我们回顾一下timerproc的源码

1
2
3
4
5
f := t.f
arg := t.arg
seq := t.seq
unlock(&tb.lock) //解锁
f(arg, seq) //如果是AfterFun,f为goFun; 如果是NewTimer或NewTicker则f为sendTime(向channel中写值)

如代码所示,解锁后,才调用sendTime往channel中写值,这点非常关键。它会导致一种很微妙的情况。如下:
timerproc中,某个timer到期,已经执行了从桶中删除timer,但还没往timer的channel写数据,此时另一个goroutine 执行了code2所在的整个代码块(t.Stop()返回false,这只能说明timer从桶中删除了,它可能没向c中发信号, len(t.C)返回0,执行t.Reset(d))。这段代码执行后,在某个地方调用了 <-t.C 会立即取到值,但并不是我们所期待的值,而是上一个timer没有排掉的过期值。
上面这些情况如何避免呢?

  • 用一个变量记录是否已经从channel中排掉值
  • 同一个timer的stop、reset、drained(排值)操作在一个goroutine中执行
2.在select case中直接或间接调用NewTimer

在下面例子的中case2永远不会执行,case1每次执行后,case1, case2都会重新new一个timer,而case1, case2会在新创建的timer channel上监听。如此往复

1
2
3
4
5
6
7
8
9
func GO_func() {
for {
case <-time.After(Interval): //case1
fmt.Println("1")
case <-time.After(Interval*2): //case2
fmt.Println("2")
}
}
}

正确姿势:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func GO_func() {
timer1 := time.NewTimer(Interval)
timer2 := time.NewTimer(Interval * 2)
for {
select {
case <-timer1.C:
fmt.Println("1")
timer1.Reset(Interval)
case <-timer2.C:
fmt.Println("2")
timer2.Reset(Interval * 2)
}
}
}
3.没考虑channel被关闭的情况
1
2
3
4
5
6
7
8
9
func GO_func() {
for {
case <-time.After(Interval): //case1
fmt.Println("1")
case <-playerChannel: //case2
fmt.Println("2")
}
}
}

playerChannel被关闭后,远永可以从playerChannel中取出值(false), 所以会一直执行case2, 而且每次case1都会创新timer并在其channel上监听。

4.for select中break问题

在select中break不加标签,只能跳出select不能跳出for

1
2
3
4
5
6
7
for {
select {
case <-time.After(time.Duration(1)*time.Second):
fmt.Println("1")
break
}
}

如果要跳出for,可以用break加标签,当然goto、return也可以。

1
2
3
4
5
6
7
loop:
for {
select {
case <-time.After(time.Duration(1)*time.Second):
break loop
}
}