先谈golang
package main
import (
"fmt"
"time"
)
//ex one
func worker(id int, c chan int) {
for {
fmt.Printf("is %d,come %c\n", id, <-c)
}
}
//ex two return is a chan
func createworker(id int) chan int {
c := make(chan int)
go func() {
for {
fmt.Printf("is %d,come %c\n", id, <-c)
}
}()
return c
}
func chanDemo() {
var channels [10]chan int
//ex one
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go worker(i, channels[i])
channels[i] <- 'a' + i
channels[i] <- 'A' + i
}
//extwo
for i := 0; i < 10; i++ {
channels[i] = createworker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'B' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
//like swoole co::sleep()
time.Sleep(time.Microsecond)
}
func workerTwo(id int, c chan string) {
//use your channal stop output
for n := range c {
fmt.Printf("is %d,come %s\n", id, n)
}
}
//like swoole chan(3)
func bufferChannel() {
c := make(chan string, 3)
go workerTwo(0, c)
// input your channel
c <- "苹果皮"
c <- "豆腐皮"
c <- "香蕉皮"
close(c)
// like swoole co::sleep()
time.Sleep(time.Microsecond)
}
func main() {
chanDemo()
bufferChannel()
}
再看swoole
/**
* 创建一个主协程
*/
const NUM = 1; //总执行次数
$c = new chan(600); //定义管道
// 记录开始时间
$StartTime = time();
const MaxCoroutine = 10; //默认3000
Swoole\Coroutine::set(array(
'max_coroutine' => MaxCoroutine,
));
go(function () use ($c) {
/**
* 连接数据库
*/
$swoole_mysql = new Swoole\Coroutine\MySQL();
$res = $swoole_mysql->connect([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'demo',
'fetch_mode' => true, //fetch 要开启这个
]);
if ($res == false) {
// log($res);
echo "MYSQL-CONNECT-ERROR" . PHP_EOL;
return;
}
/**
* 调用pop防止 堵塞 子协程
*/
go(function () use ($c, $swoole_mysql) {
popChan($c, $swoole_mysql);
});
/**
* 调用push 子协程
*/
go(function () use ($c) {
pushChan($c);
});
});
/**
* 调用pop
*
* @param [type] $c
* @param [type] $swoole_mysql
* @return void
*/
function popChan(&$c, &$swoole_mysql)
{
$n = 0;
$stm = $swoole_mysql->prepare('insert into `cate` (`id`,`name`) value(?,?)');
if (false == $stm) {
// log("预处理错误的原因:" . $swoole_mysql->erron . ":" . $swoole_mysql->error);
consolelog("mysql 添加失败" . $swoole_mysql->error);
die();
}
while (true) {
consolelog("wite-begin-insert:" . date('Y-m-d H:i:s') . ":" . $n);
//consolelog("insert-data:".json_encode($c->pop()));
$n++;
consolelog("begin-insert:" . date('Y-m-d H:i:s') . ":" . $n);
$ret = $stm->execute([null, json_encode($c->pop())]);
if (false == $ret) {
// log("添加错误的原因:" . $swoole_mysql->erron . ":" . $swoole_mysql->error);
//consolelog('insert-error'. $swoole_mysql->erron() . ":" . $swoole_mysql->error());
$n--; //回退
$c->push($c->pop());
}
if (NUM == $n) {
consolelog('End at : ' . date('Y-m-d H:i:s') . PHP_EOL
. ' 耗时: ' . (time() - $GLOBALS['StartTime'])
. 'S 速率: ' . round($n / (time() - $GLOBALS['StartTime']), 2) . '/S');
return;
}
}
}
/**
* push
*/
function pushChan(&$c)
{
for ($i = 0; $i < NUM; $i++) {
/**
* 以防万一
*/
while ($c->isFull() || (Swoole\Coroutine::stats())['coroutine_num'] > MaxCoroutine - 5) {
consolelog("sleep:" . $i);
co::sleep(0.3);
}
$data = [
'www.baidu.com',
'www.qq.com',
'www.csdn.com',
];
foreach ($data as $v) {
$ips[] = co::gethostbyname($v, AF_INET, 0.5);
consolelog("begin-select:" . $i);
}
$c->push($ips);
}
}
/**
* 定义打印输出
*
* @param [type] $msg
* @return void
*/
function consolelog($msg)
{
$msg = $msg . PHP_EOL;
echo $msg;
}
/**
* 定义日志文件
*
* @param [type] $msg
* @return void
*/
因为协程是并行执行的他的速度非常快,如果不用time我们看不到他的执行结果,所以现在修改一下代码,也可以实现
package main
import (
"fmt"
)
//create worker
func createWorker(id int) workers {
w := workers{
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w
}
// do worker
func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("is %d,come %c\n", id, n)
done <- true //out
}
}
type workers struct { //type a struct
in chan int //in channel
done chan bool //out channel
}
func chanDemo() {
var workers [10]workers
//extwo
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
//all words goto channel
for i, worker := range workers {
worker.in <- 'a' + i
}
//make word out
for _, woker := range workers {
<-woker.done
}
//all words goto channel
for i, worker := range workers {
worker.in <- 'A' + i
}
//make word out
for _, woker := range workers {
<-woker.done
}
}
func main() {
chanDemo()
}
golang里面内置了一个WaitGroup方法 还可以这样来
package main
import (
"fmt"
"sync"
)
//create worker
func createWorker(id int, wait *sync.WaitGroup) workers {
w := workers{
in: make(chan int),
wait: wait,
}
go doWorker(id, w.in, wait)
return w
}
// do worker
func doWorker(id int, c chan int, wait *sync.WaitGroup) {
for n := range c {
fmt.Printf("is %d,come %c\n", id, n)
wait.Done()
}
}
type workers struct {
in chan int
wait *sync.WaitGroup //usr chanDemo wait so use *
}
func chanDemo() {
var workers [10]workers
var wait sync.WaitGroup
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wait)
}
wait.Add(20) //we have 20 work todo
for i, worker := range workers {
worker.in <- 'a' + i
//or we can do this
//wait.Add(1)
}
for i, worker := range workers {
worker.in <- 'A' + i
}
wait.Wait() //wait 20 work to end
}
func main() {
chanDemo()
}
总结:swoole的chan 类似于 队列,先进先出,可以设置一个缓存区,就是你的管道的长度, 在swoole里面有isFull方法进行判断,如果满了,可以将当前协程挂起,(其他判断方法也可以),如果发生I/O阻塞,协程本身就是并发,大家可以想想会发生什么,你的CPU,内存,所以代码一定要严谨。
go里面的channel是并行执行,因为速度非常快,我们第一种通过time的方式可以实现输出,第二种先并行执行前10个,done出来,再并行执行后10个,done出来,第三种,通过内置的waitgroup方法,并行执行20个。
-----个人理解
网友评论