美文网首页
PHP多进程

PHP多进程

作者: 手扶拖拉机_6e4d | 来源:发表于2019-03-25 14:14 被阅读0次

    PHP多进程实例:
    本地并发只能通过语言自己的特性在程序本身实现多任务效果,一般来说现在的语言会通过多线程或多进程的方式来实现。由于PHP不支持多线程,目前只能采用多进程方式,让操作系统来帮助实现本地并发。至于代码实现,可以通过pcntl扩展、 proc_open、popen等方式,注意PHP需要先安装pcntl扩展。
    场景:日常任务中,有时需要通过php脚本执行一些日志分析,队列处理等任务,当数据量比较大时,可以使用多进程来处理。
    准备:php多进程需要pcntl,posix扩展支持,可以通过 php - m 查看,没安装的话需要重新编译php,加上参数--enable-pcntl,posix一般默认会有。

    创建子进程的函数fork
    pcntl_fork — 在当前进程当前位置产生分支(子进程)。译注:fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继续执行,不同的是父进程执行过程中,得到的fork返回值为子进程号,而子进程得到的是0。
    一个fork子进程的基础示例:

        <?php
    $pid = pcntl_fork();//父进程和子进程都会执行下面代码
    if ($pid == -1) {   
         //错误处理:创建子进程失败时返回-1.
         die('could not fork');
    } else if ($pid) {
       //父进程会得到子进程号,所以这里是父进程执行的逻辑
         pcntl_wait($status); 
        //等待子进程中断,防止子进程成为僵尸进程。
    } else {     
     //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
    }
    

    如果一个任务被分解成多个进程执行,就会减少整体的耗时。
    比如有一个比较大的数据(文件)要处理,这个数据(文件)由很多行组成。如果单进程执行要处理的任务,量很大时要耗时比较久。这时可以考虑多进程。
    多进程处理分解任务,每个进程处理数据(文件)的一部分,这样需要均分割一下这个大数据(文件)成多个小数据(文件)(进程数和小数据(文件)的个数等同就可以)。
    案例:
    现在有两张表: 工单表ticket和工单跟进ticket_reply,两表根据ticket_no关联,一条工单记录可以有多条工单跟进记录(一对多)。现在需要在工单表(t_ticket)表中新增一个跟进时间(follow_time),该follow_time其实就是ticket_reply中的创建时间(create_time)---取最近那条工单跟进的create_time。
    思路解析: 先查询出工单跟进表中总记录条数,由于两张表都是千万级的数据量(1300万), 那么假如我们开10个子进程,那么每个子进程的处理总量为130万,在每个子进程中每次foreach 只处理1000条数据, 举个例子:子进程1的下限区间为 1300000, 上限区间为2600000, 那么sql的查询条件为 : 1300000 <= t_id <= 2600000, 对这个区间中的数据每次取1000条,那么第一次foreach的查询条件为1300000 <= t_id <= 2600000 order by t_id asc limit 1000, 第二次foreach的查询条件为1301000 <= t_id <= 2600000 order by t_id asc limit 1000 (注意:千万不要使用类似与 select t_id from t_ticket limit 1301000, 1000 来处理)

    代码如下:

     <?php
    
        class Test
        {
            public function testPm()
            {
                $total = 7098500;     //总数据 mysql
                $num = 10;         //进程数
                $page_size = 1000;
                $per_total = ceil($total / $num); //每个进程处理总数 
                $page = ceil($per_total / $page_size);//当前进程需要处理多少次 
    
                for ($i = 0; $i <= 9; $i++) {
                    // 每个进程对应的区间下限, 例如:进程1 -> 0, 进程2 -> 1300000 , 进程3  -> 2600000    ...
                    $minId = floor(( $i  / $num) * $total) ;
                    //每个进程对应的区间上限, 例如: 进程1 -> 1300000, 进程2 -> 2600000 , 进程3  -> 3900000    ...
                    $maxId = ceil( (($i +1)  / $num) * $total);
    
                    Lib_Pm::fork(function ($msg, $i) use ($num,$per_total,$minId,$maxId) {
                        $this->error($msg);
                        //处理工单跟进的具体逻辑
                        $this->handleReply($minId,$maxId);
                        sleep($i);
                    }, 'hello', $i);
                }
    
                while (($pid = pcntl_waitpid(0, $status)) != -1) {
                    $status = pcntl_wexitstatus($status);
                    Lib_Climate::success("子进程$pid 完成 状态 $status");
                }
            }
    
            public function handleReply($minId, $maxId)
            {
                for ($i = $minId; $i <= $maxId; $i+=1000) {
                    $field = 'ticket_no,t_id,follow_time,create_time as t_create_time';
                    $where = 't_id >= '  .$i . ' AND t_id <= ' . $maxId . ' order by `t_id` asc limit ' . 1000;
                    //查询t_ticket
                    $result = App_Db::getInstance()->select('t_ticket', $field, $where);
    
                    if(empty($result)) {
                        break;
                    }
    
                    $data = [];
                    $resultlist = [];
                    foreach ($result as $k => $v) {
                        $res_key = $v['ticket_no'];
                        array_push($data, $v['ticket_no']);
                        $resultlist[$res_key] = [
                            't_create_time' => $v['t_create_time'],
                        ];
                    }
                    if (!empty($data)) {
                        $ticket_ids = implode(',', $data);
                        $condition = 'ticket_no in ' . '(' . $ticket_ids . ')';
                        $datalist = App_Db::getInstance()->select('t_ticket_reply', 'ticket_no,create_time', $condition);
                        // t_ticket_reply 处理之后的数据
                        $resdata = $this->handleData($datalist);   
                        foreach ($resultlist as $kk => $vv) {
                            if (!isset($resdata[$kk])) {
                                $resdata[$kk] = $vv['t_create_time'];
                            }
                        }
                        if (!empty($resdata)) {
                            //批量写入数据
                            $ticket_nos = implode(',', array_keys($resdata));
                            $sql = 'UPDATE t_ticket SET follow_time = ' . ' CASE ticket_no ';
                            foreach ($resdata as $ticket_no => $follow_time) {
                                $sql .= sprintf(" WHEN %s THEN %s ", $ticket_no, " '$follow_time' ");
                            }
                            if (trim($ticket_nos) != '') {
                                $sql .= ' END WHERE ticket_no IN ' . '(' . $ticket_nos . ')';
                                $res = App_Db::getInstance()->query($sql);
                            }
                        } else {
                            continue;
                        }
                    } else {
                        continue;
                    }
                    unset($result,$datalist,$resdata,$ticket_nos);
                }
            }
    
    
             /**
             * 处理t_ticket_reply数据
             * @param $datalist  array  : t_ticket_reply数据
             * @param $result    array  : t_ticket数据
             */
            public function handleData($datalist)
            {
                $data = [];
                $list = [];
                if (is_array($datalist) && !empty($datalist)) {
                    foreach ($datalist as $k => $v) {
                        //去重,取最大值
                        $key = $v['ticket_no'];
                        if (isset($data[$key])) {
                            $res = $this->dateBdate($data[$key]['create_time'], $v['create_time']);
                            if ($res == 1) {  //覆盖
                                $data[$key] = [
                                    'ticket_no' => $v['ticket_no'],
                                    'create_time' => $v['create_time']
                                ];
                                $list[$key] = $v['create_time'];
                            }
                        } else {
                            $data[$key] = [
                                'ticket_no' => $v['ticket_no'],
                                'create_time' => $v['create_time']
                            ];
                            $list[$key] = $v['create_time'];
                        }
                    }
                }
                return $list;
            }
    
    
            /**
             * 比较两个日期的大小
             * @param $date1
             * @param $date2
             * @return int
             */
            public function dateBdate($date1, $date2)
            {
                if (strtotime($date1) < strtotime($date2)) {
                    return 1;
                } else {
                    return 0;
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:PHP多进程

          本文链接:https://www.haomeiwen.com/subject/klwvvqtx.html