Golang Sync.Map 源码分析

Golang中存在2种map, 一种是常规的非线程安全map,另一种是线程安全的sync.map。在golang1.9以前,我们在项目中是通过数组、map、sync.RWMutex来实现普通map的并发读写(采用map数组,把key hash到相应的map,每个map单独加锁以降低锁的粒度),那么golang sync.map是如何实现线程安全的呢?

一、原理

通过2个map(read、dirty)来实现读写分离,read、dirty 指向同一个entity。

  • 读数据(Load):首先从read map中读(无锁),如果不存在,则到dirty map中读(加锁)。
  • 存数据(Store):首先查找数据是否在read map中,如果在则更新entity.p(无锁);否则在dirty map中查找,如果存在则更新,不存在则新建key(有锁)。
  • 删除数据(Delete):如果数据在read map中找到,则直接把entity.p置为nil(无锁);如果read map中不存在则在dirty map中查找,若找到则直接删除键值对(有锁)。之所以read map只置为nil:1.因为read map是无锁访问的,不能直接删除键值对;2.为了延迟删除
  • dirty map提拔为read map
    在read map中访问数据时,当miss次数>=len(m.dirty)时,会把dirty map提拔为read map(直接修改read map的指针指向dirty map), 并把dirty map置为nil。
  • 延迟删除
    删除数据时,如果在read map中找到,并不会直接删除键值对;而是把value指向的entity.p置为nil,并在下次构建dirty map时把值为nil的p设为expunged。若read map中值为expunged的键值对若在下次提拔dity map时还没有写入数据,则会在被彻底删除。(因为提拔的方式是dity直接覆盖read,dity中没有,新的read中也不会有)

从上面可以看出:操作read map中的数据,无论是读(Load)、存(Store, 如果key在read map中,则直接更新entity,否则转为修改dirty)、删除(Delete,同Store)都不需要加锁(通过CAS(自旋转锁)实现 参考),但对dirty map的所有操作都是加锁的。下面结合源码来看它是如何实现的。

二、源码

1.1 sync.map结构体定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//go1.12.7
type Map struct {
mu Mutex //主要用于对dirty map的操作
read atomic.Value // read map, 由于map是一个结构体,对read变量并发读写需要借助互斥锁或atomic.Value确保安全
dirty map[interface{}]*entry //dirty map
misses int //操作数据时,若read map中不存在则misses+1,当miss次数>=len(m.dirty)时,会把dirty map提拔为read map
}

type readOnly struct {
m map[interface{}]*entry
amended bool //如果dirty map中包含read map中不存在的数据,则为true
}

type entry struct {
p unsafe.Pointer // 指向具体数据
}
1.2 关键函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//如果存在key,则返回对应的value, 如果不存在返回nil, ok表示是否找到值
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
//在read map中找到该key,则直接返回value;
//如果不存在且dirty map中存在read map中没有的数据(amended==true),则去dirty map中找
if !ok && read.amended {
m.mu.Lock()
//sync.map操作是并发的,在加锁前dirty map可能已经被提拔为read map,所以加锁后再重新检测一次
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 不管dirty map中是否存在key,都记1次miss
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}

func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
//当misses次数>=len(m.dirty),提拔dirty为read, 并把dirty置为nil, misses归0
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}

func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p) //取key对应entry的实际值
if p == nil || p == expunged {//如果指向值为nil或expunged返回nil
return nil, false
}
return *(*interface{})(p), true
}

//设置key的value值
func (m *Map) Store(key, value interface{}) {
//如果key在read map中且不为expunged,则直接更新
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
//如果read map中不存在,则在dirty中查找。在加锁前dirty map可能已经被提拔为read map
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() { //e.p为expunged说明read map中存在,dirty map中不存在
//如果e.p为expunged,只需要在dirty map中增加键值对,更新entity.p的指向即可(read map, dirty map的value指向同一个entity)
m.dirty[key] = e
}
e.storeLocked(&value)//更新entity.p指向value地址
} else if e, ok := m.dirty[key]; ok {//如果read map中不存在,dirty map存在,则直接更新entity.p
e.storeLocked(&value)
} else {//read map, dirty map中都不存在该key
if !read.amended {
//提拔后首次往dirty map中加入新的key值时,拷贝read map中entity.p不为nil的值到dirty map,
//并把为nil的entity.p置为expunged(见dirtyLocked())
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
// 删除指定key
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { //跟Load, Store一样,双重检测
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
//如果read map中不存在key,则在直接在dirty map中删除,
//在dirty map中存不存在不重要,存在则删除,不存在执行delete也没问题。
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()//把entity.p置为nil
}
}

func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}

//遍历sync.map
func (m *Map) Range(f func(key, value interface{}) bool) {
//如果read.amended为false直接遍历read map即可,否则提拔dity map为read map后再遍历read map
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
在源码中,entity.p的值容易引起混淆:
entity.p == nil 时:值已经被删除(可重新设置),会在提拔dirty map后的首次新增key时调用dirtyLocked把entity.p置为expunged
entity.p == expunged 时:不可直接更新entity.p值(因为dirty中已经删除映射),
必须先把它设为nil,并在dirty map中加入映射,再更新entity.p为新value。

expunged的作用其实就是为了说明,虽然read map中键值对还存在,但entity已经没指向任何数据了且该键值地在dirty map中不存在,
如果要往read map中的此键值对中写数据,必须先在dirty map中增键值对,并更新entity.p的指向。这样做的目的是延迟删除

三、适用场景

从源码可以看出sync.map的优点和缺点都很明显:

  • 优点:
    1)并发读时,若数据存在read map中,则不需要加锁。
    2)并发写时,若只更新read map中entry指向的值,不需要加锁
  • 缺点:
    频繁访问read map中不存在的值时(不管在dirty map中是否存在),不但要加锁而且还会加快dirty map提拔为read map,并在下次构建dirty map时拷贝read map中的非空值。
    这种场景可以用数组+map+mutex的方式来代替。例如