Go语言高并发Map解决方案
Go语言基础库中的map不是并发安全的,不过基于读写锁可以实现线程安全;不过在Go1.9版本时,官方提供了sync.Map的线程安全Map
读写锁实现安全map
package main
import (
"fmt"
"sync"
)
type SafeMap struct {
data map[int]string
mutex sync.Mutex
}
func (m *SafeMap) Get(key int) string {
m.mutex.Lock()
defer m.mutex.Unlock()
if v, ok := m.data[key]; ok {
return v
}
return ""
}
func (m *SafeMap) Put(key int, value string) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.data[key] = value
}
func main() {
m := SafeMap{
data : make(map[int]string),
}
m.Put(1, "1")
fmt.Println(m.Get(1))
}
这种实现的map适用于读多写少的情况,一般场景下足以满足
sync.Map
示例代码
package main
import (
"fmt"
"sync"
)
func main() {
m := sync.Map{}
m.Store(1, "1")
fmt.Println(m.Load(1))
}
sync.Map的借口和普通的Map是不一样的
sync.Map的原理
读写分离
type readOnly struct {
m map[interface{}]*entry
amended bool // 如果dirty和read数据不一致则此字段为true
}
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
- 写操作直接写入
dirty
中去 - 读操作则是优先从
read
中读取,如果读取不到再到dirty
中读取;对read
的读取操作时并发安全的,修改等操作也全部是原子性操作 - 在查询穿透达到一定数量超过
dirty
的长度时用dirty
的数据替代read
添加操作
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
e.storeLocked(&value)
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
- 如果数据在
read
中没有,则仅在dirty
中添加、修改数据 - 如果数据在
read
中有,那么read
和dirty
的数据一起修改
删除操作
读写分离的操作可以解决写入、读取的冲突问题,以上的机制是处理不了删除的
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
// Regardless of whether the entry was present, record a miss: this key
// will take the slow path until the dirty map is promoted to the read
// map.
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
在进行删除操作时并未直接从map
中删除而是通过标记的方式删除,即将read
值设置为nil
来实现
//统计查询失败次数
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
//存储数据时重新生成dirty
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
在没次查询次数超过限制时就会将dirty
提升为read
,但是却把dirty
设置为nil
在存储、修改数据时会再次从read
中复制一份数据为dirty
这个时候才是真正的将数据删除掉了
总结
对于sync.Map的代码细节就不过多分析了,内部有很多双重功能判断需要结合业务来分析;这个读写锁更加的高级,但是实际上面对写并发较多的情况下依然存在不足。
分段锁Map
并发Map更常见的解决方案是:分段锁
分段锁Map的实现思路
创建多个Map,对于读写操作的key先进行一次HASH操作,将对此key的操作放到都放到一个Map上,这样读写操作的锁只需要针对这一个Map来做,不同Map之间互不影响。
示例代码
package concurrentmap
import "sync"
type shard struct {
data map[string]interface{}
mutex sync.Mutex
}
type Map struct {
table []*shard
count uint32
}
//可以额外实现指定count和hash函数的接口
func NewMap() *Map {
m := Map {}
var count uint32 = 32
for i := uint32(0); i < count; i++ {
m.table = append(m.table, &shard{
data : make(map[string]interface{}),
})
}
m.count = count
return &m
}
const prime32 = uint32(16777619)
func fnv32(key string) uint32 {
hash := uint32(2166136261)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
func (m *Map) getShared(u uint32) *shard {
return m.table[int(u % m.count)]
}
func (m *Map) Put(key string, value interface{}) {
shard := m.getShared(fnv32(key))
shard.mutex.Lock()
defer shard.mutex.Unlock()
shard.data[key] = value
}
func (m *Map) Get(key string) (interface{}, bool) {
shard := m.getShared(fnv32(key))
shard.mutex.Lock()
if v, ok := shard.data[key]; ok {
return v, true
}
return nil, false
}
网友评论