PHP消息队列

作者: 477f0b404a70 | 来源:发表于2017-07-26 11:19 被阅读501次

    PHP消息队列

    应用场景

    业务系统的某一环节,执行任务并发量很大,或者执行时间很长的情景下,程序可能无法快速及时的进行处理,适合使用消息队列。

    高度解耦系统

    流量削峰(秒杀,抢购类)

    异步处理业务

    高扩展系统需求

    业务处理顺序有需求

    消息队列概念

    • 队列结构的中间件
    • 消息进入队列后,直接返回结果,不立即进行处理
    • 根据队列顺序,进行处理

    模型

    业务系统 ---> 消息队列 ---> 队列处理系统
    

    业务系统:推送消息给消息队列
    消息队列:所有的消息在消息队列中,等待处理
    队列处理系统:取出消息,进行处理

    常见的消息队列介质

    • MySQL:可靠性高,效率低
    • Redis:效率高,单个队列消息过大时,效率低
    • RabbitMQ:专业,可靠,操作复杂

    消息队列处理机制

    • 死循环读取:易实现,无法故障恢复
    • 定时任务:定时执行处理,分担压力
    • 守护进程:通过进程,检测消息队列中是否符合自定的出队触发条件,进行除对处理

    示例

    一、MySQL实现消息队列

    文件结构:

    ├── vender              #存放数据库操作类
    ├── deal_queue.sh       #处理队列的shell脚本 
    ├── deal_order.php      #从队列中取出消息处理
    └── order.php           #订单推入队列
    

    数据库结构:

    CREATE TABLE `order_queue` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `order_no` varchar(32) NOT NULL DEFAULT '',
      `ctime` int(11) NOT NULL DEFAULT '0',
      `utime` int(11) NOT NULL DEFAULT '0',
      `queue_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '0->未处理,1->已处理',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    

    order.php 文件:

    <?php
    require 'vendor/autoload.php';
    
    $db = new \Buki\Pdox();
    
    //将生成的订单,放入队列中
    $orderInfo = array(
        'order_no' => time().rand(1000,9999), //模拟一个订单号
        'ctime' => time(),
        'queue_status' => 0,      //入队时,状态为未处理
    );
    $insertId = $db->table('order_queue')
        ->insert($orderInfo);
    
    if($insertId){
        echo '成功添加id为:'.$insertId;
    }else{
        echo '添加失败';
    }
    

    测试运行该脚本:

    $ php order.php 
    成功添加id为:1
    $ php order.php
    成功添加id为:2
    

    deal_order.php 文件:

    <?php
    require 'vendor/autoload.php';
    
    $db = new \Buki\Pdox();
    
    //从队列中,取出消息进行处理
    $update = array(
        'queue_status'=>1,
        'utime'=>time(),
    );
    $isUpdate = $db->table('order_queue')
        ->where('queue_status',0)
        ->limit(3) //方便观察效果,这里每次处理3条
        ->update($update);
    
    if($isUpdate){
        echo 'success deal '.$isUpdate.' messages';
    }else{
        echo 'queue is clean';
    }
    

    测试运行该脚本:

    $ php deal_order.php 
    success deal 2 messages
    

    deal_queue.sh 文件:

    #!/bin/bash
    
    cd /usr/local/var/www/queue
    php deal_order.php
    

    编写 crontab 定时任务:

    编辑crontab:

    $ crontab -e
    

    写入该任务:

    */1 * * * * /usr/local/var/www/queue/deal_queue.sh >> /usr/local/var/www/queue/mq.log 2>&1
    

    查看一下:

    $ crontab -l
    */1 * * * * /usr/local/var/www/queue/deal_queue.sh >> /usr/local/var/www/queue/mq.log 2>&1
    

    测试使用

    多执行几次该命令,推送消息到队列

    $ php order.php 
    

    然后观察数据库的数据变化

    同时可以观察mq.log文件的变化,查看队列的执行情况

    $ tail -f mq.log
    success deal 3 messages2017-07-26 02:58:05
    

    定时任务,执行shell脚本时,用php命令,执行php脚本时,数据库操作类连接数据库时,使用localhost,会导致连接不上数据库,但是手动执行crontab内的命令,是可以连接并运行脚本的。

    跟本地的环境以及数据库操作类的兼容性有关。

    二、Redis实现消息队列

    情景:模拟抢购/秒杀场景,瞬时的大量并发,使用Redis队列削峰

    思路:在抢购开始时,将所有物品存放进一个List中,每次用户点击抢购,就从list中,pop出一个元素,由于list的pop操作是原子性的,所以,在高并发的情况下,也不会出现多抢的情况。
    ps:使用list来进行原子操作,如果抢购库存非常大的话,就需要预先生成好一个非常大的list,每次进行pop。这种情况,可以使用decr操作(也是原子性的),每次抢购对库存值,进行decr操作,直至decr之后,值小于0,便不能抢购。

    文件结构

    ├── deal_order.php      #从队列中取出消息处理
    ├── sale_start.php      #秒杀活动开始脚本
    └── order.php           #从商品中读取数据,生成预订单推入队列
    

    order.php 文件:

    <?php
    //连接Redis
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379);
    //模拟用户id
    $user_id = mt_rand(10000,99999);
    //取出一个数据
    $goods = $redis->lpop('goods');
    //判断是否成功取出
    if($goods){
        //将成功抢购的信息,进行储存
        $redis->lpush('pre_order',$user_id.'@'.$goods.time()); //将用户id@商品id@时间,存入新的list中,等待后续处理
    }else{
        echo '没抢到!';
    }
    

    测试运行该脚本:

    $ php order.php 
    没抢到!
    

    deal_order.php 和 sale_start.php

    sale_start.php :在秒杀活动开始时,执行该脚本,将库存的商品,存入goods列表中
    deal_order.php :处理秒杀过后的pre_order列表,持久化订单
    

    注意

    一定要避免出现 Check Then Act 的操作
    参考其他文档,发现很多抢购案例,用llen来检查list的长度,进行抢购成功与否的判断,这样在大量的并发下,还是会有多抢情况的发生

    三、RabbitMQ实现消息队列

    这个没细研究过,学会了再来补上,简单的工作队列,还是比较容易实现的

    相关文章

      网友评论

        本文标题:PHP消息队列

        本文链接:https://www.haomeiwen.com/subject/ohvnkxtx.html