美文网首页
thinkphp队列think-queue的使用以及通过supe

thinkphp队列think-queue的使用以及通过supe

作者: 回眸淡然笑 | 来源:发表于2023-04-18 14:49 被阅读0次

think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。

1、安装think-queue
使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue
注意thinkphp版本 5.X和6.X版本是不一样的,这里项目是5.0.X,所以我用的是1.1.*

2、配置消息队列的存储环境
执行完composer安装命令后,在application\extra\项目配置目录下生成queue.php配置文件。

<?php

return [
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,     // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',      // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,       // redis 端口
    'password'   => '',     // redis 密码
    'select'     => 14,     // 使用哪一个,倒数第2个用于存放队列任务
    'timeout'    => 0,      // redis连接的超时时间
    'persistent' => false,      // 是否是长连接
];

3、队列生产者
在模块下添加logic/QueueLogic.php文件用于生产者

<?php
namespace app\api\logic;

use think\Queue;

/**
 * 队列逻辑类
 * 要使用队列的步骤如下:
 *  1. 配置队列配置文件,配置文件位置在 application/extra/queue.php,拷贝queue.php.backup修改为queue.php,并更改配置
 *      默认使用redis队列,更换不同环境需配置ip、账号等信息
 *  2. 确保cli模式下已开启work进程:  php think queue:work  --daemon --queue sanhengQueue
 *  3. 创建一个队列 QueueLogic::creaateQueue 传入处理该队列数据类名($class_name)、要处理的数据($job_data)、延迟时间($delay)
 *  4. 实现处理类 新建一个 app\admin\jobs\$class_name 类,并实现 fire(Job $job, $job_data)方法,方法$job为任务对象类,$job_data
 *      为第三步中传入的第二个参数
 * Class QueueLogic
 * @package app\api\logic
 */
class QueueLogic
{
    /**
     * 队列名字 需要进入cli模式 执行如下
     *    php think queue:work  --daemon --queue sanhengQueue
     */
    const QUEUE_NAME = 'pdfQueue';
    /**
     * 处理任务类名前缀 后台的处理任务一律放在目录 app\admin\jobs 中
     */
    const QUEUE_JOBS_NAMESPACE_PREFIX = 'app\\api\\jobs\\';

    /**
     * 创建一个队列 该队列的处理逻辑在 app\admin\jobs 下面,demo参考 app\admin\jobs\TestJob
     * @param string $class_name 处理类名字 不需要命名空间 只需要传类名 放在 app\admin\jobs目录下的类名
     *       该类只需要在fire方法里写处理逻辑即可
     * @param array $job_data  传入的数据数组 该数组会
     * @param int $delay 延迟时间 0 立即执行  传入大于等于0的数字代表 $delay秒后执行
     * @return bool 创建队列成功或失败
     */
    public static function creaateQueue(string $class_name, array $job_data, int $delay = 0,$name='') : bool
    {
        $delay = (int)$delay;
        $delay < 0 && $delay = 0;
        // 1.当前任务将由哪个类来负责处理。
        //   当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        $jobHandlerClassName = static::QUEUE_JOBS_NAMESPACE_PREFIX . $class_name;

        // 2.当前任务归属的队列名称,如果为新队列,会自动创建
        $jobQueueName = static::QUEUE_NAME;
       if (!empty($name)){
           $jobQueueName=$name;
       }
        // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
        //   ( jobData 为对象时,存储其public属性的键值对 )

        // 4.将该任务推送到消息队列,等待对应的消费者去执行
        if (0 === $delay) {
            //立即执行
            
            $isPushed = Queue::push($jobHandlerClassName , $job_data , $jobQueueName);
        } else {
            //延迟$delay秒后执行
            $isPushed = Queue::later($delay, $jobHandlerClassName , $job_data , $jobQueueName);
        }

        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            return true;
        }else{
           return false;
        }
    }
}

在需要调用的地方加上

use app\api\logic\QueueLogic;
public function XXX(){
  QueueLogic::creaateQueue('MakePdfJob', ['id' => $id]);
}

4、消息的消费和删除。
创建jobs/BaseJob.php基类

<?php

namespace app\api\jobs;

use think\queue\Job;
use think\Log;

abstract class BaseJob
{
    abstract public function fire(Job $job, array $job_data);

    /**
     * 错误日志 继承此方法,以便统一格式
     * @param Job $job
     * @param $job_data
     */
    public function log(Job $job, $job_data, $message = '')
    {
        //初始化日志文件目录
        Log::init(['type' => 'File', 'path' => RUNTIME_PATH . 'queue_jobs_log/']);
        Log::record($message . ' 【任务类】:' . var_export($job, true)
            . ' 【数据】:' . var_export($job_data, true));
    }
}

以及MakePdfJob.php类

<?php


namespace app\api\jobs;


use app\api\controller\Pdf;
use think\queue\Job;
/**
 * 
 *
 * Class makePdfJob
 * @package app\admin\jobs
 */
class makePdfJob extends BaseJob
{
    public function fire(Job $job, array $job_data)
    {
        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($job_data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return;
        }
        $boolean = $this->executeJob($job_data);
        if ($boolean) {
            //执行成功 删除任务
            $job->delete();
        } else {
            //重试次数大于2次则删除任务
            if ($job->attempts() > 3) {
                $job->delete();
                $message = '任务3次仍然执行失败,删除任务,【任务】:';
                $this->log($job, $job_data, $message);
            }
        }
//        sleep(2);
    }

    /**
     * 检车数据库是否需要被执行
     * @param $job_data
     * @return bool true 继续往下执行 fasle 不执行删除本任务
     */
    protected function checkDatabaseToSeeIfJobNeedToBeDone(array $job_data)
    {
        return true;
    }

    /**
     * 执行任务的逻辑
     * @param $job_data
     */
    protected function executeJob(array $job_data)
    {
        $pdf = new Pdf;
        $return = $pdf->makePdfByJob($job_data);
        if ($return) {
            return true;
        } else {
            return false;
        }
    }

}

5、监听任务并执行
work和listen两种,具体的用法和可选参数可以输入命令加 --help 查看

php think queue:listen  --queue pdfQueue
php think queue:work --daemon --queue pdfQueue

以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。

6、安装Supervisor

yum install -y supervisor

7、查看Supervisor安装位置
supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。

[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisord
supervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_conf
echo_supervisord_conf: /usr/bin/echo_supervisord_conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl    
supervisorctl: /usr/bin/supervisorctl
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# 

8、修改配置文件 vi /etc/supervisord.conf

[include]
files = supervisord.d/*.ini

9、自定义待守护进程配置文件
在 /etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:

[program:pdfQueue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/templet.tik-im.com/think queue:work --daemon --queue pdfQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/templet.tik-im.com/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/templet.tik-im.com/worker.log  ;输出日志文件

10、Surpervisor的启动

# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update

相关文章

网友评论

      本文标题:thinkphp队列think-queue的使用以及通过supe

      本文链接:https://www.haomeiwen.com/subject/dqghjdtx.html