美文网首页
Swoole之Coroutine与Channel初体验

Swoole之Coroutine与Channel初体验

作者: 许文同学 | 来源:发表于2018-11-11 01:22 被阅读0次

    Swoole在2.0开始内置协程(Coroutine)的能力,可以在特定封装的IO操作中自动触发协程切换。
    刚好需要爬一个资源站,就拿来试了一下。

    <?php
    /**
     * Created by PhpStorm.
     * User: wen
     */
    use Swoole\Coroutine as co;
    date_default_timezone_set('Asia/Shanghai');
    // 起始资源ID
    const StartResourceId = 1;
    // 执行数量
    const Num = 30000;
    // 最大协程数量
    const MaxCoroutine = 360;
    // 协程配置
    Swoole\Coroutine::set(array(
        'max_coroutine' => MaxCoroutine,
    ));
    
    // 记录开始时间
    $StartTime = time();
    // 创建处理管道
    $chan = new co\Channel(600);
    
    /**
     * 主协程
     */
    go(function () use ($chan) {
        echo 'chan Coroutine start'.PHP_EOL;
        // 初始化数据库 异步Mysql必须在协程内使用
        $swoole_mysql = new Swoole\Coroutine\MySQL();
        $bol = $swoole_mysql->connect([
            'host' => '127.0.0.1',
            'port' => 3306,
            'user' => 'root',
            'password' => 'root',
            'database' => 'test',
            'fetch_mode' => true,
        ]);
        // Mysql连接失败
        if (false == $bol){
            echo 'MySQL Connect Error'.PHP_EOL;
            return;
        }
        // 开启消费协程【消费协程最好先开,以免生产者协程的不合理协程嵌套导致Channel溢出,无法消费】
        go(function () use ($chan, $swoole_mysql){
            chanToMysql($chan, $swoole_mysql);
        });
        // 开启获取资源协程
        go(function ()use ($chan){
            getCurlRes($chan);
        });
    });
    
    /**
     * 消费Channel数据存储到数据库
     * @param $chan
     * @param $swoole_mysql
     */
    function chanToMysql(&$chan, &$swoole_mysql){
        $n = 0;
        $stmt = $swoole_mysql->prepare('INSERT INTO test (resource_id, source_content) VALUES (?, ?)');
        if (false == $stmt){
            consolelog('MySQL预处理失败: ' . $swoole_mysql->errno . ' : ' . $swoole_mysql->error);
            die();
        }
        // 单协程阻塞消费chan
        while(true) {
            consolelog('wait chan... n: '. $n);
            $data = $chan->pop();
            $n++;
            consolelog('chan: ' . $n);
            $bol = $stmt->execute($data, 1);
            if (false == $bol){
                consolelog('MySQL 插入失败: ' . $swoole_mysql->errno . ' : ' . $swoole_mysql->error);
                $n--;
                $chan->push($data);
            }
            consolelog('MySQL 插入成功: ' . $n);
            // 消费完毕,关闭最后一个协程,脚本结束
            if (Num == $n) {
                consolelog('End at : '. date('Y-m-d H:i:s') . PHP_EOL
                    . ' 耗时: ' . (time()-$GLOBALS['StartTime'])
                    . 'S 速率: ' . round($n/(time()-$GLOBALS['StartTime']), 2) . '/S');
                return;
            }
        }
    }
    
    /**
     * 获取资源数据并push到Channel
     * @param $chan
     */
    function getCurlRes(&$chan){
        for ($i = 0; $i < Num; $i++) {
            // chan满 || 协程资源不足 暂时让出当前协程的执行权
            while ($chan->isFull()
                || (Swoole\Coroutine::stats())['coroutine_num'] > MaxCoroutine-5) {
                consolelog('for sleep ' . $i);
                co::sleep(0.2);
            }
            go(function () use ($i, $chan) {
                $resource_id = StartResourceId + $i;
                $cli = new Swoole\Coroutine\Http\Client('www.***.com', 80);
                $cli->post('/getFileUrl', [
                    'file_id' => $resource_id
                ]);
                $data = $cli->body;
                $cli->close();
                echo 'CURL: ' . $i . PHP_EOL;
                // 【单线程模型,递归协程,子协程将会把控制权先让给父协程】
                // chan满时,让出执行权
                while ($chan->isFull()) {
                    consolelog('isFull sleep ' . $i);
                    co::sleep(0.01);
                }
                consolelog('CURL To Chan...' . $i);
                $chan->push([$resource_id, $data]);
                return;
            });
        }
    }
    
    function consolelog($str){
    //    echo $str, ' ---- ', json_encode(Swoole\Coroutine::stats()), PHP_EOL;
        echo $str, PHP_EOL;
        return;
    }
    
    
    echo 'load over' . PHP_EOL;
    

    实际跑下来几次,处理效率约在26~30个/秒,其中数据入库是一条一条插远程库,会有一些影响,总体比起纯同步代码好很多了。

    有一些要注意的地方:
    • 协程满了,再创建的协程无法正常运行
    • 需要使用Swoole封装的Coroutine IO方法才能触发协程自动调度
    • 嵌套协程让出控制权时,调度器会优先让给父级(上面代码getCurlRes方法中有2处主动让出控制权)
    • 同一Mysql会话不可以在多个协程里使用【参照 https://wiki.swoole.com/wiki/page/963.html
    • 最后一个协程退出,脚本结束

    相关文章

      网友评论

          本文标题:Swoole之Coroutine与Channel初体验

          本文链接:https://www.haomeiwen.com/subject/lveuxqtx.html