PHP消息队列
应用场景
业务系统的某一环节,执行任务并发量很大,或者执行时间很长的情景下,程序可能无法快速及时的进行处理,适合使用消息队列。
高度解耦系统
流量削峰(秒杀,抢购类)
异步处理业务
高扩展系统需求
业务处理顺序有需求
消息队列概念
- 队列结构的中间件
- 消息进入队列后,直接返回结果,不立即进行处理
- 根据队列顺序,进行处理
模型
业务系统 ---> 消息队列 ---> 队列处理系统
业务系统:推送消息给消息队列
消息队列:所有的消息在消息队列中,等待处理
队列处理系统:取出消息,进行处理
常见的消息队列介质
- MySQL:可靠性高,效率低
- Redis:效率高,单个队列消息过大时,效率低
- RabbitMQ:专业,可靠,操作复杂
消息队列处理机制
- 死循环读取:易实现,无法故障恢复
- 定时任务:定时执行处理,分担压力
- 守护进程:通过进程,检测消息队列中是否符合自定的出队触发条件,进行除对处理
示例
一、MySQL实现消息队列
文件结构:
├── vender #存放数据库操作类
├── deal_queue.sh #处理队列的shell脚本
├── deal_order.php #从队列中取出消息处理
└── order.php #订单推入队列
数据库结构:
CREATE TABLE `order_queue` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`order_no` varchar(32) NOT NULL DEFAULT '',
`ctime` int(11) NOT NULL DEFAULT '0',
`utime` int(11) NOT NULL DEFAULT '0',
`queue_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '0->未处理,1->已处理',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
order.php 文件:
<?php
require 'vendor/autoload.php';
$db = new \Buki\Pdox();
//将生成的订单,放入队列中
$orderInfo = array(
'order_no' => time().rand(1000,9999), //模拟一个订单号
'ctime' => time(),
'queue_status' => 0, //入队时,状态为未处理
);
$insertId = $db->table('order_queue')
->insert($orderInfo);
if($insertId){
echo '成功添加id为:'.$insertId;
}else{
echo '添加失败';
}
测试运行该脚本:
$ php order.php
成功添加id为:1
$ php order.php
成功添加id为:2
deal_order.php 文件:
<?php
require 'vendor/autoload.php';
$db = new \Buki\Pdox();
//从队列中,取出消息进行处理
$update = array(
'queue_status'=>1,
'utime'=>time(),
);
$isUpdate = $db->table('order_queue')
->where('queue_status',0)
->limit(3) //方便观察效果,这里每次处理3条
->update($update);
if($isUpdate){
echo 'success deal '.$isUpdate.' messages';
}else{
echo 'queue is clean';
}
测试运行该脚本:
$ php deal_order.php
success deal 2 messages
deal_queue.sh 文件:
#!/bin/bash
cd /usr/local/var/www/queue
php deal_order.php
编写 crontab 定时任务:
编辑crontab:
$ crontab -e
写入该任务:
*/1 * * * * /usr/local/var/www/queue/deal_queue.sh >> /usr/local/var/www/queue/mq.log 2>&1
查看一下:
$ crontab -l
*/1 * * * * /usr/local/var/www/queue/deal_queue.sh >> /usr/local/var/www/queue/mq.log 2>&1
测试使用
多执行几次该命令,推送消息到队列
$ php order.php
然后观察数据库的数据变化
同时可以观察mq.log文件的变化,查看队列的执行情况
$ tail -f mq.log
success deal 3 messages2017-07-26 02:58:05
坑
定时任务,执行shell脚本时,用php命令,执行php脚本时,数据库操作类连接数据库时,使用localhost
,会导致连接不上数据库,但是手动执行crontab内的命令,是可以连接并运行脚本的。
跟本地的环境以及数据库操作类的兼容性有关。
二、Redis实现消息队列
情景:模拟抢购/秒杀场景,瞬时的大量并发,使用Redis队列削峰
思路:在抢购开始时,将所有物品存放进一个List中,每次用户点击抢购,就从list中,pop出一个元素,由于list的pop操作是原子性的,所以,在高并发的情况下,也不会出现多抢的情况。
ps:使用list来进行原子操作,如果抢购库存非常大的话,就需要预先生成好一个非常大的list,每次进行pop。这种情况,可以使用decr操作(也是原子性的),每次抢购对库存值,进行decr操作,直至decr之后,值小于0,便不能抢购。
文件结构
├── deal_order.php #从队列中取出消息处理
├── sale_start.php #秒杀活动开始脚本
└── order.php #从商品中读取数据,生成预订单推入队列
order.php 文件:
<?php
//连接Redis
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
//模拟用户id
$user_id = mt_rand(10000,99999);
//取出一个数据
$goods = $redis->lpop('goods');
//判断是否成功取出
if($goods){
//将成功抢购的信息,进行储存
$redis->lpush('pre_order',$user_id.'@'.$goods.time()); //将用户id@商品id@时间,存入新的list中,等待后续处理
}else{
echo '没抢到!';
}
测试运行该脚本:
$ php order.php
没抢到!
deal_order.php 和 sale_start.php
sale_start.php :在秒杀活动开始时,执行该脚本,将库存的商品,存入goods列表中
deal_order.php :处理秒杀过后的pre_order列表,持久化订单
注意
一定要避免出现 Check Then Act 的操作
参考其他文档,发现很多抢购案例,用llen来检查list的长度,进行抢购成功与否的判断,这样在大量的并发下,还是会有多抢情况的发生
三、RabbitMQ实现消息队列
这个没细研究过,学会了再来补上,简单的工作队列,还是比较容易实现的
网友评论