美文网首页
使用 swoole_process 实现 PHP 进程池

使用 swoole_process 实现 PHP 进程池

作者: _Amauri | 来源:发表于2018-10-27 18:39 被阅读124次

swoole_process 主要是用来代替 PHP 的 pcntl 扩展。我们知道 pcntl 是用来进行多进程编程的,而 pcntl 只提供了 fork 这样原始的接口,容易使用错误,并且没有提供进程间通信以及重定向标准输入输出的功能。

而 swoole_process 则提供了比 pcntl 更强大的功能,更易用的API,使PHP在多进程编程方面更加轻松。

本文使用 swoole_process 与 EventLoop 完成一个 php 的进程池,并且支持动态创建新进程。

EventLoop

swoole 有一个 Reactor 线程,这个线程可以说是对 epoll 模型的封装,可以设置 read 事件和 write 事件的监听回调函数。

下面会用到一个函数:

bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
  • 参数1为一个文件描述符,包括swoole_client->$sockswoole_process->$pipe或者其他 fd(socket_create 创建的资源 , stream_socket_client/fsockopen创建的资源)
  • 参数2为可读事件回调函数
  • 参数3为可写事件回调函数

多进程编程少不了进程之间的通讯,swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe)。那么本文使用的是 pipe 的方式。

下面是一个定时向进程池投递任务的例子。

代码:

<?php
class ProcessPool{

    private $process;

    /**
     * Worker 进程数组
     * @var array
     */
    private $process_list = [];

    /**
     * 正在被使用的进程
     * @var array
     */
    private $process_use = [];

    /**
     * 最少进程数量
     * @var int
     */
    private $min_worker_num = 3;

    /**
     * 最多进程数量
     * @var int
     */
    private $max_worker_num = 6;

    /**
     * 当前进程数量
     * @var int
     */
    private $current_num;


    public function __construct()
    {
        $this->process = new swoole_process(array($this, 'run'), false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    public function run()
    {
        $this->current_num = $this->min_worker_num;
        //创建所有的worker进程
        for($i = 0; $i < $this->current_num; $i++){
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }

        foreach($this->process_list as $process){
            swoole_event_add($process->pipe, function ($pipe) use ($process){
                $data = $process->read();
                var_dump($data . '空闲');
                //接收子进程处理完成的信息,并且重置为空闲
                $this->process_use[$data] = 0;
            });
        }

        //每秒定时向worker管道投递任务
        swoole_timer_tick(1000 ,function ($timer_id){
            static $index = 0;
            $index = $index + 1;
            $flag = true; //是否新建worker
            foreach ($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //标记为正在使用
                    $this->process_use[$pid] = 1;
                    // 在父进程内调用write,子进程可以调用read接收此数据
                    $this->process_list[$pid]->write($index. "hello");
                    break;
                }
            }

            if($flag && $this->current_num < $this->max_worker_num){
                //没有闲置worker,新建worker来处理
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1;
                $this->process_list[$pid]->write($index. "hello");
                $this->current_num++;
            }
            var_dump('第' .$index. '个任务');
            if($index == 10){
                foreach($this->process_list as $process){
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }

        });
    }

    /**
     * 子进程处理
     * @param $worker
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function($pipe)use($worker){
            $data = $worker->read();
            var_dump($worker->pid . ':' . $data);
            if($data == 'exit'){
                $worker->exit();
                exit;
            }
            //模拟耗时任务
            sleep(5);
            //告诉主进程处理完成
            //在子进程内调用write,父进程可以调用read接收此数据
            $worker->write($worker->pid);
        });
    }

}

new ProcessPool();

首先定义几个重要的属性:

  • $process_list :Worker 进程数组
  • $process_use:正在被使用的进程
  • $min_worker_num :最少进程数量
  • $max_worker_num :最多进程数量
  • $current_num :当前进程数量
  • $process : 主进程

在实例化的时候创建主进程,并且运行 run 方法,在 run 方法里面先创建所有的 worker 进程,并且设置为空闲状态。

接着遍历所有的 worker 进程,并且加入 EventLoop 中,设置可读事件,用于接收子进程的空闲信号。

最后每隔一秒向 worker 进程投递任务。动态扩充进程池则在这里实现,如果没有闲置的进程,而此时又有新的任务,则需要动态创建一个新的进程并且置为繁忙状态。由于只模拟了十次任务,则第十个任务完成之后在父进程中发送 exit 使所有子进程退出。

运行效果与图解:
Snipaste_2018-10-27_18-36-11.png
参考链接:

https://wiki.swoole.com/wiki/page/p-process.html
https://opso.coding.me/2018/07/07/swoole-process/

相关文章

  • 使用 swoole_process 实现 PHP 进程池

    swoole_process 主要是用来代替 PHP 的 pcntl 扩展。我们知道 pcntl 是用来进行多进程...

  • php-fpm

    php-fpm cpu 高排查思路 解决思路 解决思路 设置控制php-fpm进程池进程数量。 开启慢日志 编辑p...

  • [PHP] - 编译参数 --enable-pcntl

    官网:PHP - PCNTL 红框翻译 PHP进程控制支持 实现了类unix的进程创建、程序运行、消息处理、进程终...

  • php-fpm

    php-fpm说明 php-fpm是FastCGI的实现,并提供进程管理的功能。进程包括master进程和work...

  • Supervisor的配置步骤

    最近的PHP项目中,需要用到常驻后台的脚本,使用Supervisor管理进程,可以实现对异常中断的子进程的自动重启...

  • PHP-FPM详解

    目录 作用 安装 全局配置 配置进程池参考Company开发环境 转发请求给PHP-FPM 思考 作用 PHP-F...

  • Python - 多任务

    一、进程 1、multiprocessing.Process创建进程 2、使用Pool进程池创建进程 3、进程间的...

  • php sleep()函数使用注意事项

    php代码的执行是单线程的,php使用php-fpm(Fastcgi 进程管理器) 负责进程的分配和管理,如果ph...

  • PHP多进程

    PHP多进程 1.多开几个进程,这种方式简单实用,推荐,比如说使用shell脚本: 2.pcntl扩展 php多进...

  • python多进程

    这里使用官方推荐的进程池的方式☞

网友评论

      本文标题:使用 swoole_process 实现 PHP 进程池

      本文链接:https://www.haomeiwen.com/subject/zgpdtqtx.html