package main
import (
"fmt"
"sync"
)
// Package-level state shared by the channel-based demo functions in this file.
var (
// finish is a plain bool; nothing in its declaration guards it with a lock
// or an atomic, so concurrent access must be coordinated by the callers.
finish bool
// ch is a buffered channel of int with room for 10 queued values.
ch = make(chan int, 10)
)
// main runs the ring-buffer demo (test02). The channel-based demo (test01)
// is the alternative entry point and is left disabled.
func main() {
	test02()
	// test01()
}
func test01() {
var wgp sync.WaitGroup
var wgc sync.WaitGroup
wgp.Add(3)
go producer(&wgp, 10)
go producer(&wgp, 20)
go producer(&wgp, 30)
//
wgc.Add(5)
go consumer(&wgc, 1)
go consumer(&wgc, 2)
go consumer(&wgc, 3)
go consumer(&wgc, 4)
go consumer(&wgc, 5)
wgp.Wait()
finish = true
wgc.Wait()
}
func producer(wg *sync.WaitGroup, num int) {
defer wg.Done()
for i := 1; i <= 10; i++ {
ch <- i*num + 1
}
}
func consumer(wg *sync.WaitGroup, num int) {
defer wg.Done()
for !finish {
data := <-ch
fmt.Printf("num:%d data:%d\n", num, data)
}
}
方式二：用循环队列（环形缓冲区）加锁实现生产者-消费者
package main
import (
"fmt"
"sync"
)
// Queue is a fixed-size ring buffer of ints used by the producer/consumer
// demo. Put and Get do no locking of their own; pro and cus take lock around
// their calls.
type Queue struct {
	Cache      []int        // backing storage for the ring
	head, tail int          // head: slot Get reads next; tail: slot Put writes next
	finish     bool         // set by test02 after all producers have returned
	lock       sync.RWMutex // taken by pro and cus around queue access
}
// newQueue returns an empty Queue whose backing slice has k zeroed slots.
// head and tail start at 0 and finish starts false (their zero values).
func newQueue(k int) *Queue {
	return &Queue{
		Cache: make([]int, k),
		lock:  sync.RWMutex{},
	}
}
// Put appends x at the tail of the ring. If the ring is full the value is
// dropped and the queue is left unchanged. Put does no locking; the caller
// must hold the queue's lock.
//
// The ring uses the first len(Cache)-1 slots, which is the same wrap point
// Get uses for head, and keeps one of those slots free so that head == tail
// always means "empty". The full test therefore compares the wrapped
// successor of tail with head; comparing tail+1 without wrapping lets tail
// land on head and makes a full ring look empty.
func (q *Queue) Put(x int) {
	ring := len(q.Cache) - 1
	if ring < 1 {
		// No usable slot (len(Cache) is 0 or 1): nothing can be stored.
		return
	}
	next := (q.tail + 1) % ring
	if next == q.head {
		return // full
	}
	q.Cache[q.tail] = x
	q.tail = next
}
// Get removes and returns the value at the head of the ring. It returns -1
// when head == tail, i.e. when the queue is empty. After reading, head
// advances by one and wraps back to 0 once it reaches len(Cache)-1.
// Get does no locking of its own.
func (q *Queue) Get() int {
	if q.head != q.tail {
		v := q.Cache[q.head]
		if q.head+1 >= len(q.Cache)-1 {
			q.head = 0
		} else {
			q.head++
		}
		return v
	}
	return -1
}
// test02 runs the ring-buffer producer/consumer demo: three producers
// (queue.pro) and five consumers (queue.cus) share one Queue of 20 slots.
// After every producer has returned it sets queue.finish and then waits for
// the consumers to return.
func test02() {
	var wgp sync.WaitGroup
	var wgc sync.WaitGroup
	queue := newQueue(20)

	// Add must happen before the goroutines are started: a goroutine that
	// reaches Done first would drive the counter negative (panic), and a Wait
	// that runs before Add would return immediately.
	wgp.Add(3)
	for n := 1; n <= 3; n++ {
		go queue.pro(&wgp, n)
	}

	wgc.Add(5)
	for n := 1; n <= 5; n++ {
		go queue.cus(&wgc, n)
	}

	wgp.Wait()

	// finish is shared with the consumer goroutines, so write it while
	// holding the queue's lock rather than as a bare assignment.
	queue.lock.Lock()
	queue.finish = true
	queue.lock.Unlock()

	wgc.Wait()
}
// pro is a producer: it puts the ten values num*10+0 .. num*10+9 into the
// queue in order, prints each one after it has been stored, and marks itself
// done on wg when it returns.
//
// Put silently drops a value when the ring is full, so a single call is not
// enough: pro would report values as produced that never reached the queue.
// Put moves tail exactly when it stores a value, so pro retries until it sees
// tail advance. The lock is released between attempts so consumers can make
// room.
func (q *Queue) pro(wg *sync.WaitGroup, num int) {
	defer wg.Done()

	// With fewer than 3 backing slots the ring (len-1 usable slots, one kept
	// free) can never hold a value; retrying would spin forever, so fall back
	// to a single attempt per value.
	canHold := len(q.Cache) >= 3

	for i := 0; i < 10; i++ {
		v := num*10 + i
		for {
			q.lock.Lock()
			before := q.tail
			q.Put(v)
			stored := q.tail != before
			q.lock.Unlock()
			if stored || !canHold {
				break
			}
		}
		fmt.Println("p", num, ":", v)
	}
}
// cus is a consumer: it repeatedly takes a value from the queue and prints
// it tagged with its id num. It returns, marking itself done on wg, once the
// queue is empty and finish has been set.
//
// Get advances head, so it is a write to the queue and needs the exclusive
// lock; under RLock several consumers could run Get at the same time and
// race on head. finish is read under the same lock, together with the Get
// result, so "empty and finished" is judged from one consistent snapshot and
// values still queued when finish is set are drained instead of abandoned.
//
// While the queue is empty and finish is not yet set the loop polls.
func (q *Queue) cus(wg *sync.WaitGroup, num int) {
	defer wg.Done()
	for {
		q.lock.Lock()
		v := q.Get()
		stop := q.finish
		q.lock.Unlock()

		if v != -1 { // -1 is Get's "empty" result
			fmt.Println("c", num, ":", v)
			continue
		}
		if stop {
			return
		}
	}
}
网友评论