美文网首页
PHP多进程

PHP多进程

作者: 手扶拖拉机_6e4d | 来源:发表于2019-03-25 14:14 被阅读0次

PHP多进程实例:
本地并发只能通过语言自己的特性在程序本身实现多任务效果,一般来说现在的语言会通过多线程或多进程的方式来实现。由于PHP不支持多线程,目前只能采用多进程方式,让操作系统来帮助实现本地并发。至于代码实现,可以通过pcntl扩展、 proc_open、popen等方式,注意PHP需要先安装pcntl扩展。
场景:日常任务中,有时需要通过php脚本执行一些日志分析,队列处理等任务,当数据量比较大时,可以使用多进程来处理。
准备:php多进程需要pcntl,posix扩展支持,可以通过 php - m 查看,没安装的话需要重新编译php,加上参数--enable-pcntl,posix一般默认会有。

创建子进程的函数fork
pcntl_fork — 在当前进程当前位置产生分支(子进程)。译注:fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继续执行,不同的是父进程执行过程中,得到的fork返回值为子进程号,而子进程得到的是0。
一个fork子进程的基础示例:

    <?php
$pid = pcntl_fork();//父进程和子进程都会执行下面代码
if ($pid == -1) {   
     //错误处理:创建子进程失败时返回-1.
     die('could not fork');
} else if ($pid) {
   //父进程会得到子进程号,所以这里是父进程执行的逻辑
     pcntl_wait($status); 
    //等待子进程中断,防止子进程成为僵尸进程。
} else {     
 //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
}

如果一个任务被分解成多个进程执行,就会减少整体的耗时。
比如有一个比较大的数据(文件)要处理,这个数据(文件)由很多行组成。如果单进程执行要处理的任务,量很大时要耗时比较久。这时可以考虑多进程。
多进程处理分解任务,每个进程处理数据(文件)的一部分,这样需要均分割一下这个大数据(文件)成多个小数据(文件)(进程数和小数据(文件)的个数等同就可以)。
案例:
现在有两张表: 工单表ticket和工单跟进ticket_reply,两表根据ticket_no关联,一条工单记录可以有多条工单跟进记录(一对多)。现在需要在工单表(t_ticket)表中新增一个跟进时间(follow_time),该follow_time其实就是ticket_reply中的创建时间(create_time)---取最近那条工单跟进的create_time。
思路解析: 先查询出工单跟进表中总记录条数,由于两张表都是千万级的数据量(1300万), 那么假如我们开10个子进程,那么每个子进程的处理总量为130万,在每个子进程中每次foreach 只处理1000条数据, 举个例子:子进程1的下限区间为 1300000, 上限区间为2600000, 那么sql的查询条件为 : 1300000 <= t_id <= 2600000, 对这个区间中的数据每次取1000条,那么第一次foreach的查询条件为1300000 <= t_id <= 2600000 order by t_id asc limit 1000, 第二次foreach的查询条件为1301000 <= t_id <= 2600000 order by t_id asc limit 1000 (注意:千万不要使用类似与 select t_id from t_ticket limit 1301000, 1000 来处理)

