美文网首页
从sync.map看并发问题 2022-05-24

从sync.map看并发问题 2022-05-24

作者: 9_SooHyun | 来源:发表于2022-05-27 23:59 被阅读0次

1.一般意义下的并发问题

并发读写的问题,其实都出在写上。并发读一点问题都没有

并发读写2大问题
如果写是更新操作,可能导致数据脏不可靠
如果写涉及删除操作,可能导致内存访问异常

下面具体说明

并发更新的问题

问题就是数据因为无法保护而变得不可靠

并发更新场景下数据不可靠
// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "sync"
)

func main() {
    i := 0
    var wg = &sync.WaitGroup{}
    for index := 0; index < 1000; index += 1 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            i += 1
        }()
    }
    wg.Wait()
    fmt.Println(i)
}

假设并发1000个goroutine对同一个变量i进行+1操作,预期最后i是1000,但绝大多数情况下都不会是1000,而是一个小于1000的数。这是因为出现了多个goroutine在同时读出了i,然后+1,这n个+1操作重叠了,实际只加了一个1

如果我们加锁让+1操作串行,就可以得到预期的1000

// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "sync"
)

func main() {
    i := 0
    var wg = &sync.WaitGroup{}
    var mu sync.Mutex
    for index := 0; index < 1000; index += 1 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            i += 1
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(i)

}

小结:并发更新场景下数据不可靠,需要通过原子CAS或者加锁来保护

多读一写

既然并发写不行,那我只让一个协程写,多读一写式的并发可以吗?试一试

// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "sync"
)

func main() {
    i := 0
    var wg = &sync.WaitGroup{}
    // 1000 goroutines reading
    for index := 0; index < 1000; index += 1 {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println(i)
        }()
    }
    // 1 goroutines writing
    for index := 0; index < 1000000; index += 1 {
        i += 1
    }
    wg.Wait()
    fmt.Println(i)

}

看起来是可以的
1000 goroutines打印的i,可能是0,可能是1000000,也可能是0-1000000间任何一个数。它们读到的值都是读时最新的

但是,上面代码的写操作仅仅是update操作,如果是delete操作将i从内存中释放,那么读i就必然发生内存异常。当然,golang不让开发者手动维护对象的生命周期,因此go在语言层面上可以保证上面的代码被正确执行,但如果是c/c++这些支持手动释放内存的语言,多读一写的并发是不可靠的

所以,严格来说,多读一更新,没问题;多读一(更新+删除),不可靠,有内存访问异常的风险

2.golang原生map并发读写

原生map不允许并发读写

为什么原生map不允许并发读写?
主要还是看对map的写操作。写map,涉及到增删改,

  • 并发更新场景下,数据不可靠,需要通过原子CAS或者加锁来保护。map没有锁保护,读写也不是原子操作
  • 多读一写场景下,map的写可以释放掉键值对,就类似c/c++手动释放变量的内存一样,负责读的goroutine有内存访问异常的风险

具体来讲,对map而言,写和读类似,都需要前置一个查找key的过程:
找到key了,然后读出value 或者 找到key了,然后写入value
也就是说,map的读和写都不是原子操作

以读map为例看一下map查找的过程


golang map read

很明显,map的读是一系列步骤组成,不是原子操作

map并发更新的问题不再讨论,这里讨论多读一写的场景:
当一个协程尝试查找一个key读一个value的时候,并已经拿到指向value的指针,正在寻址过程中(就像在翻字典的过程中),突然另一个协程将该键值对删除并触发垃圾回收,指向value的指针就指向了空内存,变成了读一个非法的value,导致异常

在golang的实现里面,map的读第一步就是进行写保护检测。如果当前有其他协程在执行“写”操作,就会panic

if h.flags&hashWriting != 0 {    throw("concurrent map read and map write")}

为什么golang原生map不去加锁来支持并发写

map的大部分使用场景不涉及多线程并发读写。如果为了支持少量的并发读写场景而加锁,那么常见的单线程使用场景下,锁的频繁获取与释放必然对效率产生影响
https://go.dev/doc/faq#atomic_maps

golang原生map支持并发读,不支持并发写

Map access is unsafe only when updates are occurring. As long as all goroutines are only reading—looking up elements in the map, including iterating through it using a for range loop—and not changing the map by assigning to elements or doing deletions, it is safe for them to access the map concurrently without synchronization.

