美文网首页
7 - 并发编程

7 - 并发编程

作者: 天命_风流 | 来源:发表于2020-07-05 20:42 被阅读0次

    只执行一次

    • 最常见的场景是单例模式
    • 使用 sync.Once 可以实现单例模式
    package singletion
    
    import (
        "fmt"
        "sync"
        "testing"
        "unsafe"
    )
    
    type Singleton struct {
    }
    
    var singleInstance *Singleton
    var once sync.Once
    
    func GetSingletionObj() *Singleton{  // 使用这个函数,可以实现单例模式
        once.Do(func() {  // 似乎这段代码只会执行一次
            fmt.Println("Create Obj")
            singleInstance = new(Singleton)
        })
        return singleInstance
    }
    
    func TestGetSingleonObj(t *testing.T) {
        var wg = sync.WaitGroup{}
        for i := 0 ; i < 10 ; i++{
            wg.Add(1)
            go func() {
                obj := GetSingletionObj()
                fmt.Printf("%x\n", unsafe.Pointer(obj))
                wg.Done()
            }()
        }
        wg.Wait()
    }
    

    仅需任意任务完成

    • 在某些时候,我们开启了一堆任务,只要我们完成一个即可
    • 我们可以使用 channel 完成这个功能
    package first_response
    
    import (
        "fmt"
        "runtime"
        "testing"
        "time"
    )
    
    func runTask(id int) string{
        time.Sleep(time.Millisecond * 15)
        return fmt.Sprintf("the result is from %d", id)
    }
    
    func FirstResponse() string{
        numOfRunner := 10
        ch := make(chan string)
        for i:= 0 ; i < numOfRunner ; i++{
            go func(i int) {
                defer func() {
                    fmt.Println("over: ", i)
                }()
                ret := runTask(i)
                ch <- ret  // [1]
            }(i)
        }
        return <- ch
    }
    
    func TestFirstResponse(t *testing.T)  {
        t.Log("Before:", runtime.NumGoroutine())  // 2
        t.Log(FirstResponse())  // the result is from 4
        time.Sleep(time.Second * 1)
        t.Log("After:", runtime.NumGoroutine())  // 11
    }
    
    • 你会发现,由于 channal 是阻塞的(在[1]处),所以当一个任务运行完之后,其他 9 个任务依然在阻塞状态。
    • 这可能导致严重的内存泄漏问题
    • 使用 buffer channel,可以解决这个问题
    package first_response
    
    import (
        "fmt"
        "runtime"
        "testing"
        "time"
    )
    
    func runTask(id int) string{
        time.Sleep(time.Millisecond * 15)
        return fmt.Sprintf("the result is from %d", id)
    }
    
    func FirstResponse() string{
        numOfRunner := 10
        ch := make(chan string, numOfRunner)  // 这里是 buffer channal
        for i:= 0 ; i < numOfRunner ; i++{
            go func(i int) {
                defer func() {
                    fmt.Println("over: ", i)
                }()
                ret := runTask(i)
                ch <- ret
            }(i)
        }
        return <- ch
    }
    
    func TestFirstResponse(t *testing.T)  {
        t.Log("Before:", runtime.NumGoroutine())  // 2
        t.Log(FirstResponse())  // the result is from 4
        time.Sleep(time.Second * 1)
        t.Log("After:", runtime.NumGoroutine())  // 2
    }
    

    等待所有任务完成

    • 这和上一个任务的要求是完全相反的,所以我们可以使用上面的例子
    • 当然,你也可以使用我们之前使用过的 sync. WaitGroup 来实现
    package all_response
    
    import (
        "fmt"
        "runtime"
        "testing"
        "time"
    )
    
    func runTask(id int) string{
        time.Sleep(time.Millisecond * 15)
        return fmt.Sprintf("the result is from %d", id)
    }
    
    func AllResponse() string{
        numOfRunner := 10
        ch := make(chan string, numOfRunner)
        for i:= 0 ; i < numOfRunner ; i++{
            go func(i int) {
                defer func() {
                    fmt.Println("over: ", i)
                }()
                ret := runTask(i)
                ch <- ret  // [1]
            }(i)
        }
    
        finalRet := ""
        for j := 0 ; j < numOfRunner ; j++{  // 注意,这里是串行的
            finalRet += <- ch + "\n"
        }
    
        return finalRet
    }
    
    func TestFirstResponse(t *testing.T)  {
        t.Log("Before:", runtime.NumGoroutine())  // 2
        t.Log(AllResponse())  // the result is from 4
        time.Sleep(time.Second * 1)
        t.Log("After:", runtime.NumGoroutine())  // 2
    }
    

    资源池

    • 我们可以使用 buffer channal 做一个资源池
    package obj_pool
    
    import (
        "errors"
        "time"
    )
    
    type ReusableObj struct {
        // 在这里定义 pool 中的对象
    }
    
    type ObjPool struct {
        bufChan chan *ReusableObj  // 使用一个 channal 存放 ReusableObj 的指针
    }
    
    func NewObjPool(numOfObj int) *ObjPool{
        objPool := ObjPool{}
        objPool.bufChan = make(chan *ReusableObj, numOfObj)
        for i := 0 ; i < numOfObj ; i++{
            objPool.bufChan <- &ReusableObj{}
        }
        return &objPool
    }
    
    func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error){
        select {
        case ret := <- p.bufChan :
            return ret, nil
        case <- time.After(timeout):
            return nil, errors.New("time out")
        }
    }
    
    func (p *ObjPool) ReleaseObj(obj *ReusableObj) error{
        select {
        case p.bufChan <- obj:
            return nil
        default:
            return errors.New("overflow")
        }
    }
    
    func TestObjPool(t *testing.T){
        pool := NewObjPool(10)  // 建立一个资源池,有 10 个资源对象
    
        if err := pool.ReleaseObj(&ReusableObj{}); err != nil{  // 尝试向满的资源池中放入新的资源
            t.Error(err)  // overflow
        }
    
        for i := 0 ; i < 11 ; i++{
            if v, err := pool.GetObj(time.Second * 1); err != nil {  // 注意,每次执行都会首先在 pool 中 get 一下
                t.Error(err)
            } else {  // 然后程序会执行下面的代码,这种方式你可能觉得有点奇怪,但这就是 go 的一大特色
                fmt.Printf("%T,%d\n", v, i)
                if err := pool.ReleaseObj(v); err != nil{
                    t.Error(err)
                }
            }
        }
        fmt.Println("Done")
    }
    

    sync.Pool(对象缓存)

    • sync.Pool 和我们常见的资源池是不同的,所以严格来说,它不可以作为资源池的实现

    1. 对象获取

    • Processor 包含两个部分: 私有对象 和 共享池
    • 注意:这里的 Processor 是 Go 协程中的一个概念
    • 私有对象是协程安全的,共享池是协程不安全的
    • 尝试从私有对象获取
    • 如果私有对象不存在,尝试从当前 Processor 的共享池获取
    • 如果共享池为空,会尝试去其他 Processor 的共享池获取
    • 如果所有池都是空的,最后会使用用户制定的 New 函数产生一个新的对象返回

    2.对象放回

    • 如果私有对象不存在,则保存为私有对象
    • 如果存在,放入当前 Processor 子池的共享池中

    3.使用方法

    image.png
    • New 为 Pool 中的一个变量,我们将这个变量定义为一个函数
    • 使用 Get 方法的时候使用了断言,以判断获取的数据的类型

    4.生命周期

    • GC 会清除 sync.pool 缓存的对象(在 1.13 之后,这个清除机制会保留上一次 GC 之后使用过的对象)
    • 所以你会发现,使用 sync.pool 可能会被 GC 处理,所以它不能作为资源池使用

    5.代码实例

    package sync_pool
    
    import (
        "fmt"
        "sync"
        "testing"
    )
    
    func TestSyncPool(t *testing.T)  {
        pool := &sync.Pool{
            New: func() interface{} {  // Pool 中的 New 变量,需要指定为一个函数,用于创建值
                fmt.Println("Create a new object.")
                return 100
            },
        }
    
        v := pool.Get().(int)  //  所有资源都为空,v 取到为 100
        fmt.Println(v)
        pool.Put(3)  // 放入一个 3
        //runtime.GC()
        v1, _ := pool.Get().(int)  // v1 为 3
        fmt.Println(v1)
        v2, _ := pool.Get().(int)  // v2 为 100
        fmt.Println(v2)
    }
    
    func TestSyncPoolInMultiGroutine(t *testing.T){
        pool := &sync.Pool{
            New: func() interface{}{
                fmt.Println("Creat a new object.")
                return 10
            },
        }
    
        pool.Put(100)
        pool.Put(100)
        pool.Put(100)
    
        var wg sync.WaitGroup
        for i := 0 ; i < 10 ; i++{
            wg.Add(1)
            go func(id int) {
                fmt.Println(pool.Get())  // 前三个为 100,后面的都是 10
                wg.Done()
            }(i)
        }
    
        wg.Wait()
    }
    

    6.总结

    • 适用于通过复用,降低复杂度对象的创建和 GC 代价
    • 会有锁的开销
    • 生命周期收到 GC 影响,不适合做连接池

    相关文章

      网友评论

          本文标题:7 - 并发编程

          本文链接:https://www.haomeiwen.com/subject/acguqktx.html