美文网首页
从sync.map看并发问题 2022-05-24

从sync.map看并发问题 2022-05-24

作者: 9_SooHyun | 来源:发表于2022-05-27 23:59 被阅读0次

    1.一般意义下的并发问题

    并发读写的问题,其实都出在写上。并发读一点问题都没有

    并发读写2大问题
    如果写是更新操作,可能导致数据脏不可靠
    如果写涉及删除操作,可能导致内存访问异常

    下面具体说明

    并发更新的问题

    问题就是数据因为无法保护而变得不可靠

    并发更新场景下数据不可靠
    // You can edit this code!
    // Click here and start typing.
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        i := 0
        var wg = &sync.WaitGroup{}
        for index := 0; index < 1000; index += 1 {
            wg.Add(1)
            go func() {
                defer wg.Done()
                i += 1
            }()
        }
        wg.Wait()
        fmt.Println(i)
    }
    
    

    假设并发1000个goroutine对同一个变量i进行+1操作,预期最后i是1000,但绝大多数情况下都不会是1000,而是一个小于1000的数。这是因为出现了多个goroutine在同时读出了i,然后+1,这n个+1操作重叠了,实际只加了一个1

    如果我们加锁让+1操作串行,就可以得到预期的1000

    // You can edit this code!
    // Click here and start typing.
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        i := 0
        var wg = &sync.WaitGroup{}
        var mu sync.Mutex
        for index := 0; index < 1000; index += 1 {
            wg.Add(1)
            go func() {
                defer wg.Done()
                mu.Lock()
                i += 1
                mu.Unlock()
            }()
        }
        wg.Wait()
        fmt.Println(i)
    
    }
    
    

    小结:并发更新场景下数据不可靠,需要通过原子CAS或者加锁来保护

    多读一写

    既然并发写不行,那我只让一个协程写,多读一写式的并发可以吗?试一试

    // You can edit this code!
    // Click here and start typing.
    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        i := 0
        var wg = &sync.WaitGroup{}
        // 1000 goroutines reading
        for index := 0; index < 1000; index += 1 {
            wg.Add(1)
            go func() {
                defer wg.Done()
                fmt.Println(i)
            }()
        }
        // 1 goroutines writing
        for index := 0; index < 1000000; index += 1 {
            i += 1
        }
        wg.Wait()
        fmt.Println(i)
    
    }
    
    

    看起来是可以的
    1000 goroutines打印的i,可能是0,可能是1000000,也可能是0-1000000间任何一个数。它们读到的值都是读时最新的

    但是,上面代码的写操作仅仅是update操作,如果是delete操作将i从内存中释放,那么读i就必然发生内存异常。当然,golang不让开发者手动维护对象的生命周期,因此go在语言层面上可以保证上面的代码被正确执行,但如果是c/c++这些支持手动释放内存的语言,多读一写的并发是不可靠的

    所以,严格来说,多读一更新,没问题;多读一(更新+删除),不可靠,有内存访问异常的风险

    2.golang原生map并发读写

    原生map不允许并发读写

    为什么原生map不允许并发读写?
    主要还是看对map的写操作。写map,涉及到增删改,

    • 并发更新场景下,数据不可靠,需要通过原子CAS或者加锁来保护。map没有锁保护,读写也不是原子操作
    • 多读一写场景下,map的写可以释放掉键值对,就类似c/c++手动释放变量的内存一样,负责读的goroutine有内存访问异常的风险

    具体来讲,对map而言,写和读类似,都需要前置一个查找key的过程:
    找到key了,然后读出value 或者 找到key了,然后写入value
    也就是说,map的读和写都不是原子操作

    以读map为例看一下map查找的过程


    golang map read

    很明显,map的读是一系列步骤组成,不是原子操作

    map并发更新的问题不再讨论,这里讨论多读一写的场景:
    当一个协程尝试查找一个key读一个value的时候,并已经拿到指向value的指针,正在寻址过程中(就像在翻字典的过程中),突然另一个协程将该键值对删除并触发垃圾回收,指向value的指针就指向了空内存,变成了读一个非法的value,导致异常

    在golang的实现里面,map的读第一步就是进行写保护检测。如果当前有其他协程在执行“写”操作,就会panic

    if h.flags&hashWriting != 0 {    throw("concurrent map read and map write")}
    

    为什么golang原生map不去加锁来支持并发写

    map的大部分使用场景不涉及多线程并发读写。如果为了支持少量的并发读写场景而加锁,那么常见的单线程使用场景下,锁的频繁获取与释放必然对效率产生影响
    https://go.dev/doc/faq#atomic_maps

    golang原生map支持并发读,不支持并发写

    Map access is unsafe only when updates are occurring. As long as all goroutines are only reading—looking up elements in the map, including iterating through it using a for range loop—and not changing the map by assigning to elements or doing deletions, it is safe for them to access the map concurrently without synchronization.

    原生map并发读是完全可以的,只读不会产生任何意料之外的操作。即使是非原子读,因为不会被其他写操作干涉,所以完全没问题。见下:

    // You can edit this code!
    // Click here and start typing.
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        c := make(map[string]int)
        c["1"] = 1
        c["0"] = 0
        // 并发读正常
        for j := 0; j < 10000; j++ {
            go func(i int) {
                fmt.Println(c[fmt.Sprintf("%d", i%2)], i)
            }(j)
        }
        time.Sleep(time.Second * 20)
    }
    
    // work correctly
    

    一旦涉及了并发写,就会挂,concurrent map writes or concurrent map read and map write 见下:

    // You can edit this code!
    // Click here and start typing.
    package main
    
    import (
        "fmt"
        "time"
    )
    
    func main() {
        c := make(map[string]int)
        c["1"] = 1
        c["0"] = 0
        // 并发读写
        for j := 0; j < 10000; j++ {
            go func(i int) {
                fmt.Println(c[fmt.Sprintf("%d", i%2)], i)
            }(j)
        }
        for j := 0; j < 10000; j++ {
            go func(i int) {
                c[fmt.Sprintf("%d", i%2)] = i%2 + 1
            }(j)
        }
        time.Sleep(time.Second * 20)
    }
    
    // fatal error: concurrent map read and map write
    // fatal error: concurrent map writes
    

    3.sync.map

    sync.map支持并发写
    其实现综合使用了原子CAS锁机制
    同时也冗余了readdirty两个原生的map,以空间换时间,换来了一定的读效率

    sync.map数据结构

    // The Map type is optimized for two common use cases: (1) when the entry for a given
    // key is only ever written once but read many times, as in caches that only grow,
    // or (2) when multiple goroutines read, write, and overwrite entries for disjoint
    // sets of keys. In these two cases, use of a Map may significantly reduce lock
    // contention compared to a Go map paired with a separate Mutex or RWMutex.
    type Map struct {
        mu Mutex
    
        // read contains the portion of the map's contents that are safe for
        // concurrent access (with or without mu held).
        //
        // The read field itself is always safe to load, but must only be stored with
        // mu held.
        //
        // Entries stored in read may be updated concurrently without mu, but updating
        // a previously-expunged entry requires that the entry be copied to the dirty
        // map and unexpunged with mu held.
        read atomic.Value // readOnly
    
        // dirty contains the portion of the map's contents that require mu to be
        // held. To ensure that the dirty map can be promoted to the read map quickly,
        // it also includes all of the non-expunged entries in the read map.
        //
        // Expunged entries are not stored in the dirty map. An expunged entry in the
        // clean map must be unexpunged and added to the dirty map before a new value
        // can be stored to it.
        //
        // If the dirty map is nil, the next write to the map will initialize it by
        // making a shallow copy of the clean map, omitting stale entries.
        dirty map[interface{}]*entry
    
        // misses counts the number of loads since the read map was last updated that
        // needed to lock mu to determine whether the key was present.
        //
        // Once enough misses have occurred to cover the cost of copying the dirty
        // map, the dirty map will be promoted to the read map (in the unamended
        // state) and the next store to the map will make a new dirty copy.
        misses int
    }
    
    // readOnly is an immutable struct stored atomically in the Map.read field.
    type readOnly struct {
        m       map[interface{}]*entry
        amended bool // true if the dirty map contains some key not in m.
    }
    

    官方注释介绍了,适合

    • 读多写少场景,最好是一直“增长写”的场景。(不然涉及delete操作就得进入dirty map又要加锁?)
    • 读写的键值对重叠度低的场景,这种场景下锁的介入度低

    sync.map的设计理念:

    • readkey set不变
      read本质上就是个普通的原生map,但设计成是readOnly类型的。这里的readOnly指的是read的key set不会改变,即约定read不允许被insert新的键值对,也不会去delete键值对。key set不变,但value可以被CAS原子更新
      为什么要设计成key set不变 and 支持CAS更新?这其实解决了文章一开始提到的2个并发读写问题
      依据上文知道,原生map可以支持并发读,因为读不会改变key set。一旦key set在并发场景下改变,map的非原子读就可能读异常;那反过来,即使读和更新操作并发,只要key set一直不变,map的非原子读就不会读内存异常;只要CAS更新,读到的数据就是可靠的
    • dirty是map并发读写的常规思路,锁+原生map
    • readdirty按照一定的规则配合起来,从而支持并发读写
    • 尽管readdirty之间互相冗余,但是两个map相同key的entry都通过指针指向同一块内存地址,减少了内存浪费;同时,对read的update也作用到了dirty
    sync.map数据结构

    read和dirty的配合机制

    read和dirty的配合机制
    • 【dirty负责扩缩容,read是某一版本的dirty快照】。首先,map从无到有,扩大缩小,一定是一个key不断增减变化的过程。而根据sync.map的设计,key的增(insert)减(真实delete而非逻辑delete)是在dirty实现的,而一个版本下的read的key set是不变的。因此可以说,A. dirty这个map(锁+原生map)不断地打快照,形成了read;B. dirty的key set永远是read unexpunged key set 的父集,如上图所示
    • 【read承担了dirty的一层缓存角色】。快照打下来有什么用呢?用来作为缓存——read的key set是不变的,因此,除了新增键值对需要直接作用在dirty上,对sync.map的删改查都可以先访问read
      • read,可以无锁并发
      • read,利用原子操作CAS实现lock free
      • read,逻辑删除而不是真的删除,通过将key对应的entry结构体中的p指针更新为nil(read[key]的value是*entry类型,是一个指向entry的指针,而entry中的指针p指向真正的值),断开entry与true value的关联。使用更新操作实现删除效果,也利用了原子CAS,lock free
    // An entry is a slot in the map corresponding to a particular key.
    type entry struct {
        // p points to the interface{} value stored for the entry.
        //
        // If p == nil, the entry has been deleted and m.dirty == nil.
        //
        // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
        // is missing from m.dirty.
        //
        // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
        // != nil, in m.dirty[key].
        //
        // An entry can be deleted by atomic replacement with nil: when m.dirty is
        // next created, it will atomically replace nil with expunged and leave
        // m.dirty[key] unset.
        //
        // An entry's associated value can be updated by atomic replacement, provided
        // p != expunged. If p == expunged, an entry's associated value can be updated
        // only after first setting m.dirty[key] = e so that lookups using the dirty
        // map find the entry.
        p unsafe.Pointer // *interface{}
    }
    
    • dirty->read快照时机。当read miss次数达到len(dirty)后,将dirty“打快照”赋给read。就是当访问read的miss积累到一定程度后,认为当前这个版本的read缓存不够用了,老是要跑到后面的dirty去加锁读写。于是将dirty“打快照”赋给read,给read升级一个版本扩扩容。此时,dirty delete掉无用的nil entry,然后“打快照”赋给read,最后dirty本身重置为nil。具体代码在m.missLocked()
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
    

    结合源码具体观察sync.map的增删改查实现

    • 查询(载入一个entry)
    // Load returns the value stored in the map for a key, or nil if no
    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        // 通过atomic的原子操作Load获取一个只读的readOnly类型的实例read
        read, _ := m.read.Load().(readOnly)
        // lock free查找read中的数据
        e, ok := read.m[key]
        //如果没找到,且amended为true即dirty中有新数据,从dirty中加锁查找
        if !ok && read.amended {
              // 加锁
            m.mu.Lock()
            //锁的惯用方式,double-check,避免加锁前的间隙m.dirty打快照给m.read
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
                e, ok = m.dirty[key]
                // missLocked就是dirty->read打快照的函数,达到阈值后m.dirty打快照给m.read
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if !ok {
            return nil, false
        }
        return e.load()
    }
    
    • 删除entry
    // Delete deletes the value for a key.
    func (m *Map) Delete(key interface{}) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        // read查不到就去dirty查,查到后dirty直接从内存中删除键值对
        if !ok && read.amended {
            m.mu.Lock()
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            //double-checking
            if !ok && read.amended {
                            // 查到后dirty直接从内存中删除键值对
                delete(m.dirty, key)
            }
            m.mu.Unlock()
        }
        // read中查到key(自然dirty里面也有这个key),则从read逻辑删除(更新成nil)
        if ok {
                    // delete() 一直在cas nil直到成功
            e.delete()
        }
    }
    
    • 更新/增加entry

    先介绍下处于删除态的entry对应的2个值:nil 和 expunged

    entry.p == nil: read和dirty都有这个key. entry.p == nil发生在(m *Map) Delete的时候,read中查到key(自然dirty里面也有这个key),则从read逻辑删除(更新成nil,dirty中也跟着变成nil)

    entry.p == expunged: read的entry被标记为expunged,表示dirty中不含有对应的key
    read的entry什么时候会被标记为expunged?只有当dirty为nil时,Store 操作中的dirtyLocked触发一次read键值对copy到dirty,在每个read entry copy到dirty之前,如果read entry是nil,就会cas转为expunged;expunged的键值对不会覆盖到dirty,在read->dirty copy完毕之后,expunged的键值对只存在于read中

    // Store sets the value for a key.
    func (m *Map) Store(key, value interface{}) {
        read, _ := m.read.Load().(readOnly)
        //如果m.read存在这个键,并且这个entry没有被标成expunged,直接存储
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }
    
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
              // unexpungeLocked 检查e.p是不是expunged,如是expunged,则会cas为nil并返回true;否则返回false
            if e.unexpungeLocked() {
                // The entry was previously expunged, which implies that there is a non-nil dirty map and this entry is not in it.
                // 往dirty中添加数据。因为dirty的key set是read unexpunged key set的父集
                m.dirty[key] = e
            }
            //更新
            e.storeLocked(&value)
            // read不存在k而dirty中存在这个k,直接更新数据
        } else if e, ok := m.dirty[key]; ok {
            e.storeLocked(&value)
        } else {
            // read和dirty都没有这个key,变成insert,往dirty中加入一个新key
    
                // 如果dirty中不存在read没有的key,有可能是dirty为nil,或者两者key set相同
            if !read.amended {
                // 只有当dirty为nil时,dirtyLocked触发一次read map 浅拷贝到dirty,让dirty脱离nil状态
                m.dirtyLocked()
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            //将entry加入到dirty中
            m.dirty[key] = newEntry(value)
        }
        //解锁
        m.mu.Unlock()
    }
    
    两个map互相拷贝小结:
    • read -> dirty,nil entry.p -> expunged entry.p
      【在insert新key到dirty,并且dirty为nil时】,会发生read -> dirty。这时,read中的nil entry.p会变成expunged entry.p,不参与拷贝过程
    • dirty -> read
      【在misses == len(dirty)的时候】发生。这时,直接覆盖m.read.Store(readOnly{m: m.dirty})

    read -> dirty会过滤出expunged entry.p,也就是dirty不含有read 的expunged entry.p对应的key;那么下一次dirty->read全覆盖的时候,这些expunged entry.p就被真正删除了。nil entry.p变成expunged entry.p,就是判了死刑,宣告了它们会在下一次dirty->read全覆盖被删除

    sync.map思考:为什么dirty->read是全覆盖呢?像read -> dirty 那样,在dirty->read的时候先删掉nil entry.p再覆盖可以吗?这样覆盖完成后,read == dirty,并且全是normal entry,dirty也不必置为nil

    不可以,因为在dirty->read的同时,read可能正被访问中,对于read而言,删除任何nil entry.p都是不安全的,任何nil entry.p对应的键值对在read中仍然是合法的。因此,每一次dirty->read的覆盖,不可以先删掉nil entry.p,哪些东西可以安全删掉,需要由read自己确定
    所以,也许正是因此,sync.map的开发者需要在dirty->read覆盖后将dirty置为nil,而dirty == nil,就给了read一次选择权


    以下为源码,一共300来行,挺精简的

    // Copyright 2016 The Go Authors. All rights reserved.
    // Use of this source code is governed by a BSD-style
    // license that can be found in the LICENSE file.
    
    package sync
    
    import (
        "sync/atomic"
        "unsafe"
    )
    
    // Map is like a Go map[interface{}]interface{} but is safe for concurrent use
    // by multiple goroutines without additional locking or coordination.
    // Loads, stores, and deletes run in amortized constant time.
    //
    // The Map type is specialized. Most code should use a plain Go map instead,
    // with separate locking or coordination, for better type safety and to make it
    // easier to maintain other invariants along with the map content.
    //
    // The Map type is optimized for two common use cases: (1) when the entry for a given
    // key is only ever written once but read many times, as in caches that only grow,
    // or (2) when multiple goroutines read, write, and overwrite entries for disjoint
    // sets of keys. In these two cases, use of a Map may significantly reduce lock
    // contention compared to a Go map paired with a separate Mutex or RWMutex.
    //
    // The zero Map is empty and ready for use. A Map must not be copied after first use.
    type Map struct {
            //互斥锁,用于dirty数据操作
        mu Mutex
    
        // read contains the portion of the map's contents that are safe for
        // concurrent access (with or without mu held).
    
        // The read field itself is always safe to load, but must only be stored with
        // mu held.
    
        // Entries stored in read may be updated concurrently without mu, but updating
        // a previously-expunged entry requires that the entry be copied to the dirty
        // map and unexpunged with mu held.
        read atomic.Value // readOnly
    
        // dirty contains the portion of the map's contents that require mu to be
        // held. To ensure that the dirty map can be promoted to the read map quickly,
        // it also includes all of the non-expunged entries in the read map.
        //
        // Expunged entries are not stored in the dirty map. An expunged entry in the
        // clean map must be unexpunged and added to the dirty map before a new value
        // can be stored to it.
        //
        // If the dirty map is nil, the next write to the map will initialize it by
        // making a shallow copy of the clean map, omitting stale entries.
        dirty map[interface{}]*entry
    
        // misses counts the number of loads since the read map was last updated that
        // needed to lock mu to determine whether the key was present.
        //
        // Once enough misses have occurred to cover the cost of copying the dirty
        // map, the dirty map will be promoted to the read map (in the unamended
        // state) and the next store to the map will make a new dirty copy.
        misses int
    }
    
    // readOnly is an immutable struct stored atomically in the Map.read field.
    type readOnly struct {
        m       map[interface{}]*entry
        amended bool // true if the dirty map contains some key not in m.
    }
    
    // expunged is an arbitrary pointer that marks entries which have been deleted
    // from the dirty map.
    var expunged = unsafe.Pointer(new(interface{}))
    
    // An entry is a slot in the map corresponding to a particular key.
    type entry struct {
        // p points to the interface{} value stored for the entry.
        //
        // If p == nil, the entry has been deleted and m.dirty == nil.
        //
        // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
        // is missing from m.dirty.
        //
        // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
        // != nil, in m.dirty[key].
        //
        // An entry can be deleted by atomic replacement with nil: when m.dirty is
        // next created, it will atomically replace nil with expunged and leave
        // m.dirty[key] unset.
        //
        // An entry's associated value can be updated by atomic replacement, provided
        // p != expunged. If p == expunged, an entry's associated value can be updated
        // only after first setting m.dirty[key] = e so that lookups using the dirty
        // map find the entry.
        p unsafe.Pointer // *interface{}
    }
    
    func newEntry(i interface{}) *entry {
        return &entry{p: unsafe.Pointer(&i)}
    }
    
    // Load returns the value stored in the map for a key, or nil if no
    // value is present.
    // The ok result indicates whether value was found in the map.
    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            // Avoid reporting a spurious miss if m.dirty got promoted while we were
            // blocked on m.mu. (If further loads of the same key will not miss, it's
            // not worth copying the dirty map for this key.)
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
                e, ok = m.dirty[key]
                // Regardless of whether the entry was present, record a miss: this key
                // will take the slow path until the dirty map is promoted to the read
                // map.
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if !ok {
            return nil, false
        }
        return e.load()
    }
    
    func (e *entry) load() (value interface{}, ok bool) {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        return *(*interface{})(p), true
    }
    
    // Store sets the value for a key.
    func (m *Map) Store(key, value interface{}) {
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }
    
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
            if e.unexpungeLocked() {
                // The entry was previously expunged, which implies that there is a
                // non-nil dirty map and this entry is not in it.
                m.dirty[key] = e
            }
            e.storeLocked(&value)
        } else if e, ok := m.dirty[key]; ok {
            e.storeLocked(&value)
        } else {
            if !read.amended {
                // We're adding the first new key to the dirty map.
                // Make sure it is allocated and mark the read-only map as incomplete.
                m.dirtyLocked()
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value)
        }
        m.mu.Unlock()
    }
    
    // tryStore stores a value if the entry has not been expunged.
    //
    // If the entry is expunged, tryStore returns false and leaves the entry
    // unchanged.
    func (e *entry) tryStore(i *interface{}) bool {
        for {
            p := atomic.LoadPointer(&e.p)
            if p == expunged {
                return false
            }
            if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
                return true
            }
        }
    }
    
    // unexpungeLocked ensures that the entry is not marked as expunged.
    //
    // If the entry was previously expunged, it must be added to the dirty map
    // before m.mu is unlocked.
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
        return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
    
    // storeLocked unconditionally stores a value to the entry.
    //
    // The entry must be known not to be expunged.
    func (e *entry) storeLocked(i *interface{}) {
        atomic.StorePointer(&e.p, unsafe.Pointer(i))
    }
    
    // LoadOrStore returns the existing value for the key if present.
    // Otherwise, it stores and returns the given value.
    // The loaded result is true if the value was loaded, false if stored.
    func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
        // Avoid locking if it's a clean hit.
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
            actual, loaded, ok := e.tryLoadOrStore(value)
            if ok {
                return actual, loaded
            }
        }
    
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
            if e.unexpungeLocked() {
                m.dirty[key] = e
            }
            actual, loaded, _ = e.tryLoadOrStore(value)
        } else if e, ok := m.dirty[key]; ok {
            actual, loaded, _ = e.tryLoadOrStore(value)
            m.missLocked()
        } else {
            if !read.amended {
                // We're adding the first new key to the dirty map.
                // Make sure it is allocated and mark the read-only map as incomplete.
                m.dirtyLocked()
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value)
            actual, loaded = value, false
        }
        m.mu.Unlock()
    
        return actual, loaded
    }
    
    // tryLoadOrStore atomically loads or stores a value if the entry is not
    // expunged.
    //
    // If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
    // returns with ok==false.
    func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return nil, false, false
        }
        if p != nil {
            return *(*interface{})(p), true, true
        }
    
        // Copy the interface after the first load to make this method more amenable
        // to escape analysis: if we hit the "load" path or the entry is expunged, we
        // shouldn't bother heap-allocating.
        ic := i
        for {
            if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
                return i, false, true
            }
            p = atomic.LoadPointer(&e.p)
            if p == expunged {
                return nil, false, false
            }
            if p != nil {
                return *(*interface{})(p), true, true
            }
        }
    }
    
    // LoadAndDelete deletes the value for a key, returning the previous value if any.
    // The loaded result reports whether the key was present.
    func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
                e, ok = m.dirty[key]
                delete(m.dirty, key)
                // Regardless of whether the entry was present, record a miss: this key
                // will take the slow path until the dirty map is promoted to the read
                // map.
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if ok {
            return e.delete()
        }
        return nil, false
    }
    
    // Delete deletes the value for a key.
    func (m *Map) Delete(key interface{}) {
        m.LoadAndDelete(key)
    }
    
    func (e *entry) delete() (value interface{}, ok bool) {
        for {
            p := atomic.LoadPointer(&e.p)
            if p == nil || p == expunged {
                return nil, false
            }
            if atomic.CompareAndSwapPointer(&e.p, p, nil) {
                return *(*interface{})(p), true
            }
        }
    }
    
    // Range calls f sequentially for each key and value present in the map.
    // If f returns false, range stops the iteration.
    //
    // Range does not necessarily correspond to any consistent snapshot of the Map's
    // contents: no key will be visited more than once, but if the value for any key
    // is stored or deleted concurrently, Range may reflect any mapping for that key
    // from any point during the Range call.
    //
    // Range may be O(N) with the number of elements in the map even if f returns
    // false after a constant number of calls.
    func (m *Map) Range(f func(key, value interface{}) bool) {
        // We need to be able to iterate over all of the keys that were already
        // present at the start of the call to Range.
        // If read.amended is false, then read.m satisfies that property without
        // requiring us to hold m.mu for a long time.
        read, _ := m.read.Load().(readOnly)
        if read.amended {
            // m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
            // (assuming the caller does not break out early), so a call to Range
            // amortizes an entire copy of the map: we can promote the dirty copy
            // immediately!
            m.mu.Lock()
            read, _ = m.read.Load().(readOnly)
            if read.amended {
                read = readOnly{m: m.dirty}
                m.read.Store(read)
                m.dirty = nil
                m.misses = 0
            }
            m.mu.Unlock()
        }
    
        for k, e := range read.m {
            v, ok := e.load()
            if !ok {
                continue
            }
            if !f(k, v) {
                break
            }
        }
    }
    
    func (m *Map) missLocked() {
        m.misses++
        if m.misses < len(m.dirty) {
            return
        }
        m.read.Store(readOnly{m: m.dirty})
        m.dirty = nil
        m.misses = 0
    }
    
    func (m *Map) dirtyLocked() {
        if m.dirty != nil {
            return
        }
    
        read, _ := m.read.Load().(readOnly)
        m.dirty = make(map[interface{}]*entry, len(read.m))
        for k, e := range read.m {
            if !e.tryExpungeLocked() {
                m.dirty[k] = e
            }
        }
    }
    
    func (e *entry) tryExpungeLocked() (isExpunged bool) {
        p := atomic.LoadPointer(&e.p)
        for p == nil {
            if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
        }
        return p == expunged
    }
    
    

    从数学的角度理解sync.map:两个集合的转换

    相关文章

      网友评论

          本文标题:从sync.map看并发问题 2022-05-24

          本文链接:https://www.haomeiwen.com/subject/mhtsprtx.html