Golang 互斥锁、读写锁实现

一、相关概念

  • 互斥锁(mutex):获取锁失败时,线程会被休眠让出cpu。当锁被其它线程释放后,阻塞线程被内核唤醒继续执行。这里会产生2次上下文切换。
  • 自旋转锁(cas): 获取锁失败时,线程不会被休眠,而是调用pause指令消耗cpu,直到获取成功。因为线程没被暂停所以不会有上下文切换,但由于需要调用pause指令等待锁被释放,所以在获得锁以前会一直消耗cpu。
  • 读写锁:读写锁可以基于互斥锁或自旋转锁来实现,任意时刻支持多个线程读或者一个线程写。

适合场景:如果能确定获取锁后会快速释放,优先使用自旋转锁;如果不确定锁的释放时间或需要者长时间占用锁则使用互斥锁;如果读多写少可以考虑读写锁。

二、互斥锁Mutex

互斥锁的基本思想就是自旋转等待锁释放,拿到则返回;没拿到则进入队列等待,等久了则切换为饥饿模式;如果是饥饿模式,unlock会直接把锁交给队首饥饿的gotoutine。

1.1 Mutex普通模式和饥饿模式的相互切换
  • 普通模式:需要拿锁的goroutine在队列(FIFO)中排队等待,被唤醒的goroutine会和新到来的goroutine竞争mutex。新到来的goroutine因为已经在CPU上运行且数量可能很多,所以获得锁的机会明显要比唤醒的goroutine大。在这种情况下被唤醒的goroutine会插入到等待队列前面,如果有goroutine等待时间超过1ms,就会把mutex切换成饥饿模式。
  • 饥饿模式:如果处于饥饿模式,unlock释放mutex后,会直接交给等待队列中的第1个goroutine,新到达的goroutine会到队列尾部等待,不会偿试获取mutex或自旋转。
  • 饥饿模式转普通模式:如果当前取得mutex的goroutine是队列中的最后一个,或者它的等待时间<1ms,则把mutex置为普通模式。在普通模式下,取得mutex即使需要偿试多次,甚至可能有goroutine阻塞在队列中,但它依然拥有很好的性能,饥饿模式则可以有效的防止尾部延迟。
1.2 结构定义
1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32 //32位中,第1位表示是否锁定,第2位表示是否唤醒,第3位表示是否处于饥饿模式,第4位以后表示等待获取锁的goroutine数量
sema uint32 //信号量,用于锁释放后唤醒下一个goroutine
}
const (
mutexLocked = 1 << iota // 锁定状态位
mutexWoken // 唤醒状态位
mutexStarving // 饥饿状态位
mutexWaiterShift = iota // 第4位开始表示等待数
starvationThresholdNs = 1e6 //当队首goroutine等待时间> starvationThresholdNs表示饥饿
)
1.3 核心代码解释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//获取互斥锁
func (m *Mutex) Lock() {
// 如果mutex当前状态为空闲,则立即获得锁 (Lock为内联函数)
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64 //当前goroutine等待时间,若 > starvationThresholdNs则把state设为饥饿
starving := false //是否饥饿
awoke := false //当前gotoutine是否为唤醒
iter := 0 //自旋转次数
old := m.state //缓存锁mutex的当前状态
for {
//若old为非饥饿状态(饥饿状态自旋转没意义),且锁已经被获取,则自旋转的等待锁释放(否则只能进入队列,进入队列后可能是个漫长的等待过程)
//runtime_canSpin判断是否能自旋转的依据是:1.自旋转次数<active_spin,2.GOMAXPROCS>1 3.running P>1 4.当前P的队列为空
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {//若mutex当前状态为锁定状态且不为饥饿状态且满足自旋转条件
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&//当前g和mutex为非唤醒状态,且排队的g数量>0,则尝把当前g和mutex设为唤醒
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {//把mutex唤醒状态置为1的目的是通知Unlock函数,解锁后不需唤醒队列中的其它g
awoke = true
}
runtime_doSpin()
iter++
old = m.state //缓存最新的state信息
continue
}
new := old //new用于设置state的新状态

if old&mutexStarving == 0 {//old无饥饿状态则可以设置new的锁定状态为1
new |= mutexLocked
}

// 若old是锁定状态或饥饿状态不会偿试获取锁,而是进入队列排队
if old&(mutexLocked|mutexStarving) != 0 {
//队列中的等待数量+1,之所以左移3位,因为第4位以后表示等待数
new += 1 << mutexWaiterShift
}

//若当前g是饥饿的且当前锁已经被获取,则mutex饥饿状态置为1
//若当前g是饥饿的,说明g被唤醒时,等待时间已经>starvationThresholdNs, 所以需要在此次唤醒后把mutex的饥饿状态置为1
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
//清除new中的mutexWoken状态,假如下面的cas成功,新状态只可能是锁定和饥饿;若操作失败则返回for
new &^= mutexWoken
}

//这里可能多个g会竞争,把mutex的状态置为自己的new。如果g的starving为true,cas成功后mutex就成了饥饿模式
//如果获得锁成功则返回。否则被放回到队首,当锁被释放后,直接就得到。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//state即无饥饿状态也无锁定状态,只能是唤醒状态,所以成功获取锁,退出
if old&(mutexLocked|mutexStarving) == 0 {
break
}

//若state有锁定状态或饥饿状态,则获取锁会失败
queueLifo := waitStartTime != 0
if waitStartTime == 0 {//首次等待记录开始等待时间
waitStartTime = runtime_nanotime()
}

//queueLifo为true表示当前g是从队首唤醒的,失败了仍然把它放到队首;queueLifo为false表示当前g为新到的,则加入到队尾
runtime_SemacquireMutex(&m.sema, queueLifo, 1)//把g放入队列并阻塞等待信号唤醒

//g被唤醒后,判断如果等待时间>starvationThresholdNs,则starving为true,此时仅仅是当前g为饥饿,state可能是饥饿,也可能是非饥饿的
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

old = m.state
if old&mutexStarving != 0 {//如果stage是饥饿状态,则成功获取到锁(unlock会直接把锁交给唤醒的队首goroutine)
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}

//队列中的排队数量-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)

//如果当前goutine不是饥饿状态或当前gotoutine是队列中的最后一个,则取消mutex的饥饿状态,并设为锁定状态(饥饿模式转普通模式)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving //delta=-11, stage=12, 相加结果为1(锁定模式)
}
atomic.AddInt32(&m.state, delta)
break//获得锁,退出
}
awoke = true //mutex为无饥饿状态,从队列中唤醒的g偿试跟其它g竞争锁
iter = 0 //重置自旋转次数
} else {
old = m.state
}
}
}

