只执行一次
- 最常见的场景是单例模式
- 使用 sync.Once 可以实现单例模式
package singletion
import (
"fmt"
"sync"
"testing"
"unsafe"
)
type Singleton struct {
}
var singleInstance *Singleton
var once sync.Once
func GetSingletionObj() *Singleton{ // 使用这个函数,可以实现单例模式
once.Do(func() { // 似乎这段代码只会执行一次
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestGetSingleonObj(t *testing.T) {
var wg = sync.WaitGroup{}
for i := 0 ; i < 10 ; i++{
wg.Add(1)
go func() {
obj := GetSingletionObj()
fmt.Printf("%x\n", unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
}
仅需任意任务完成
- 在某些时候,我们开启了一堆任务,只要我们完成一个即可
- 我们可以使用 channel 完成这个功能
package first_response
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string{
time.Sleep(time.Millisecond * 15)
return fmt.Sprintf("the result is from %d", id)
}
func FirstResponse() string{
numOfRunner := 10
ch := make(chan string)
for i:= 0 ; i < numOfRunner ; i++{
go func(i int) {
defer func() {
fmt.Println("over: ", i)
}()
ret := runTask(i)
ch <- ret // [1]
}(i)
}
return <- ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log(FirstResponse()) // the result is from 4
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine()) // 11
}
- 你会发现,由于 channal 是阻塞的(在[1]处),所以当一个任务运行完之后,其他 9 个任务依然在阻塞状态。
- 这可能导致严重的内存泄漏问题
- 使用 buffer channel,可以解决这个问题
package first_response
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string{
time.Sleep(time.Millisecond * 15)
return fmt.Sprintf("the result is from %d", id)
}
func FirstResponse() string{
numOfRunner := 10
ch := make(chan string, numOfRunner) // 这里是 buffer channal
for i:= 0 ; i < numOfRunner ; i++{
go func(i int) {
defer func() {
fmt.Println("over: ", i)
}()
ret := runTask(i)
ch <- ret
}(i)
}
return <- ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log(FirstResponse()) // the result is from 4
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine()) // 2
}
等待所有任务完成
- 这和上一个任务的要求是完全相反的,所以我们可以使用上面的例子
- 当然,你也可以使用我们之前使用过的 sync. WaitGroup 来实现
package all_response
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string{
time.Sleep(time.Millisecond * 15)
return fmt.Sprintf("the result is from %d", id)
}
func AllResponse() string{
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i:= 0 ; i < numOfRunner ; i++{
go func(i int) {
defer func() {
fmt.Println("over: ", i)
}()
ret := runTask(i)
ch <- ret // [1]
}(i)
}
finalRet := ""
for j := 0 ; j < numOfRunner ; j++{ // 注意,这里是串行的
finalRet += <- ch + "\n"
}
return finalRet
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log(AllResponse()) // the result is from 4
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine()) // 2
}
资源池
- 我们可以使用 buffer channal 做一个资源池
package obj_pool
import (
"errors"
"time"
)
type ReusableObj struct {
// 在这里定义 pool 中的对象
}
type ObjPool struct {
bufChan chan *ReusableObj // 使用一个 channal 存放 ReusableObj 的指针
}
func NewObjPool(numOfObj int) *ObjPool{
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numOfObj)
for i := 0 ; i < numOfObj ; i++{
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error){
select {
case ret := <- p.bufChan :
return ret, nil
case <- time.After(timeout):
return nil, errors.New("time out")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error{
select {
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T){
pool := NewObjPool(10) // 建立一个资源池,有 10 个资源对象
if err := pool.ReleaseObj(&ReusableObj{}); err != nil{ // 尝试向满的资源池中放入新的资源
t.Error(err) // overflow
}
for i := 0 ; i < 11 ; i++{
if v, err := pool.GetObj(time.Second * 1); err != nil { // 注意,每次执行都会首先在 pool 中 get 一下
t.Error(err)
} else { // 然后程序会执行下面的代码,这种方式你可能觉得有点奇怪,但这就是 go 的一大特色
fmt.Printf("%T,%d\n", v, i)
if err := pool.ReleaseObj(v); err != nil{
t.Error(err)
}
}
}
fmt.Println("Done")
}
sync.Pool(对象缓存)
- sync.Pool 和我们常见的资源池是不同的,所以严格来说,它不可以作为资源池的实现
1. 对象获取
- Processor 包含两个部分: 私有对象 和 共享池
- 注意:这里的 Processor 是 Go 协程中的一个概念
- 私有对象是协程安全的,共享池是协程不安全的
- 尝试从私有对象获取
- 如果私有对象不存在,尝试从当前 Processor 的共享池获取
- 如果共享池为空,会尝试去其他 Processor 的共享池获取
- 如果所有池都是空的,最后会使用用户制定的 New 函数产生一个新的对象返回
2.对象放回
- 如果私有对象不存在,则保存为私有对象
- 如果存在,放入当前 Processor 子池的共享池中
3.使用方法
image.png
- New 为 Pool 中的一个变量,我们将这个变量定义为一个函数
- 使用 Get 方法的时候使用了断言,以判断获取的数据的类型
4.生命周期
- GC 会清除 sync.pool 缓存的对象(在 1.13 之后,这个清除机制会保留上一次 GC 之后使用过的对象)
- 所以你会发现,使用 sync.pool 可能会被 GC 处理,所以它不能作为资源池使用
5.代码实例
package sync_pool
import (
"fmt"
"sync"
"testing"
)
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} { // Pool 中的 New 变量,需要指定为一个函数,用于创建值
fmt.Println("Create a new object.")
return 100
},
}
v := pool.Get().(int) // 所有资源都为空,v 取到为 100
fmt.Println(v)
pool.Put(3) // 放入一个 3
//runtime.GC()
v1, _ := pool.Get().(int) // v1 为 3
fmt.Println(v1)
v2, _ := pool.Get().(int) // v2 为 100
fmt.Println(v2)
}
func TestSyncPoolInMultiGroutine(t *testing.T){
pool := &sync.Pool{
New: func() interface{}{
fmt.Println("Creat a new object.")
return 10
},
}
pool.Put(100)
pool.Put(100)
pool.Put(100)
var wg sync.WaitGroup
for i := 0 ; i < 10 ; i++{
wg.Add(1)
go func(id int) {
fmt.Println(pool.Get()) // 前三个为 100,后面的都是 10
wg.Done()
}(i)
}
wg.Wait()
}
6.总结
- 适用于通过复用,降低复杂度对象的创建和 GC 代价
- 会有锁的开销
- 生命周期收到 GC 影响,不适合做连接池
网友评论