PHP多进程实例:
本地并发只能通过语言自己的特性在程序本身实现多任务效果,一般来说现在的语言会通过多线程或多进程的方式来实现。由于PHP不支持多线程,目前只能采用多进程方式,让操作系统来帮助实现本地并发。至于代码实现,可以通过pcntl扩展、 proc_open、popen等方式,注意PHP需要先安装pcntl扩展。
场景:日常任务中,有时需要通过php脚本执行一些日志分析,队列处理等任务,当数据量比较大时,可以使用多进程来处理。
准备:php多进程需要pcntl,posix扩展支持,可以通过 php - m 查看,没安装的话需要重新编译php,加上参数--enable-pcntl,posix一般默认会有。
创建子进程的函数fork
pcntl_fork — 在当前进程当前位置产生分支(子进程)。译注:fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继续执行,不同的是父进程执行过程中,得到的fork返回值为子进程号,而子进程得到的是0。
一个fork子进程的基础示例:
<?php
$pid = pcntl_fork();//父进程和子进程都会执行下面代码
if ($pid == -1) {
//错误处理:创建子进程失败时返回-1.
die('could not fork');
} else if ($pid) {
//父进程会得到子进程号,所以这里是父进程执行的逻辑
pcntl_wait($status);
//等待子进程中断,防止子进程成为僵尸进程。
} else {
//子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
}
如果一个任务被分解成多个进程执行,就会减少整体的耗时。
比如有一个比较大的数据(文件)要处理,这个数据(文件)由很多行组成。如果单进程执行要处理的任务,量很大时要耗时比较久。这时可以考虑多进程。
多进程处理分解任务,每个进程处理数据(文件)的一部分,这样需要均分割一下这个大数据(文件)成多个小数据(文件)(进程数和小数据(文件)的个数等同就可以)。
案例:
现在有两张表: 工单表ticket和工单跟进ticket_reply,两表根据ticket_no关联,一条工单记录可以有多条工单跟进记录(一对多)。现在需要在工单表(t_ticket)表中新增一个跟进时间(follow_time),该follow_time其实就是ticket_reply中的创建时间(create_time)---取最近那条工单跟进的create_time。
思路解析: 先查询出工单跟进表中总记录条数,由于两张表都是千万级的数据量(1300万), 那么假如我们开10个子进程,那么每个子进程的处理总量为130万,在每个子进程中每次foreach 只处理1000条数据, 举个例子:子进程1的下限区间为 1300000, 上限区间为2600000, 那么sql的查询条件为 : 1300000 <= t_id <= 2600000, 对这个区间中的数据每次取1000条,那么第一次foreach的查询条件为1300000 <= t_id <= 2600000 order by t_id asc limit 1000, 第二次foreach的查询条件为1301000 <= t_id <= 2600000 order by t_id asc limit 1000 (注意:千万不要使用类似与 select t_id from t_ticket limit 1301000, 1000 来处理)
代码如下:
<?php
class Test
{
public function testPm()
{
$total = 7098500; //总数据 mysql
$num = 10; //进程数
$page_size = 1000;
$per_total = ceil($total / $num); //每个进程处理总数
$page = ceil($per_total / $page_size);//当前进程需要处理多少次
for ($i = 0; $i <= 9; $i++) {
// 每个进程对应的区间下限, 例如:进程1 -> 0, 进程2 -> 1300000 , 进程3 -> 2600000 ...
$minId = floor(( $i / $num) * $total) ;
//每个进程对应的区间上限, 例如: 进程1 -> 1300000, 进程2 -> 2600000 , 进程3 -> 3900000 ...
$maxId = ceil( (($i +1) / $num) * $total);
Lib_Pm::fork(function ($msg, $i) use ($num,$per_total,$minId,$maxId) {
$this->error($msg);
//处理工单跟进的具体逻辑
$this->handleReply($minId,$maxId);
sleep($i);
}, 'hello', $i);
}
while (($pid = pcntl_waitpid(0, $status)) != -1) {
$status = pcntl_wexitstatus($status);
Lib_Climate::success("子进程$pid 完成 状态 $status");
}
}
public function handleReply($minId, $maxId)
{
for ($i = $minId; $i <= $maxId; $i+=1000) {
$field = 'ticket_no,t_id,follow_time,create_time as t_create_time';
$where = 't_id >= ' .$i . ' AND t_id <= ' . $maxId . ' order by `t_id` asc limit ' . 1000;
//查询t_ticket
$result = App_Db::getInstance()->select('t_ticket', $field, $where);
if(empty($result)) {
break;
}
$data = [];
$resultlist = [];
foreach ($result as $k => $v) {
$res_key = $v['ticket_no'];
array_push($data, $v['ticket_no']);
$resultlist[$res_key] = [
't_create_time' => $v['t_create_time'],
];
}
if (!empty($data)) {
$ticket_ids = implode(',', $data);
$condition = 'ticket_no in ' . '(' . $ticket_ids . ')';
$datalist = App_Db::getInstance()->select('t_ticket_reply', 'ticket_no,create_time', $condition);
// t_ticket_reply 处理之后的数据
$resdata = $this->handleData($datalist);
foreach ($resultlist as $kk => $vv) {
if (!isset($resdata[$kk])) {
$resdata[$kk] = $vv['t_create_time'];
}
}
if (!empty($resdata)) {
//批量写入数据
$ticket_nos = implode(',', array_keys($resdata));
$sql = 'UPDATE t_ticket SET follow_time = ' . ' CASE ticket_no ';
foreach ($resdata as $ticket_no => $follow_time) {
$sql .= sprintf(" WHEN %s THEN %s ", $ticket_no, " '$follow_time' ");
}
if (trim($ticket_nos) != '') {
$sql .= ' END WHERE ticket_no IN ' . '(' . $ticket_nos . ')';
$res = App_Db::getInstance()->query($sql);
}
} else {
continue;
}
} else {
continue;
}
unset($result,$datalist,$resdata,$ticket_nos);
}
}
/**
* 处理t_ticket_reply数据
* @param $datalist array : t_ticket_reply数据
* @param $result array : t_ticket数据
*/
public function handleData($datalist)
{
$data = [];
$list = [];
if (is_array($datalist) && !empty($datalist)) {
foreach ($datalist as $k => $v) {
//去重,取最大值
$key = $v['ticket_no'];
if (isset($data[$key])) {
$res = $this->dateBdate($data[$key]['create_time'], $v['create_time']);
if ($res == 1) { //覆盖
$data[$key] = [
'ticket_no' => $v['ticket_no'],
'create_time' => $v['create_time']
];
$list[$key] = $v['create_time'];
}
} else {
$data[$key] = [
'ticket_no' => $v['ticket_no'],
'create_time' => $v['create_time']
];
$list[$key] = $v['create_time'];
}
}
}
return $list;
}
/**
* 比较两个日期的大小
* @param $date1
* @param $date2
* @return int
*/
public function dateBdate($date1, $date2)
{
if (strtotime($date1) < strtotime($date2)) {
return 1;
} else {
return 0;
}
}
}
网友评论