原生map并发读是完全可以的,只读不会产生任何意料之外的操作。即使是非原子读,因为不会被其他写操作干涉,所以完全没问题。见下:

// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(map[string]int)
    c["1"] = 1
    c["0"] = 0
    // 并发读正常
    for j := 0; j < 10000; j++ {
        go func(i int) {
            fmt.Println(c[fmt.Sprintf("%d", i%2)], i)
        }(j)
    }
    time.Sleep(time.Second * 20)
}

// work correctly

一旦涉及了并发写,就会挂,concurrent map writes or concurrent map read and map write 见下:

// You can edit this code!
// Click here and start typing.
package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(map[string]int)
    c["1"] = 1
    c["0"] = 0
    // 并发读写
    for j := 0; j < 10000; j++ {
        go func(i int) {
            fmt.Println(c[fmt.Sprintf("%d", i%2)], i)
        }(j)
    }
    for j := 0; j < 10000; j++ {
        go func(i int) {
            c[fmt.Sprintf("%d", i%2)] = i%2 + 1
        }(j)
    }
    time.Sleep(time.Second * 20)
}

// fatal error: concurrent map read and map write
// fatal error: concurrent map writes

3.sync.map

sync.map支持并发写
其实现综合使用了原子CAS锁机制
同时也冗余了readdirty两个原生的map,以空间换时间,换来了一定的读效率

sync.map数据结构

