美文网首页
thinkphp队列think-queue的使用以及通过supe

thinkphp队列think-queue的使用以及通过supe

作者: 回眸淡然笑 | 来源:发表于2023-04-18 14:49 被阅读0次

    think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。

    1、安装think-queue
    使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue
    注意thinkphp版本 5.X和6.X版本是不一样的,这里项目是5.0.X,所以我用的是1.1.*

    2、配置消息队列的存储环境
    执行完composer安装命令后,在application\extra\项目配置目录下生成queue.php配置文件。

    <?php
    
    return [
        'connector'  => 'Redis',        // Redis 驱动
        'expire'     => 60,     // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
        'default'    => 'default',      // 默认的队列名称
        'host'       => '127.0.0.1',    // redis 主机ip
        'port'       => 6379,       // redis 端口
        'password'   => '',     // redis 密码
        'select'     => 14,     // 使用哪一个,倒数第2个用于存放队列任务
        'timeout'    => 0,      // redis连接的超时时间
        'persistent' => false,      // 是否是长连接
    ];
    
    

    3、队列生产者
    在模块下添加logic/QueueLogic.php文件用于生产者

    <?php
    namespace app\api\logic;
    
    use think\Queue;
    
    /**
     * 队列逻辑类
     * 要使用队列的步骤如下:
     *  1. 配置队列配置文件,配置文件位置在 application/extra/queue.php,拷贝queue.php.backup修改为queue.php,并更改配置
     *      默认使用redis队列,更换不同环境需配置ip、账号等信息
     *  2. 确保cli模式下已开启work进程:  php think queue:work  --daemon --queue sanhengQueue
     *  3. 创建一个队列 QueueLogic::creaateQueue 传入处理该队列数据类名($class_name)、要处理的数据($job_data)、延迟时间($delay)
     *  4. 实现处理类 新建一个 app\admin\jobs\$class_name 类,并实现 fire(Job $job, $job_data)方法,方法$job为任务对象类,$job_data
     *      为第三步中传入的第二个参数
     * Class QueueLogic
     * @package app\api\logic
     */
    class QueueLogic
    {
        /**
         * 队列名字 需要进入cli模式 执行如下
         *    php think queue:work  --daemon --queue sanhengQueue
         */
        const QUEUE_NAME = 'pdfQueue';
        /**
         * 处理任务类名前缀 后台的处理任务一律放在目录 app\admin\jobs 中
         */
        const QUEUE_JOBS_NAMESPACE_PREFIX = 'app\\api\\jobs\\';
    
        /**
         * 创建一个队列 该队列的处理逻辑在 app\admin\jobs 下面,demo参考 app\admin\jobs\TestJob
         * @param string $class_name 处理类名字 不需要命名空间 只需要传类名 放在 app\admin\jobs目录下的类名
         *       该类只需要在fire方法里写处理逻辑即可
         * @param array $job_data  传入的数据数组 该数组会
         * @param int $delay 延迟时间 0 立即执行  传入大于等于0的数字代表 $delay秒后执行
         * @return bool 创建队列成功或失败
         */
        public static function creaateQueue(string $class_name, array $job_data, int $delay = 0,$name='') : bool
        {
            $delay = (int)$delay;
            $delay < 0 && $delay = 0;
            // 1.当前任务将由哪个类来负责处理。
            //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName = static::QUEUE_JOBS_NAMESPACE_PREFIX . $class_name;
    
            // 2.当前任务归属的队列名称,如果为新队列,会自动创建
            $jobQueueName = static::QUEUE_NAME;
           if (!empty($name)){
               $jobQueueName=$name;
           }
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            //   ( jobData 为对象时,存储其public属性的键值对 )
    
            // 4.将该任务推送到消息队列,等待对应的消费者去执行
            if (0 === $delay) {
                //立即执行
                
                $isPushed = Queue::push($jobHandlerClassName , $job_data , $jobQueueName);
            } else {
                //延迟$delay秒后执行
                $isPushed = Queue::later($delay, $jobHandlerClassName , $job_data , $jobQueueName);
            }
    
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if( $isPushed !== false ){
                return true;
            }else{
               return false;
            }
        }
    }
    

    在需要调用的地方加上

    use app\api\logic\QueueLogic;
    public function XXX(){
      QueueLogic::creaateQueue('MakePdfJob', ['id' => $id]);
    }
    
    

    4、消息的消费和删除。
    创建jobs/BaseJob.php基类

    <?php
    
    namespace app\api\jobs;
    
    use think\queue\Job;
    use think\Log;
    
    abstract class BaseJob
    {
        abstract public function fire(Job $job, array $job_data);
    
        /**
         * 错误日志 继承此方法,以便统一格式
         * @param Job $job
         * @param $job_data
         */
        public function log(Job $job, $job_data, $message = '')
        {
            //初始化日志文件目录
            Log::init(['type' => 'File', 'path' => RUNTIME_PATH . 'queue_jobs_log/']);
            Log::record($message . ' 【任务类】:' . var_export($job, true)
                . ' 【数据】:' . var_export($job_data, true));
        }
    }
    

    以及MakePdfJob.php类

    <?php
    
    
    namespace app\api\jobs;
    
    
    use app\api\controller\Pdf;
    use think\queue\Job;
    /**
     * 
     *
     * Class makePdfJob
     * @package app\admin\jobs
     */
    class makePdfJob extends BaseJob
    {
        public function fire(Job $job, array $job_data)
        {
            // 有些消息在到达消费者时,可能已经不再需要执行了
            $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($job_data);
            if(!$isJobStillNeedToBeDone){
                $job->delete();
                return;
            }
            $boolean = $this->executeJob($job_data);
            if ($boolean) {
                //执行成功 删除任务
                $job->delete();
            } else {
                //重试次数大于2次则删除任务
                if ($job->attempts() > 3) {
                    $job->delete();
                    $message = '任务3次仍然执行失败,删除任务,【任务】:';
                    $this->log($job, $job_data, $message);
                }
            }
    //        sleep(2);
        }
    
        /**
         * 检车数据库是否需要被执行
         * @param $job_data
         * @return bool true 继续往下执行 fasle 不执行删除本任务
         */
        protected function checkDatabaseToSeeIfJobNeedToBeDone(array $job_data)
        {
            return true;
        }
    
        /**
         * 执行任务的逻辑
         * @param $job_data
         */
        protected function executeJob(array $job_data)
        {
            $pdf = new Pdf;
            $return = $pdf->makePdfByJob($job_data);
            if ($return) {
                return true;
            } else {
                return false;
            }
        }
    
    }
    

    5、监听任务并执行
    work和listen两种,具体的用法和可选参数可以输入命令加 --help 查看

    php think queue:listen  --queue pdfQueue
    php think queue:work --daemon --queue pdfQueue
    

    以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。

    6、安装Supervisor

    yum install -y supervisor
    

    7、查看Supervisor安装位置
    supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。

    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisord
    supervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_conf
    echo_supervisord_conf: /usr/bin/echo_supervisord_conf
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl    
    supervisorctl: /usr/bin/supervisorctl
    [root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
    

    8、修改配置文件 vi /etc/supervisord.conf

    [include]
    files = supervisord.d/*.ini
    

    9、自定义待守护进程配置文件
    在 /etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:

    [program:pdfQueue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
    command=php /www/wwwroot/templet.tik-im.com/think queue:work --daemon --queue pdfQueue
    autostart=true ;在 supervisord 启动的时候也自动启动
    autorestart=true ; 程序异常退出后自动重启
    user=root ;用哪个用户启动
    redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
    stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
    stdout_logfile_backups=20 ;stdout 日志文件备份数
    stderr_logfile=/www/wwwroot/templet.tik-im.com/worker_err.log ; 错误日志文件
    stdout_logfile=/www/wwwroot/templet.tik-im.com/worker.log  ;输出日志文件
    

    10、Surpervisor的启动

    # supervisord二进制启动
    supervisord -c /etc/supervisord.conf
    # 检查进程
    ps aux | grep supervisord
    # 更新Supervisor
    supervisorctl update
    

    相关文章

      网友评论

          本文标题:thinkphp队列think-queue的使用以及通过supe

          本文链接:https://www.haomeiwen.com/subject/dqghjdtx.html