golang 推荐通过消息来共享内存,而不是通过共享内存来通信
channel 是一等公民可以作为函数参数,函数返回值
channel是goroutine 之间的通道因此chanel发出的数据必须有接受者,否则会deadlock
bufferedchanel make(chan int,3),大于缓冲区后会deadlock
发送方可以close() chanel
n,ok:=<-c 判断ok判断是不是关闭了
或者用range来收 会自动检测是不是被close了
package main
import (
"fmt"
"time"
)
func main() {
num := 10
ch := make(chan int, num)
for i := 0; i < num; i++ {
go func(i int) {
fmt.Println("th:", i)
time.Sleep(time.Second * time.Duration(i))
if i%2 == 0 {
ch <- i * 10
} else {
ch <- i * 100
}
}(i)
}
for i := 0; i < num; i++ {
fmt.Println(<-ch)
}
close(ch)
fmt.Println("ending")
}
WaitGroup对象不是一个引用类型,在通过函数传值的时候需要使用地址
main 包中的不同的文件的代码不能相互调用,其他包可以
增加并发数控制
package goroutinePool
import (
"log"
"sync"
)
type Task struct {
worker func(params ...interface{}) string
params []interface{}
}
type pool2 struct {
Results []string
poolSizeChan chan int
tasks []Task
wg *sync.WaitGroup
}
func NewPool2(pool2Size int) *pool2 {
var wg sync.WaitGroup
return &pool2{
Results: []string{},
poolSizeChan: make(chan int, pool2Size),
tasks: []Task{},
wg: &wg,
}
}
func (p *pool2) doWork(task Task) (res string) {
defer func() {
if err := recover(); err != nil {
log.Println(err)
res = "fail"
}
}()
return task.worker(task.params...)
}
func (p *pool2) executor(task Task) {
defer p.wg.Done()
//res := task.worker(task.params...)
res := p.doWork(task)
p.Results = append(p.Results, res)
log.Println("finished:", <-p.poolSizeChan)
}
func (p *pool2) SubmitTask(worker func(params ...interface{}) string, params []interface{}) {
task := Task{worker: worker, params: params}
p.tasks = append(p.tasks, task)
}
func (p *pool2) Start() {
p.wg.Add(len(p.tasks))
for i, task := range p.tasks {
p.poolSizeChan <- i
go p.executor(task)
}
p.wg.Wait()
close(p.poolSizeChan)
}
func main() {
num := 10
fromTable := "poi_hn_road"
tiles := datatodb.NTile(fromTable, "road_id", num, config.Db15450)
interfaceParams := goroutinePool.StringToInterface(tiles)
pool2 := goroutinePool.NewPool2(3)
for i, tile := range interfaceParams {
if i%2 == 0 {
pool2.SubmitTask(task1, tile)
} else {
pool2.SubmitTask(task2, tile)
}
}
pool2.Start()
}
package goroutinePool
import (
"fmt"
"sync"
)
type Pool struct {
ResultChan chan string
poolSizeChan chan int
worker func(params ...interface{}) string
params [][]interface{}
wg *sync.WaitGroup
}
func NewPool(poolSize int, worker func(params ...interface{}) string, params [][]interface{}) *Pool {
var wg sync.WaitGroup
wg.Add(len(params))
return &Pool{
ResultChan: make(chan string, len(params)),
poolSizeChan: make(chan int, poolSize),
worker: worker,
params: params,
wg: &wg,
}
}
func (p *Pool) executor(param ...interface{}) {
defer p.wg.Done()
res := p.worker(param...)
p.ResultChan <- res
fmt.Println("finished:", <-p.poolSizeChan)
}
func (p *Pool) DoWorkStart() {
for i, param := range p.params {
p.poolSizeChan <- i
go p.executor(param...)
}
p.wg.Wait()
close(p.poolSizeChan)
close(p.ResultChan)
}
func StringToInterface(params [][]string) [][]interface{} {
var paramsInterface [][]interface{}
for _, param := range params {
var tmp []interface{}
for _, j := range param {
tmp = append(tmp, j)
}
paramsInterface = append(paramsInterface, tmp)
}
return paramsInterface
}
package main
func task(params ...interface{}) string {
fmt.Println(params...)
time.Sleep(2 * time.Second)
return "s"
}
func main() {
num := 10
fromTable := "poi_hn_road"
tiles := datatodb.NTile(fromTable, "road_id", num, config.Db15450)
interfaceParams := goroutinePool.StringToInterface(tiles)
pool := goroutinePool.NewPool(2, datatodb.DataToDbWorker, interfaceParams)
pool.DoWorkStart()
for result := range pool.ResultChan {
fmt.Println(result)
}
}
网友评论