Swoole在2.0开始内置协程(Coroutine)的能力,可以在特定封装的IO操作中自动触发协程切换。
刚好需要爬一个资源站,就拿来试了一下。
<?php
/**
* Created by PhpStorm.
* User: wen
*/
use Swoole\Coroutine as co;
date_default_timezone_set('Asia/Shanghai');
// 起始资源ID
const StartResourceId = 1;
// 执行数量
const Num = 30000;
// 最大协程数量
const MaxCoroutine = 360;
// 协程配置
Swoole\Coroutine::set(array(
'max_coroutine' => MaxCoroutine,
));
// 记录开始时间
$StartTime = time();
// 创建处理管道
$chan = new co\Channel(600);
/**
* 主协程
*/
go(function () use ($chan) {
echo 'chan Coroutine start'.PHP_EOL;
// 初始化数据库 异步Mysql必须在协程内使用
$swoole_mysql = new Swoole\Coroutine\MySQL();
$bol = $swoole_mysql->connect([
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'root',
'database' => 'test',
'fetch_mode' => true,
]);
// Mysql连接失败
if (false == $bol){
echo 'MySQL Connect Error'.PHP_EOL;
return;
}
// 开启消费协程【消费协程最好先开,以免生产者协程的不合理协程嵌套导致Channel溢出,无法消费】
go(function () use ($chan, $swoole_mysql){
chanToMysql($chan, $swoole_mysql);
});
// 开启获取资源协程
go(function ()use ($chan){
getCurlRes($chan);
});
});
/**
* 消费Channel数据存储到数据库
* @param $chan
* @param $swoole_mysql
*/
function chanToMysql(&$chan, &$swoole_mysql){
$n = 0;
$stmt = $swoole_mysql->prepare('INSERT INTO test (resource_id, source_content) VALUES (?, ?)');
if (false == $stmt){
consolelog('MySQL预处理失败: ' . $swoole_mysql->errno . ' : ' . $swoole_mysql->error);
die();
}
// 单协程阻塞消费chan
while(true) {
consolelog('wait chan... n: '. $n);
$data = $chan->pop();
$n++;
consolelog('chan: ' . $n);
$bol = $stmt->execute($data, 1);
if (false == $bol){
consolelog('MySQL 插入失败: ' . $swoole_mysql->errno . ' : ' . $swoole_mysql->error);
$n--;
$chan->push($data);
}
consolelog('MySQL 插入成功: ' . $n);
// 消费完毕,关闭最后一个协程,脚本结束
if (Num == $n) {
consolelog('End at : '. date('Y-m-d H:i:s') . PHP_EOL
. ' 耗时: ' . (time()-$GLOBALS['StartTime'])
. 'S 速率: ' . round($n/(time()-$GLOBALS['StartTime']), 2) . '/S');
return;
}
}
}
/**
* 获取资源数据并push到Channel
* @param $chan
*/
function getCurlRes(&$chan){
for ($i = 0; $i < Num; $i++) {
// chan满 || 协程资源不足 暂时让出当前协程的执行权
while ($chan->isFull()
|| (Swoole\Coroutine::stats())['coroutine_num'] > MaxCoroutine-5) {
consolelog('for sleep ' . $i);
co::sleep(0.2);
}
go(function () use ($i, $chan) {
$resource_id = StartResourceId + $i;
$cli = new Swoole\Coroutine\Http\Client('www.***.com', 80);
$cli->post('/getFileUrl', [
'file_id' => $resource_id
]);
$data = $cli->body;
$cli->close();
echo 'CURL: ' . $i . PHP_EOL;
// 【单线程模型,递归协程,子协程将会把控制权先让给父协程】
// chan满时,让出执行权
while ($chan->isFull()) {
consolelog('isFull sleep ' . $i);
co::sleep(0.01);
}
consolelog('CURL To Chan...' . $i);
$chan->push([$resource_id, $data]);
return;
});
}
}
function consolelog($str){
// echo $str, ' ---- ', json_encode(Swoole\Coroutine::stats()), PHP_EOL;
echo $str, PHP_EOL;
return;
}
echo 'load over' . PHP_EOL;
实际跑下来几次,处理效率约在26~30个/秒,其中数据入库是一条一条插远程库,会有一些影响,总体比起纯同步代码好很多了。
有一些要注意的地方:
- 协程满了,再创建的协程无法正常运行
- 需要使用Swoole封装的Coroutine IO方法才能触发协程自动调度
- 嵌套协程让出控制权时,调度器会优先让给父级(上面代码getCurlRes方法中有2处主动让出控制权)
- 同一Mysql会话不可以在多个协程里使用【参照 https://wiki.swoole.com/wiki/page/963.html】
- 最后一个协程退出,脚本结束
网友评论