// The Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a Map may significantly reduce lock
// contention compared to a Go map paired with a separate Mutex or RWMutex.
type Map struct {
    mu Mutex

    // read contains the portion of the map's contents that are safe for
    // concurrent access (with or without mu held).
    //
    // The read field itself is always safe to load, but must only be stored with
    // mu held.
    //
    // Entries stored in read may be updated concurrently without mu, but updating
    // a previously-expunged entry requires that the entry be copied to the dirty
    // map and unexpunged with mu held.
    read atomic.Value // readOnly

    // dirty contains the portion of the map's contents that require mu to be
    // held. To ensure that the dirty map can be promoted to the read map quickly,
    // it also includes all of the non-expunged entries in the read map.
    //
    // Expunged entries are not stored in the dirty map. An expunged entry in the
    // clean map must be unexpunged and added to the dirty map before a new value
    // can be stored to it.
    //
    // If the dirty map is nil, the next write to the map will initialize it by
    // making a shallow copy of the clean map, omitting stale entries.
    dirty map[interface{}]*entry

    // misses counts the number of loads since the read map was last updated that
    // needed to lock mu to determine whether the key was present.
    //
    // Once enough misses have occurred to cover the cost of copying the dirty
    // map, the dirty map will be promoted to the read map (in the unamended
    // state) and the next store to the map will make a new dirty copy.
    misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

官方注释介绍了,适合

  • 读多写少场景,最好是一直“增长写”的场景。(不然涉及delete操作就得进入dirty map又要加锁?)
  • 读写的键值对重叠度低的场景,这种场景下锁的介入度低

sync.map的设计理念:

  • readkey set不变
    read本质上就是个普通的原生map,但设计成是readOnly类型的。这里的readOnly指的是read的key set不会改变,即约定read不允许被insert新的键值对,也不会去delete键值对。key set不变,但value可以被CAS原子更新
    为什么要设计成key set不变 and 支持CAS更新?这其实解决了文章一开始提到的2个并发读写问题
    依据上文知道,原生map可以支持并发读,因为读不会改变key set。一旦key set在并发场景下改变,map的非原子读就可能读异常;那反过来,即使读和更新操作并发,只要key set一直不变,map的非原子读就不会读内存异常;只要CAS更新,读到的数据就是可靠的
  • dirty是map并发读写的常规思路,锁+原生map
  • readdirty按照一定的规则配合起来,从而支持并发读写
  • 尽管readdirty之间互相冗余,但是两个map相同key的entry都通过指针指向同一块内存地址,减少了内存浪费;同时,对read的update也作用到了dirty
sync.map数据结构

read和dirty的配合机制

read和dirty的配合机制
  • 【dirty负责扩缩容,read是某一版本的dirty快照】。首先,map从无到有,扩大缩小,一定是一个key不断增减变化的过程。而根据sync.map的设计,key的增(insert)减(真实delete而非逻辑delete)是在dirty实现的,而一个版本下的read的key set是不变的。因此可以说,A. dirty这个map(锁+原生map)不断地打快照,形成了read;B. dirty的key set永远是read unexpunged key set 的父集,如上图所示
  • 【read承担了dirty的一层缓存角色】。快照打下来有什么用呢?用来作为缓存——read的key set是不变的,因此,除了新增键值对需要直接作用在dirty上,对sync.map的删改查都可以先访问read
    • read,可以无锁并发
    • read,利用原子操作CAS实现lock free
    • read,逻辑删除而不是真的删除,通过将key对应的entry结构体中的p指针更新为nil(read[key]的value是*entry类型,是一个指向entry的指针,而entry中的指针p指向真正的值),断开entry与true value的关联。使用更新操作实现删除效果,也利用了原子CAS,lock free
// An entry is a slot in the map corresponding to a particular key.
type entry struct {
    // p points to the interface{} value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
    // is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
    // != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement, provided
    // p != expunged. If p == expunged, an entry's associated value can be updated
    // only after first setting m.dirty[key] = e so that lookups using the dirty
    // map find the entry.
    p unsafe.Pointer // *interface{}
}
  • dirty->read快照时机。当read miss次数达到len(dirty)后,将dirty“打快照”赋给read。就是当访问read的miss积累到一定程度后,认为当前这个版本的read缓存不够用了,老是要跑到后面的dirty去加锁读写。于是将dirty“打快照”赋给read,给read升级一个版本扩扩容。此时,dirty delete掉无用的nil entry,然后“打快照”赋给read,最后dirty本身重置为nil。具体代码在m.missLocked()
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0

结合源码具体观察sync.map的增删改查实现

  • 查询(载入一个entry)
// Load returns the value stored in the map for a key, or nil if no
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 通过atomic的原子操作Load获取一个只读的readOnly类型的实例read
    read, _ := m.read.Load().(readOnly)
    // lock free查找read中的数据
    e, ok := read.m[key]
    //如果没找到,且amended为true即dirty中有新数据,从dirty中加锁查找
    if !ok && read.amended {
          // 加锁
        m.mu.Lock()
        //锁的惯用方式,double-check,避免加锁前的间隙m.dirty打快照给m.read
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // missLocked就是dirty->read打快照的函数,达到阈值后m.dirty打快照给m.read
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}
  • 删除entry
// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    // read查不到就去dirty查,查到后dirty直接从内存中删除键值对
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        //double-checking
        if !ok && read.amended {
                        // 查到后dirty直接从内存中删除键值对
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    // read中查到key(自然dirty里面也有这个key),则从read逻辑删除(更新成nil)
    if ok {
                // delete() 一直在cas nil直到成功
        e.delete()
    }
}
  • 更新/增加entry

先介绍下处于删除态的entry对应的2个值:nil 和 expunged

entry.p == nil: read和dirty都有这个key. entry.p == nil发生在(m *Map) Delete的时候,read中查到key(自然dirty里面也有这个key),则从read逻辑删除(更新成nil,dirty中也跟着变成nil)

entry.p == expunged: read的entry被标记为expunged,表示dirty中不含有对应的key
read的entry什么时候会被标记为expunged?只有当dirty为nil时,Store 操作中的dirtyLocked触发一次read键值对copy到dirty,在每个read entry copy到dirty之前,如果read entry是nil,就会cas转为expunged;expunged的键值对不会覆盖到dirty,在read->dirty copy完毕之后,expunged的键值对只存在于read中

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    //如果m.read存在这个键,并且这个entry没有被标成expunged,直接存储
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
          // unexpungeLocked 检查e.p是不是expunged,如是expunged,则会cas为nil并返回true;否则返回false
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a non-nil dirty map and this entry is not in it.
            // 往dirty中添加数据。因为dirty的key set是read unexpunged key set的父集
            m.dirty[key] = e
        }
        //更新
        e.storeLocked(&value)
        // read不存在k而dirty中存在这个k,直接更新数据
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        // read和dirty都没有这个key,变成insert,往dirty中加入一个新key

            // 如果dirty中不存在read没有的key,有可能是dirty为nil,或者两者key set相同
        if !read.amended {
            // 只有当dirty为nil时,dirtyLocked触发一次read map 浅拷贝到dirty,让dirty脱离nil状态
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        //将entry加入到dirty中
        m.dirty[key] = newEntry(value)
    }
    //解锁
    m.mu.Unlock()
}
两个map互相拷贝小结:
  • read -> dirty,nil entry.p -> expunged entry.p
    【在insert新key到dirty,并且dirty为nil时】,会发生read -> dirty。这时,read中的nil entry.p会变成expunged entry.p,不参与拷贝过程
  • dirty -> read
    【在misses == len(dirty)的时候】发生。这时,直接覆盖m.read.Store(readOnly{m: m.dirty})

