并发小工具整理

作者: Best博客 | 来源:发表于2021-01-25 20:54 被阅读0次

    小工具

    并发场景用一些经历过生产检验的小工具,比自己临时磨刀要安全,go-zero的core里面很多他们自己写好的小工具,没啥依赖,你要觉得好完全可以copy到你自己的项目里面使用,真香。

    并发下多个 goroutine ,任意一个返回error,则拿到error并finish不在阻塞

    更多使用魔法https://github.com/tal-tech/zero-doc/blob/main/doc/mapreduce.md

    package mr
    
    import (
        "errors"
        "io/ioutil"
        "log"
        "runtime"
        "sync/atomic"
        "testing"
        "time"
    
        "github.com/stretchr/testify/assert"
        "github.com/tal-tech/go-zero/core/stringx"
        "github.com/tal-tech/go-zero/core/syncx"
    )
    
    //调用链路 Finish -> MapReduceVoid-> MapReduce -> MapReduceWithSource->executeMappers
    // 多个goroutine 有一个返回error,如果还没有执行的func就会不执行了,已经执行了的,业务就不管它了,也不会去等待它返回
    func TestFinish(t *testing.T) {
        var total uint32
        err := Finish(func() error {
            atomic.AddUint32(&total, 2)
            err := errors.New("aaaa")
            return err
        }, func() error {
            atomic.AddUint32(&total, 3)
            return nil
        }, func() error {
            atomic.AddUint32(&total, 5)
            return nil
        })
    
        assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
        assert.Nil(t, err)
    }
    

    并发场景下的func 串的方式执行

    1. 当一个func被并发调用,只会有一个goroutine拿到锁执行,该func在执行期间,并发的调用行为都不会真正执行,直接返回结果,但该func执行完毕后,又来了并发调用则又走以上流程。 适用与突增并发类型,用于保证该func同时执行的次数为1
      举例:func执行花了1秒,期间该函数被并发调用了1000次,则该func在这1S调用的1000次里面实际只执行1次;但是如果你这1000次是每隔1s调用一次,则该函数就会被真正执行1000次。 内部用到了lock
      https://github.com/tal-tech/zero-doc/blob/main/doc/sharedcalls.md
    package syncx
    
    import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "testing"
        "time"
    )
    //值得注意的是,不是分布式控制的,只能达到去重目的,并不能保证真的只执行一次,因为源代码里面有个delete掉key的行为
    // 可能是作者的思想是只保证fn()在执行期间来的并发就不需要执行,直接返回结果
    // 但一开始我还以为是只执行一次
    func TestExclusiveCallDo(t *testing.T) {
        g := NewSharedCalls()
        v, err := g.Do("key", func() (interface{}, error) {
            return "bar", nil
        })
        if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
            t.Errorf("Do = %v; want %v", got, want)
        }
        if err != nil {
            t.Errorf("Do error = %v", err)
        }
    }
    
    // 这个这样子写  能保证只执行一次,但其实时伪保证,是在故意加大fn执行时间
    func TestExclusiveCallDoDupSuppress(t *testing.T) {
        g := NewSharedCalls()
        c := make(chan string)
        var calls int32
        fn := func() (interface{}, error) {
            atomic.AddInt32(&calls, 1)
            return <-c, nil
        }
    
        const n = 10
        var wg sync.WaitGroup
        for i := 0; i < n; i++ {
            wg.Add(1)
            go func() {
                v, err := g.Do("key", fn)
                if err != nil {
                    t.Errorf("Do error: %v", err)
                }
                if v.(string) != "bar" {
                    t.Errorf("got %q; want %q", v, "bar")
                }
                wg.Done()
            }()
        }
        time.Sleep(100 * time.Millisecond) // let goroutines above block
        c <- "bar"
        wg.Wait()
        if got := atomic.LoadInt32(&calls); got != 1 {
            t.Errorf("number of calls = %d; want 1", got)
        }
    }
    

    生成只执行一次的函数

    package syncx
    
    import (
        "fmt"
        "github.com/stretchr/testify/assert"
        "testing"
        "time"
        "sync"
    )
    
    func Once(fn func()) func() {
        once := new(sync.Once)
        return func() {
            once.Do(fn)
        }
    }
    
    //只打印一条内容,因为add只执行一次
    func TestOnce(t *testing.T) {
        var v int
        add := Once(func() {
            v++
        })
    
        for i := 0; i < 5; i++ {
            add()
        }
        fmt.Printf("--- 执行次数 ---")
        time.Sleep(time.Second)
    }
    
    

    让函数触发多次只执行一次,并能拿到返回结果,非并发安全

    package syncx
    
    import (
        "sync"
        "time"
        "sync/atomic"
    
        "github.com/tal-tech/go-zero/core/timex"
    )
    
    const defaultRefreshInterval = time.Second
    
    type (
        ImmutableResourceOption func(resource *ImmutableResource)
    
        ImmutableResource struct {
            fetch           func() (interface{}, error)
            resource        interface{}
            err             error
            lock            sync.RWMutex
            refreshInterval time.Duration
            lastTime        *AtomicDuration
        }
    )
    
    func NewImmutableResource(fn func() (interface{}, error), opts ...ImmutableResourceOption) *ImmutableResource {
        // cannot use executors.LessExecutor because of cycle imports
        ir := ImmutableResource{
            fetch:           fn,
            refreshInterval: defaultRefreshInterval,
            lastTime:        NewAtomicDuration(),
        }
        for _, opt := range opts {
            opt(&ir)
        }
        return &ir
    }
    
    func (ir *ImmutableResource) Get() (interface{}, error) {
        ir.lock.RLock()
        resource := ir.resource
        ir.lock.RUnlock()
        if resource != nil {
            return resource, nil
        }
    
        ir.maybeRefresh(func() {
            res, err := ir.fetch()
            ir.lock.Lock()
            if err != nil {
                ir.err = err
            } else {
                ir.resource, ir.err = res, nil
            }
            ir.lock.Unlock()
        })
    
        ir.lock.RLock()
        resource, err := ir.resource, ir.err
        ir.lock.RUnlock()
        return resource, err
    }
    
    func (ir *ImmutableResource) maybeRefresh(execute func()) {
        now := timex.Now()
        lastTime := ir.lastTime.Load()
        if lastTime == 0 || lastTime+ir.refreshInterval < now {
            ir.lastTime.Set(now)
            execute()
        }
    }
    
    // Set interval to 0 to enforce refresh every time if not succeeded. default is time.Second.
    func WithRefreshIntervalOnFailure(interval time.Duration) ImmutableResourceOption {
        return func(resource *ImmutableResource) {
            resource.refreshInterval = interval
        }
    }
    
    
    
    type AtomicDuration int64
    
    func NewAtomicDuration() *AtomicDuration {
        return new(AtomicDuration)
    }
    
    func ForAtomicDuration(val time.Duration) *AtomicDuration {
        d := NewAtomicDuration()
        d.Set(val)
        return d
    }
    
    func (d *AtomicDuration) CompareAndSwap(old, val time.Duration) bool {
        return atomic.CompareAndSwapInt64((*int64)(d), int64(old), int64(val))
    }
    
    func (d *AtomicDuration) Load() time.Duration {
        return time.Duration(atomic.LoadInt64((*int64)(d)))
    }
    
    func (d *AtomicDuration) Set(val time.Duration) {
        atomic.StoreInt64((*int64)(d), int64(val))
    }
    
    
    // 
    func main(){
        var count int
        r := NewImmutableResource(func() (interface{}, error) {
            count++
            time.Sleep(time.Millisecond*1) //这里sleep一下并发就不能保证只执行一次了
            return "hello", nil
        })
        for i:=0;i<100;i++{
            go func() {
                res, err := r.Get()  //就需要你在这里的Get加锁后再执行了
                assert.Equal(t, "hello", res)
                assert.Equal(t, 1, count)
                assert.Nil(t, err)
            }()
        }
    
        time.Sleep(time.Second * 1)
    
        // again
        res, err := r.Get()
    }
    
    

    相关文章

      网友评论

        本文标题:并发小工具整理

        本文链接:https://www.haomeiwen.com/subject/rhwizktx.html