//释放互斥锁
func (m *Mutex) Unlock() {
//如果是单一的锁定状态,则直接释放成功(内联函数)
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {//普通模式
old := new
for {
//如果队列中没有等待者或锁已经被获取或[已经被唤醒]或处于饥饿模式时,无需唤醒任何goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
//等待队列-1,并唤醒队首的goroutine,通过m.sema信号量通知
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式则直接把锁交给队首的等待者
runtime_Semrelease(&m.sema, true, 1)
}
}

三、读写锁RWMutex

1.1 基本思想

读写锁是对互斥锁Mutex的再次封装,读锁并没有调用系统层面的加锁,而是通过手动控制变量的方式来实现加锁和释放锁,所以获取释放读锁十分高效。获取写锁首先要获得互斥锁Mutex, 再通过把readerCount置为负数通知即将到来的读锁等待,若前面有读锁没释放完,则等待它们释放完的信号;若前面没有读锁,则直接获取到锁。写锁的释放时,如果有读锁在等待,则通过信号通知它们,此时读锁会立即获取到锁,再释放互斥锁。这里需要注意锁的释放顺序,若先释放互斥锁,则会违背读写锁的有序性。

1.2 成员及意义
1
2
3
4
5
6
7
8
9
type RWMutex struct {
w Mutex // 就是一个Mutex对象,在获取写锁时首先要w.Lock()成功
writerSem uint32 // 信号量,用于写锁等待读锁释放
readerSem uint32 // 信号量,用于读锁用于等待写锁释放
readerCount int32 // 已经获得读锁的数量,若 > 0 表示目前读锁被获取的数量; < 0 表示有写锁在等待
readerWait int32 // 等待读锁释放的等待者数量
}

const rwmutexMaxReaders = 1 << 30 //读锁最多被获取的数量
1.3 核心代码解释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (rw *RWMutex) RLock() {
//获取读锁时,读者数量+1。 若readerCount+1后<0表示有写锁在等待或写锁已经被获取
//由于读锁和写锁是互斥的,所以只能等待
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
//等待写锁释放
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}

func (rw *RWMutex) RUnlock() {
//rw.readerCount - 1 < 0,表示有写锁在等待,rUnlockSlow检查是否唤醒写锁
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
//最后一个读锁释放时,唤醒写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

func (rw *RWMutex) Lock() {
//先获取互斥锁(mutex)
rw.w.Lock()
//写锁会把rw.readerCount置为负,告诉其它读锁,你们需要等readerSem信号量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果有读锁被获取,等待它们释放完,往writerSem发信号
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}

func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
//通知所有阻塞的读锁,写锁已经释放
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放mutex
rw.w.Unlock()
}

四、使用锁需要注意

  1. 注意拿锁的顺序:在不同goroutine中获取或释放多把锁,我们应该保证,拿锁的顺序必须始终如一。假如玩家某个操作需要获取A,B,C锁,不管在哪个goroutine中,我们都必须保证同样的获取顺序,如:先获取A,再获取B,然后获取C。如果在另一段代码中不是按照这个顺序,那么当这两段代码在不同goroutine中同时被执行时,就容易产生死锁。

  2. 对临界资源的访问尽量通过函数参数实现,不要每个调用的地方都去lock,unlock, 这样即不方便,也容易出错。例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func (m *Moto) withMacs(write bool, f func()) {
    if write {
    m.Lock()
    f()
    m.Unlock()
    } else {
    m.RLock()
    f()
    m.RUnlock()
    }
    }
  3. 尽量不要把锁拿到类外部使用,这样随着功能越来越复杂,开发人员越来越多,拿到类外部使用的锁很可能被滥用,甚至失控。