美文网首页
7 - 并发编程

7 - 并发编程

作者: 天命_风流 | 来源:发表于2020-07-05 20:42 被阅读0次

只执行一次

  • 最常见的场景是单例模式
  • 使用 sync.Once 可以实现单例模式
package singletion

import (
    "fmt"
    "sync"
    "testing"
    "unsafe"
)

type Singleton struct {
}

var singleInstance *Singleton
var once sync.Once

func GetSingletionObj() *Singleton{  // 使用这个函数,可以实现单例模式
    once.Do(func() {  // 似乎这段代码只会执行一次
        fmt.Println("Create Obj")
        singleInstance = new(Singleton)
    })
    return singleInstance
}

func TestGetSingleonObj(t *testing.T) {
    var wg = sync.WaitGroup{}
    for i := 0 ; i < 10 ; i++{
        wg.Add(1)
        go func() {
            obj := GetSingletionObj()
            fmt.Printf("%x\n", unsafe.Pointer(obj))
            wg.Done()
        }()
    }
    wg.Wait()
}

仅需任意任务完成

  • 在某些时候,我们开启了一堆任务,只要我们完成一个即可
  • 我们可以使用 channel 完成这个功能
package first_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

func runTask(id int) string{
    time.Sleep(time.Millisecond * 15)
    return fmt.Sprintf("the result is from %d", id)
}

func FirstResponse() string{
    numOfRunner := 10
    ch := make(chan string)
    for i:= 0 ; i < numOfRunner ; i++{
        go func(i int) {
            defer func() {
                fmt.Println("over: ", i)
            }()
            ret := runTask(i)
            ch <- ret  // [1]
        }(i)
    }
    return <- ch
}

func TestFirstResponse(t *testing.T)  {
    t.Log("Before:", runtime.NumGoroutine())  // 2
    t.Log(FirstResponse())  // the result is from 4
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())  // 11
}
  • 你会发现,由于 channal 是阻塞的(在[1]处),所以当一个任务运行完之后,其他 9 个任务依然在阻塞状态。
  • 这可能导致严重的内存泄漏问题
  • 使用 buffer channel,可以解决这个问题
package first_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

func runTask(id int) string{
    time.Sleep(time.Millisecond * 15)
    return fmt.Sprintf("the result is from %d", id)
}

func FirstResponse() string{
    numOfRunner := 10
    ch := make(chan string, numOfRunner)  // 这里是 buffer channal
    for i:= 0 ; i < numOfRunner ; i++{
        go func(i int) {
            defer func() {
                fmt.Println("over: ", i)
            }()
            ret := runTask(i)
            ch <- ret
        }(i)
    }
    return <- ch
}

func TestFirstResponse(t *testing.T)  {
    t.Log("Before:", runtime.NumGoroutine())  // 2
    t.Log(FirstResponse())  // the result is from 4
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())  // 2
}

等待所有任务完成

  • 这和上一个任务的要求是完全相反的,所以我们可以使用上面的例子
  • 当然,你也可以使用我们之前使用过的 sync. WaitGroup 来实现
package all_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

func runTask(id int) string{
    time.Sleep(time.Millisecond * 15)
    return fmt.Sprintf("the result is from %d", id)
}

func AllResponse() string{
    numOfRunner := 10
    ch := make(chan string, numOfRunner)
    for i:= 0 ; i < numOfRunner ; i++{
        go func(i int) {
            defer func() {
                fmt.Println("over: ", i)
            }()
            ret := runTask(i)
            ch <- ret  // [1]
        }(i)
    }

    finalRet := ""
    for j := 0 ; j < numOfRunner ; j++{  // 注意,这里是串行的
        finalRet += <- ch + "\n"
    }

    return finalRet
}

func TestFirstResponse(t *testing.T)  {
    t.Log("Before:", runtime.NumGoroutine())  // 2
    t.Log(AllResponse())  // the result is from 4
    time.Sleep(time.Second * 1)
    t.Log("After:", runtime.NumGoroutine())  // 2
}

资源池

  • 我们可以使用 buffer channal 做一个资源池
package obj_pool

import (
    "errors"
    "time"
)

type ReusableObj struct {
    // 在这里定义 pool 中的对象
}

type ObjPool struct {
    bufChan chan *ReusableObj  // 使用一个 channal 存放 ReusableObj 的指针
}