代码如下:

 <?php

    class Test
    {
        public function testPm()
        {
            $total = 7098500;     //总数据 mysql
            $num = 10;         //进程数
            $page_size = 1000;
            $per_total = ceil($total / $num); //每个进程处理总数 
            $page = ceil($per_total / $page_size);//当前进程需要处理多少次 

            for ($i = 0; $i <= 9; $i++) {
                // 每个进程对应的区间下限, 例如:进程1 -> 0, 进程2 -> 1300000 , 进程3  -> 2600000    ...
                $minId = floor(( $i  / $num) * $total) ;
                //每个进程对应的区间上限, 例如: 进程1 -> 1300000, 进程2 -> 2600000 , 进程3  -> 3900000    ...
                $maxId = ceil( (($i +1)  / $num) * $total);

                Lib_Pm::fork(function ($msg, $i) use ($num,$per_total,$minId,$maxId) {
                    $this->error($msg);
                    //处理工单跟进的具体逻辑
                    $this->handleReply($minId,$maxId);
                    sleep($i);
                }, 'hello', $i);
            }

            while (($pid = pcntl_waitpid(0, $status)) != -1) {
                $status = pcntl_wexitstatus($status);
                Lib_Climate::success("子进程$pid 完成 状态 $status");
            }
        }

        public function handleReply($minId, $maxId)
        {
            for ($i = $minId; $i <= $maxId; $i+=1000) {
                $field = 'ticket_no,t_id,follow_time,create_time as t_create_time';
                $where = 't_id >= '  .$i . ' AND t_id <= ' . $maxId . ' order by `t_id` asc limit ' . 1000;
                //查询t_ticket
                $result = App_Db::getInstance()->select('t_ticket', $field, $where);

                if(empty($result)) {
                    break;
                }

                $data = [];
                $resultlist = [];
                foreach ($result as $k => $v) {
                    $res_key = $v['ticket_no'];
                    array_push($data, $v['ticket_no']);
                    $resultlist[$res_key] = [
                        't_create_time' => $v['t_create_time'],
                    ];
                }
                if (!empty($data)) {
                    $ticket_ids = implode(',', $data);
                    $condition = 'ticket_no in ' . '(' . $ticket_ids . ')';
                    $datalist = App_Db::getInstance()->select('t_ticket_reply', 'ticket_no,create_time', $condition);
                    // t_ticket_reply 处理之后的数据
                    $resdata = $this->handleData($datalist);   
                    foreach ($resultlist as $kk => $vv) {
                        if (!isset($resdata[$kk])) {
                            $resdata[$kk] = $vv['t_create_time'];
                        }
                    }
                    if (!empty($resdata)) {
                        //批量写入数据
                        $ticket_nos = implode(',', array_keys($resdata));
                        $sql = 'UPDATE t_ticket SET follow_time = ' . ' CASE ticket_no ';
                        foreach ($resdata as $ticket_no => $follow_time) {
                            $sql .= sprintf(" WHEN %s THEN %s ", $ticket_no, " '$follow_time' ");
                        }
                        if (trim($ticket_nos) != '') {
                            $sql .= ' END WHERE ticket_no IN ' . '(' . $ticket_nos . ')';
                            $res = App_Db::getInstance()->query($sql);
                        }
                    } else {
                        continue;
                    }
                } else {
                    continue;
                }
                unset($result,$datalist,$resdata,$ticket_nos);
            }
        }


         /**
         * 处理t_ticket_reply数据
         * @param $datalist  array  : t_ticket_reply数据
         * @param $result    array  : t_ticket数据
         */
        public function handleData($datalist)
        {
            $data = [];
            $list = [];
            if (is_array($datalist) && !empty($datalist)) {
                foreach ($datalist as $k => $v) {
                    //去重,取最大值
                    $key = $v['ticket_no'];
                    if (isset($data[$key])) {
                        $res = $this->dateBdate($data[$key]['create_time'], $v['create_time']);
                        if ($res == 1) {  //覆盖
                            $data[$key] = [
                                'ticket_no' => $v['ticket_no'],
                                'create_time' => $v['create_time']
                            ];
                            $list[$key] = $v['create_time'];
                        }
                    } else {
                        $data[$key] = [
                            'ticket_no' => $v['ticket_no'],
                            'create_time' => $v['create_time']
                        ];
                        $list[$key] = $v['create_time'];
                    }
                }
            }
            return $list;
        }


        /**
         * 比较两个日期的大小
         * @param $date1
         * @param $date2
         * @return int
         */
        public function dateBdate($date1, $date2)
        {
            if (strtotime($date1) < strtotime($date2)) {
                return 1;
            } else {
                return 0;
            }
        }
    }

相关文章

  • PHP中的“进程”系列1——PHP-FPM模型

    PHP中的“进程”系列 这个系列会分几个部分,从PHP-FPM进程模式起,到Linux进程,最后回到PHP本身谈一...

  • 杀掉所有grep到的进程

    以php进程为例 grep php 查询关键字带有php的进程(查询结果会带有grep php)grep -v g...

  • PHP创建守护进程

    PHP 创建守护进程 执行守护进程

  • 2019-06-27

    用PHP玩转进程之二 — 多进程PHPServer 2018-09-02 系统设计 语言 PHP 经过用 PHP ...

  • [PHP] - 编译参数 --enable-pcntl

    官网:PHP - PCNTL 红框翻译 PHP进程控制支持 实现了类unix的进程创建、程序运行、消息处理、进程终...

  • php-fpm

    php-fpm说明 php-fpm是FastCGI的实现,并提供进程管理的功能。进程包括master进程和work...

  • php-fpm解读-进程管理的三种模式

    《我是程序媛》系列——php-fpm进程管理,感谢大表哥亲情赞助时间,读了php-fpm源码。 php-fpm进程...

  • PHP 技能精进之 PHP-FPM 多进程模型

    PHP-FPM 提供了更好的 PHP 进程管理方式,可以有效控制内存和进程、可以平滑重载PHP配置。那么当我们谈论...

  • PHP-FPM

    PHP-FPM(PHP FastCGI Process Manager简称,意思是PHP FastCGI的进程管理...

  • php sleep()函数使用注意事项

    php代码的执行是单线程的,php使用php-fpm(Fastcgi 进程管理器) 负责进程的分配和管理,如果ph...

网友评论

      本文标题:PHP多进程

      本文链接:https://www.haomeiwen.com/subject/klwvvqtx.html