goroutine

作者: hehehehe | 来源:发表于2022-02-11 11:18 被阅读0次

golang 推荐通过消息来共享内存,而不是通过共享内存来通信

channel 是一等公民可以作为函数参数,函数返回值

channel是goroutine 之间的通道因此chanel发出的数据必须有接受者,否则会deadlock

bufferedchanel make(chan int,3),大于缓冲区后会deadlock
发送方可以close() chanel
n,ok:=<-c 判断ok判断是不是关闭了
或者用range来收 会自动检测是不是被close了

package main

import (
    "fmt"
    "time"
)

func main() {
    num := 10
    ch := make(chan int, num)
    for i := 0; i < num; i++ {
        go func(i int) {
            fmt.Println("th:", i)
            time.Sleep(time.Second * time.Duration(i))
            if i%2 == 0 {
                ch <- i * 10
            } else {
                ch <- i * 100
            }
        }(i)
    }

    for i := 0; i < num; i++ {
        fmt.Println(<-ch)
    }
    close(ch)

    fmt.Println("ending")

}

WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址
main 包中的不同的文件的代码不能相互调用,其他包可以
增加并发数控制

package goroutinePool

import (
    "log"
    "sync"
)

type Task struct {
    worker func(params ...interface{}) string
    params []interface{}
}

type pool2 struct {
    Results      []string
    poolSizeChan chan int
    tasks        []Task
    wg           *sync.WaitGroup
}

func NewPool2(pool2Size int) *pool2 {
    var wg sync.WaitGroup
    return &pool2{
        Results:      []string{},
        poolSizeChan: make(chan int, pool2Size),
        tasks:        []Task{},
        wg:           &wg,
    }
}
func (p *pool2) doWork(task Task) (res string) {
    defer func() {
        if err := recover(); err != nil {
            log.Println(err)
            res = "fail"
        }
    }()
    return task.worker(task.params...)
}

func (p *pool2) executor(task Task) {
    defer p.wg.Done()
    //res := task.worker(task.params...)
    res := p.doWork(task)
    p.Results = append(p.Results, res)
    log.Println("finished:", <-p.poolSizeChan)
}

func (p *pool2) SubmitTask(worker func(params ...interface{}) string, params []interface{}) {
    task := Task{worker: worker, params: params}
    p.tasks = append(p.tasks, task)
}

func (p *pool2) Start() {
    p.wg.Add(len(p.tasks))
    for i, task := range p.tasks {
        p.poolSizeChan <- i
        go p.executor(task)
    }
    p.wg.Wait()
    close(p.poolSizeChan)
}


func main() {
    num := 10
    fromTable := "poi_hn_road"
    tiles := datatodb.NTile(fromTable, "road_id", num, config.Db15450)
    interfaceParams := goroutinePool.StringToInterface(tiles)
    pool2 := goroutinePool.NewPool2(3)
    for i, tile := range interfaceParams {
        if i%2 == 0 {
            pool2.SubmitTask(task1, tile)
        } else {
            pool2.SubmitTask(task2, tile)
        }
    }
    pool2.Start()

}
package goroutinePool

import (
    "fmt"
    "sync"
)

type Pool struct {
    ResultChan   chan string
    poolSizeChan chan int
    worker       func(params ...interface{}) string
    params       [][]interface{}
    wg           *sync.WaitGroup
}

func NewPool(poolSize int, worker func(params ...interface{}) string, params [][]interface{}) *Pool {
    var wg sync.WaitGroup
    wg.Add(len(params))
    return &Pool{
        ResultChan:   make(chan string, len(params)),
        poolSizeChan: make(chan int, poolSize),
        worker:       worker,
        params:       params,
        wg:           &wg,
    }
}

func (p *Pool) executor(param ...interface{}) {
    defer p.wg.Done()
    res := p.worker(param...)
    p.ResultChan <- res
    fmt.Println("finished:", <-p.poolSizeChan)
}

func (p *Pool) DoWorkStart() {
    for i, param := range p.params {
        p.poolSizeChan <- i
        go p.executor(param...)
    }
    p.wg.Wait()
    close(p.poolSizeChan)
    close(p.ResultChan)
}

func StringToInterface(params [][]string) [][]interface{} {
    var paramsInterface [][]interface{}
    for _, param := range params {
        var tmp []interface{}
        for _, j := range param {
            tmp = append(tmp, j)
        }
        paramsInterface = append(paramsInterface, tmp)
    }
    return paramsInterface
}

package main

func task(params ...interface{}) string {
    fmt.Println(params...)
    time.Sleep(2 * time.Second)
    return "s"
}

func main() {
    num := 10
    fromTable := "poi_hn_road"
    tiles := datatodb.NTile(fromTable, "road_id", num, config.Db15450)
    interfaceParams := goroutinePool.StringToInterface(tiles)
    pool := goroutinePool.NewPool(2, datatodb.DataToDbWorker, interfaceParams)
    pool.DoWorkStart()
    for result := range pool.ResultChan {
        fmt.Println(result)
    }

}

相关文章

网友评论

      本文标题:goroutine

      本文链接:https://www.haomeiwen.com/subject/zuuakrtx.html