美文网首页
golang并发任务 - fan-out&fan-in

golang并发任务 - fan-out&fan-in

作者: wyfwo | 来源:发表于2019-03-24 17:55 被阅读0次

问题

开发过程中,有批量处理的逻辑,依赖下游一同的接口(这个接口耗时感人,400 ~ 600ms),如果串行执行批量操作的话。。emmm。。所以想法就是尽量减少耗时,并行执行~

思路&解法

golang在这个方面,天然有优势。写了个通用的小工具(代码如下),并行执行任务,其中,ProcFunc是任务的具体执行逻辑,ProCtx是任务的上下文结构体,一个生产者向生产队列生产上下文,paralNum个goroutine作为worker并行执行任务,worker执行完后向consum队列返回上下文(包含输出和err信息),主线程获取执行完的结果。如果超时任务将会通过goCtx被取消。

package common

import (
    "context"
    "time"
)

// 处理函数定义,根据自己需求变动
type ProcFunc func(p interface{}) (interface{}, error)

type ProCtx struct {
    In     interface{}
    DoFunc ProcFunc
    Out    interface{}
    err    error
}

func Produce(proChan chan *ProCtx, items []*ProCtx) {
    for _, item := range items {
        proChan <- item
    }
}

func Worker(goCtx context.Context, proChan chan *ProCtx, consumChan chan *ProCtx) {
    for {
        select {
        case item := <-proChan:
            item.Out, item.err = item.DoFunc(item.In)
            consumChan <- item
        case <-goCtx.Done():
            // 任务被父线程取消了
            //println("任务取消!!!!!!")
            return
        }
    }
}

/*
ps:执行任务的上下文
timeOut:超时时长,单位毫秒
paralNum:并行度
*/
func ParalProc(ps []*ProCtx, timeOut time.Duration, paralNum int) []*ProCtx {
    n := len(ps)
    ret := []*ProCtx{}
    pChan := make(chan *ProCtx, n)
    cChan := make(chan *ProCtx, n)
    timeOutChan := time.After(timeOut)
    contex, cancel := context.WithCancel(context.Background())
    go Produce(pChan, ps)
    for i := 0; i < paralNum; i++ {
        go Worker(contex, pChan, cChan)
    }
L:
    for i := 0; i < n; i++ {
        select {
        case item := <-cChan:
            ret = append(ret, item)
        case <-timeOutChan:
            // 超时取消子goroutine的任务
            //println("################超时啦#################")
            cancel()
            break L
        }
    }
        cancel() // 注:这里之前漏掉了,导致了内存泄漏。。。
    return ret
}

使用示例

package main

import (
    "errors"
    "fanoutfanin/paral"
    "math/rand"
    "time"
)

type A struct {
    Num int
}

func TestProc(p interface{}) (interface{}, error) {
    a, ok := p.(A)
    if !ok {
        return nil, errors.New("type wrong!!!!!!!!!")
    }
    //time.Sleep(time.Second * time.Duration(10))
    time.Sleep(time.Second * time.Duration(rand.Intn(10)))
    println(" -> ", a.Num)
    return a.Num * a.Num, nil
}

func main() {
    n := 100
    ps := []*paral.ProCtx{}

    for i := 0; i < n; i++ {
        ps = append(ps, &paral.ProCtx{
            In:     A{i},
            DoFunc: TestProc,
        })
    }

    ret := paral.ParalProc(ps, time.Second*time.Duration(1), len(ps))

    for _, a := range ret {
        println(a.Out.(int))
    }
    println("**********************", len(ret))

    time.Sleep(time.Second * time.Duration(rand.Intn(5)))

}

改进点

  • 某些场景能搞个线程池是最好的(虽然go一下就好,但是可以装13,哈哈哈)
  • 无缓冲的chan也是可以的

相关文章

网友评论

      本文标题:golang并发任务 - fan-out&fan-in

      本文链接:https://www.haomeiwen.com/subject/slouvqtx.html