func NewObjPool(numOfObj int) *ObjPool{
    objPool := ObjPool{}
    objPool.bufChan = make(chan *ReusableObj, numOfObj)
    for i := 0 ; i < numOfObj ; i++{
        objPool.bufChan <- &ReusableObj{}
    }
    return &objPool
}

func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error){
    select {
    case ret := <- p.bufChan :
        return ret, nil
    case <- time.After(timeout):
        return nil, errors.New("time out")
    }
}

func (p *ObjPool) ReleaseObj(obj *ReusableObj) error{
    select {
    case p.bufChan <- obj:
        return nil
    default:
        return errors.New("overflow")
    }
}

func TestObjPool(t *testing.T){
    pool := NewObjPool(10)  // 建立一个资源池,有 10 个资源对象

    if err := pool.ReleaseObj(&ReusableObj{}); err != nil{  // 尝试向满的资源池中放入新的资源
        t.Error(err)  // overflow
    }

    for i := 0 ; i < 11 ; i++{
        if v, err := pool.GetObj(time.Second * 1); err != nil {  // 注意,每次执行都会首先在 pool 中 get 一下
            t.Error(err)
        } else {  // 然后程序会执行下面的代码,这种方式你可能觉得有点奇怪,但这就是 go 的一大特色
            fmt.Printf("%T,%d\n", v, i)
            if err := pool.ReleaseObj(v); err != nil{
                t.Error(err)
            }
        }
    }
    fmt.Println("Done")
}

sync.Pool(对象缓存)

  • sync.Pool 和我们常见的资源池是不同的,所以严格来说,它不可以作为资源池的实现

1. 对象获取

  • Processor 包含两个部分: 私有对象 和 共享池
  • 注意:这里的 Processor 是 Go 协程中的一个概念
  • 私有对象是协程安全的,共享池是协程不安全的
  • 尝试从私有对象获取
  • 如果私有对象不存在,尝试从当前 Processor 的共享池获取
  • 如果共享池为空,会尝试去其他 Processor 的共享池获取
  • 如果所有池都是空的,最后会使用用户制定的 New 函数产生一个新的对象返回

2.对象放回

  • 如果私有对象不存在,则保存为私有对象
  • 如果存在,放入当前 Processor 子池的共享池中

3.使用方法

image.png
  • New 为 Pool 中的一个变量,我们将这个变量定义为一个函数
  • 使用 Get 方法的时候使用了断言,以判断获取的数据的类型

4.生命周期

  • GC 会清除 sync.pool 缓存的对象(在 1.13 之后,这个清除机制会保留上一次 GC 之后使用过的对象)
  • 所以你会发现,使用 sync.pool 可能会被 GC 处理,所以它不能作为资源池使用

5.代码实例

package sync_pool

import (
    "fmt"
    "sync"
    "testing"
)

func TestSyncPool(t *testing.T)  {
    pool := &sync.Pool{
        New: func() interface{} {  // Pool 中的 New 变量,需要指定为一个函数,用于创建值
            fmt.Println("Create a new object.")
            return 100
        },
    }

    v := pool.Get().(int)  //  所有资源都为空,v 取到为 100
    fmt.Println(v)
    pool.Put(3)  // 放入一个 3
    //runtime.GC()
    v1, _ := pool.Get().(int)  // v1 为 3
    fmt.Println(v1)
    v2, _ := pool.Get().(int)  // v2 为 100
    fmt.Println(v2)
}

func TestSyncPoolInMultiGroutine(t *testing.T){
    pool := &sync.Pool{
        New: func() interface{}{
            fmt.Println("Creat a new object.")
            return 10
        },
    }

    pool.Put(100)
    pool.Put(100)
    pool.Put(100)

    var wg sync.WaitGroup
    for i := 0 ; i < 10 ; i++{
        wg.Add(1)
        go func(id int) {
            fmt.Println(pool.Get())  // 前三个为 100,后面的都是 10
            wg.Done()
        }(i)
    }

    wg.Wait()
}

6.总结

  • 适用于通过复用,降低复杂度对象的创建和 GC 代价
  • 会有锁的开销
  • 生命周期收到 GC 影响,不适合做连接池

相关文章

网友评论

      本文标题:7 - 并发编程

      本文链接:https://www.haomeiwen.com/subject/acguqktx.html