read -> dirty会过滤出expunged entry.p,也就是dirty不含有read 的expunged entry.p对应的key;那么下一次dirty->read全覆盖的时候,这些expunged entry.p就被真正删除了。nil entry.p变成expunged entry.p,就是判了死刑,宣告了它们会在下一次dirty->read全覆盖被删除

sync.map思考:为什么dirty->read是全覆盖呢?像read -> dirty 那样,在dirty->read的时候先删掉nil entry.p再覆盖可以吗?这样覆盖完成后,read == dirty,并且全是normal entry,dirty也不必置为nil

不可以,因为在dirty->read的同时,read可能正被访问中,对于read而言,删除任何nil entry.p都是不安全的,任何nil entry.p对应的键值对在read中仍然是合法的。因此,每一次dirty->read的覆盖,不可以先删掉nil entry.p,哪些东西可以安全删掉,需要由read自己确定
所以,也许正是因此,sync.map的开发者需要在dirty->read覆盖后将dirty置为nil,而dirty == nil,就给了read一次选择权


以下为源码,一共300来行,挺精简的

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
    "sync/atomic"
    "unsafe"
)

// Map is like a Go map[interface{}]interface{} but is safe for concurrent use
// by multiple goroutines without additional locking or coordination.
// Loads, stores, and deletes run in amortized constant time.
//
// The Map type is specialized. Most code should use a plain Go map instead,
// with separate locking or coordination, for better type safety and to make it
// easier to maintain other invariants along with the map content.
//
// The Map type is optimized for two common use cases: (1) when the entry for a given
// key is only ever written once but read many times, as in caches that only grow,
// or (2) when multiple goroutines read, write, and overwrite entries for disjoint
// sets of keys. In these two cases, use of a Map may significantly reduce lock
// contention compared to a Go map paired with a separate Mutex or RWMutex.
//
// The zero Map is empty and ready for use. A Map must not be copied after first use.
type Map struct {
        //互斥锁,用于dirty数据操作
    mu Mutex

    // read contains the portion of the map's contents that are safe for
    // concurrent access (with or without mu held).

    // The read field itself is always safe to load, but must only be stored with
    // mu held.

    // Entries stored in read may be updated concurrently without mu, but updating
    // a previously-expunged entry requires that the entry be copied to the dirty
    // map and unexpunged with mu held.
    read atomic.Value // readOnly

    // dirty contains the portion of the map's contents that require mu to be
    // held. To ensure that the dirty map can be promoted to the read map quickly,
    // it also includes all of the non-expunged entries in the read map.
    //
    // Expunged entries are not stored in the dirty map. An expunged entry in the
    // clean map must be unexpunged and added to the dirty map before a new value
    // can be stored to it.
    //
    // If the dirty map is nil, the next write to the map will initialize it by
    // making a shallow copy of the clean map, omitting stale entries.
    dirty map[interface{}]*entry

    // misses counts the number of loads since the read map was last updated that
    // needed to lock mu to determine whether the key was present.
    //
    // Once enough misses have occurred to cover the cost of copying the dirty
    // map, the dirty map will be promoted to the read map (in the unamended
    // state) and the next store to the map will make a new dirty copy.
    misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[interface{}]*entry
    amended bool // true if the dirty map contains some key not in m.
}

// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(interface{}))

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
    // p points to the interface{} value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
    // is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
    // != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement, provided
    // p != expunged. If p == expunged, an entry's associated value can be updated
    // only after first setting m.dirty[key] = e so that lookups using the dirty
    // map find the entry.
    p unsafe.Pointer // *interface{}
}

func newEntry(i interface{}) *entry {
    return &entry{p: unsafe.Pointer(&i)}
}

// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // Avoid reporting a spurious miss if m.dirty got promoted while we were
        // blocked on m.mu. (If further loads of the same key will not miss, it's
        // not worth copying the dirty map for this key.)
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}

