小工具
并发场景用一些经历过生产检验的小工具,比自己临时磨刀要安全,go-zero的core里面很多他们自己写好的小工具,没啥依赖,你要觉得好完全可以copy到你自己的项目里面使用,真香。
并发下多个 goroutine ,任意一个返回error,则拿到error并finish不在阻塞
更多使用魔法https://github.com/tal-tech/zero-doc/blob/main/doc/mapreduce.md
package mr
import (
"errors"
"io/ioutil"
"log"
"runtime"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/stringx"
"github.com/tal-tech/go-zero/core/syncx"
)
//调用链路 Finish -> MapReduceVoid-> MapReduce -> MapReduceWithSource->executeMappers
// 多个goroutine 有一个返回error,如果还没有执行的func就会不执行了,已经执行了的,业务就不管它了,也不会去等待它返回
func TestFinish(t *testing.T) {
var total uint32
err := Finish(func() error {
atomic.AddUint32(&total, 2)
err := errors.New("aaaa")
return err
}, func() error {
atomic.AddUint32(&total, 3)
return nil
}, func() error {
atomic.AddUint32(&total, 5)
return nil
})
assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
assert.Nil(t, err)
}
并发场景下的func 串的方式执行
- 当一个func被并发调用,只会有一个goroutine拿到锁执行,该func在执行期间,并发的调用行为都不会真正执行,直接返回结果,但该func执行完毕后,又来了并发调用则又走以上流程。 适用与突增并发类型,用于保证该func同时执行的次数为1
举例:func执行花了1秒,期间该函数被并发调用了1000次,则该func在这1S调用的1000次里面实际只执行1次;但是如果你这1000次是每隔1s调用一次,则该函数就会被真正执行1000次。内部用到了lock
https://github.com/tal-tech/zero-doc/blob/main/doc/sharedcalls.md
package syncx
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
//值得注意的是,不是分布式控制的,只能达到去重目的,并不能保证真的只执行一次,因为源代码里面有个delete掉key的行为
// 可能是作者的思想是只保证fn()在执行期间来的并发就不需要执行,直接返回结果
// 但一开始我还以为是只执行一次
func TestExclusiveCallDo(t *testing.T) {
g := NewSharedCalls()
v, err := g.Do("key", func() (interface{}, error) {
return "bar", nil
})
if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
t.Errorf("Do = %v; want %v", got, want)
}
if err != nil {
t.Errorf("Do error = %v", err)
}
}
// 这个这样子写 能保证只执行一次,但其实时伪保证,是在故意加大fn执行时间
func TestExclusiveCallDoDupSuppress(t *testing.T) {
g := NewSharedCalls()
c := make(chan string)
var calls int32
fn := func() (interface{}, error) {
atomic.AddInt32(&calls, 1)
return <-c, nil
}
const n = 10
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
v, err := g.Do("key", fn)
if err != nil {
t.Errorf("Do error: %v", err)
}
if v.(string) != "bar" {
t.Errorf("got %q; want %q", v, "bar")
}
wg.Done()
}()
}
time.Sleep(100 * time.Millisecond) // let goroutines above block
c <- "bar"
wg.Wait()
if got := atomic.LoadInt32(&calls); got != 1 {
t.Errorf("number of calls = %d; want 1", got)
}
}
生成只执行一次的函数
package syncx
import (
"fmt"
"github.com/stretchr/testify/assert"
"testing"
"time"
"sync"
)
func Once(fn func()) func() {
once := new(sync.Once)
return func() {
once.Do(fn)
}
}
//只打印一条内容,因为add只执行一次
func TestOnce(t *testing.T) {
var v int
add := Once(func() {
v++
})
for i := 0; i < 5; i++ {
add()
}
fmt.Printf("--- 执行次数 ---")
time.Sleep(time.Second)
}
让函数触发多次只执行一次,并能拿到返回结果,非并发安全
package syncx
import (
"sync"
"time"
"sync/atomic"
"github.com/tal-tech/go-zero/core/timex"
)
const defaultRefreshInterval = time.Second
type (
ImmutableResourceOption func(resource *ImmutableResource)
ImmutableResource struct {
fetch func() (interface{}, error)
resource interface{}
err error
lock sync.RWMutex
refreshInterval time.Duration
lastTime *AtomicDuration
}
)
func NewImmutableResource(fn func() (interface{}, error), opts ...ImmutableResourceOption) *ImmutableResource {
// cannot use executors.LessExecutor because of cycle imports
ir := ImmutableResource{
fetch: fn,
refreshInterval: defaultRefreshInterval,
lastTime: NewAtomicDuration(),
}
for _, opt := range opts {
opt(&ir)
}
return &ir
}
func (ir *ImmutableResource) Get() (interface{}, error) {
ir.lock.RLock()
resource := ir.resource
ir.lock.RUnlock()
if resource != nil {
return resource, nil
}
ir.maybeRefresh(func() {
res, err := ir.fetch()
ir.lock.Lock()
if err != nil {
ir.err = err
} else {
ir.resource, ir.err = res, nil
}
ir.lock.Unlock()
})
ir.lock.RLock()
resource, err := ir.resource, ir.err
ir.lock.RUnlock()
return resource, err
}
func (ir *ImmutableResource) maybeRefresh(execute func()) {
now := timex.Now()
lastTime := ir.lastTime.Load()
if lastTime == 0 || lastTime+ir.refreshInterval < now {
ir.lastTime.Set(now)
execute()
}
}
// Set interval to 0 to enforce refresh every time if not succeeded. default is time.Second.
func WithRefreshIntervalOnFailure(interval time.Duration) ImmutableResourceOption {
return func(resource *ImmutableResource) {
resource.refreshInterval = interval
}
}
type AtomicDuration int64
func NewAtomicDuration() *AtomicDuration {
return new(AtomicDuration)
}
func ForAtomicDuration(val time.Duration) *AtomicDuration {
d := NewAtomicDuration()
d.Set(val)
return d
}
func (d *AtomicDuration) CompareAndSwap(old, val time.Duration) bool {
return atomic.CompareAndSwapInt64((*int64)(d), int64(old), int64(val))
}
func (d *AtomicDuration) Load() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(d)))
}
func (d *AtomicDuration) Set(val time.Duration) {
atomic.StoreInt64((*int64)(d), int64(val))
}
//
func main(){
var count int
r := NewImmutableResource(func() (interface{}, error) {
count++
time.Sleep(time.Millisecond*1) //这里sleep一下并发就不能保证只执行一次了
return "hello", nil
})
for i:=0;i<100;i++{
go func() {
res, err := r.Get() //就需要你在这里的Get加锁后再执行了
assert.Equal(t, "hello", res)
assert.Equal(t, 1, count)
assert.Nil(t, err)
}()
}
time.Sleep(time.Second * 1)
// again
res, err := r.Get()
}
网友评论