// Store sets the value for a key.
func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // non-nil dirty map and this entry is not in it.
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(i *interface{}) bool {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
    }
}

// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// storeLocked unconditionally stores a value to the entry.
//
// The entry must be known not to be expunged.
func (e *entry) storeLocked(i *interface{}) {
    atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
    // Avoid locking if it's a clean hit.
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        actual, loaded, ok := e.tryLoadOrStore(value)
        if ok {
            return actual, loaded
        }
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        actual, loaded, _ = e.tryLoadOrStore(value)
    } else if e, ok := m.dirty[key]; ok {
        actual, loaded, _ = e.tryLoadOrStore(value)
        m.missLocked()
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
        actual, loaded = value, false
    }
    m.mu.Unlock()

    return actual, loaded
}

// tryLoadOrStore atomically loads or stores a value if the entry is not
// expunged.
//
// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
// returns with ok==false.
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return nil, false, false
    }
    if p != nil {
        return *(*interface{})(p), true, true
    }

    // Copy the interface after the first load to make this method more amenable
    // to escape analysis: if we hit the "load" path or the entry is expunged, we
    // shouldn't bother heap-allocating.
    ic := i
    for {
        if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
            return i, false, true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return nil, false, false
        }
        if p != nil {
            return *(*interface{})(p), true, true
        }
    }
}

// LoadAndDelete deletes the value for a key, returning the previous value if any.
// The loaded result reports whether the key was present.
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            delete(m.dirty, key)
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if ok {
        return e.delete()
    }
    return nil, false
}

// Delete deletes the value for a key.
func (m *Map) Delete(key interface{}) {
    m.LoadAndDelete(key)
}

func (e *entry) delete() (value interface{}, ok bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return *(*interface{})(p), true
        }
    }
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot of the Map's
// contents: no key will be visited more than once, but if the value for any key
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
//
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (m *Map) Range(f func(key, value interface{}) bool) {
    // We need to be able to iterate over all of the keys that were already
    // present at the start of the call to Range.
    // If read.amended is false, then read.m satisfies that property without
    // requiring us to hold m.mu for a long time.
    read, _ := m.read.Load().(readOnly)
    if read.amended {
        // m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
        // (assuming the caller does not break out early), so a call to Range
        // amortizes an entire copy of the map: we can promote the dirty copy
        // immediately!
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

从数学的角度理解sync.map:两个集合的转换

相关文章

  • 从sync.map看并发问题 2022-05-24

    1.一般意义下的并发问题 并发读写的问题,其实都出在写上。并发读一点问题都没有 并发读写2大问题如果写是更新操作,...

  • Go语言——sync.Map详解

    Go语言——sync.Map详解 sync.Map是1.9才推荐的并发安全的map,除了互斥量以外,还运用了原子操...

  • go 中的 sync.Map

    sync.map sync.Map 的使用和内置的 map 类型一样,只不过它是并发安全的。 Store:存储一对...

  • Go sync.Map

    map并发读线程安全,并发读写线程不安全。 sync.Map 读写分离 空间换时间 Map Golang1.6之前...

  • map与sync.Map

    Go 语言原生 map 并不是线程安全的,对它进行并发读写操作的时候,需要加锁。而 sync.map 则是一种并发...

  • GO并发安全字典sync.map(1)

    sync.Map 一个并发安全的高级数据结构。这个字典类型提供了一些常用的键值存取操作方法,并保证了这些操作的并发...

  • Golang之Map源码解析

    在Golang情景中,Map主要分为两种:sync.Map和内置Map,两者主要区别是内置Map不支持并发读写,s...

  • GO并发安全字典sync.map(2)

    并发安全字典如何做到尽量避免使用锁? 只读字典 sync.Map类型在内部使用了大量的原子操作来存取键和值,并使用...

  • map和sync.map基准测试

    在基准测试中,在并发安全的情况下sync.Map会比我们常用的map+读写锁更加的快,快了五倍,这是得以于只读re...

  • 8、同步互斥机制1(进程并发执行)(操作系统笔记)

    一、进程并发执行 1.1问题的提出 并发是所有问题产生的基础,也是操作系统设计的基础。 1.2从进程的特征看待并发...

网友评论

      本文标题:从sync.map看并发问题 2022-05-24

      本文链接:https://www.haomeiwen.com/subject/mhtsprtx.html