think-queue是thinkphp官方提供的一个消息队列服务,适用于大并发、返回结果时间较长、需要批量操作等专门支持队列服务的扩展包。例如短信发送、模板消息邮件等推送。可以进行发布、获取、执行、删除、重发、失败处理、延迟执行、操作控制等操作。
1、安装think-queue
使用composer进行安装,在项目根目录,执行: composer require topthink/think-queue
注意thinkphp版本 5.X和6.X版本是不一样的
,这里项目是5.0.X,所以我用的是1.1.*
2、配置消息队列的存储环境
执行完composer安装命令后,在application\extra\项目配置目录下生成queue.php配置文件。
<?php
return [
'connector' => 'Redis', // Redis 驱动
'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'host' => '127.0.0.1', // redis 主机ip
'port' => 6379, // redis 端口
'password' => '', // redis 密码
'select' => 14, // 使用哪一个,倒数第2个用于存放队列任务
'timeout' => 0, // redis连接的超时时间
'persistent' => false, // 是否是长连接
];
3、队列生产者
在模块下添加logic/QueueLogic.php文件用于生产者
<?php
namespace app\api\logic;
use think\Queue;
/**
* 队列逻辑类
* 要使用队列的步骤如下:
* 1. 配置队列配置文件,配置文件位置在 application/extra/queue.php,拷贝queue.php.backup修改为queue.php,并更改配置
* 默认使用redis队列,更换不同环境需配置ip、账号等信息
* 2. 确保cli模式下已开启work进程: php think queue:work --daemon --queue sanhengQueue
* 3. 创建一个队列 QueueLogic::creaateQueue 传入处理该队列数据类名($class_name)、要处理的数据($job_data)、延迟时间($delay)
* 4. 实现处理类 新建一个 app\admin\jobs\$class_name 类,并实现 fire(Job $job, $job_data)方法,方法$job为任务对象类,$job_data
* 为第三步中传入的第二个参数
* Class QueueLogic
* @package app\api\logic
*/
class QueueLogic
{
/**
* 队列名字 需要进入cli模式 执行如下
* php think queue:work --daemon --queue sanhengQueue
*/
const QUEUE_NAME = 'pdfQueue';
/**
* 处理任务类名前缀 后台的处理任务一律放在目录 app\admin\jobs 中
*/
const QUEUE_JOBS_NAMESPACE_PREFIX = 'app\\api\\jobs\\';
/**
* 创建一个队列 该队列的处理逻辑在 app\admin\jobs 下面,demo参考 app\admin\jobs\TestJob
* @param string $class_name 处理类名字 不需要命名空间 只需要传类名 放在 app\admin\jobs目录下的类名
* 该类只需要在fire方法里写处理逻辑即可
* @param array $job_data 传入的数据数组 该数组会
* @param int $delay 延迟时间 0 立即执行 传入大于等于0的数字代表 $delay秒后执行
* @return bool 创建队列成功或失败
*/
public static function creaateQueue(string $class_name, array $job_data, int $delay = 0,$name='') : bool
{
$delay = (int)$delay;
$delay < 0 && $delay = 0;
// 1.当前任务将由哪个类来负责处理。
// 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
$jobHandlerClassName = static::QUEUE_JOBS_NAMESPACE_PREFIX . $class_name;
// 2.当前任务归属的队列名称,如果为新队列,会自动创建
$jobQueueName = static::QUEUE_NAME;
if (!empty($name)){
$jobQueueName=$name;
}
// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
// ( jobData 为对象时,存储其public属性的键值对 )
// 4.将该任务推送到消息队列,等待对应的消费者去执行
if (0 === $delay) {
//立即执行
$isPushed = Queue::push($jobHandlerClassName , $job_data , $jobQueueName);
} else {
//延迟$delay秒后执行
$isPushed = Queue::later($delay, $jobHandlerClassName , $job_data , $jobQueueName);
}
// database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
if( $isPushed !== false ){
return true;
}else{
return false;
}
}
}
在需要调用的地方加上
use app\api\logic\QueueLogic;
public function XXX(){
QueueLogic::creaateQueue('MakePdfJob', ['id' => $id]);
}
4、消息的消费和删除。
创建jobs/BaseJob.php基类
<?php
namespace app\api\jobs;
use think\queue\Job;
use think\Log;
abstract class BaseJob
{
abstract public function fire(Job $job, array $job_data);
/**
* 错误日志 继承此方法,以便统一格式
* @param Job $job
* @param $job_data
*/
public function log(Job $job, $job_data, $message = '')
{
//初始化日志文件目录
Log::init(['type' => 'File', 'path' => RUNTIME_PATH . 'queue_jobs_log/']);
Log::record($message . ' 【任务类】:' . var_export($job, true)
. ' 【数据】:' . var_export($job_data, true));
}
}
以及MakePdfJob.php类
<?php
namespace app\api\jobs;
use app\api\controller\Pdf;
use think\queue\Job;
/**
*
*
* Class makePdfJob
* @package app\admin\jobs
*/
class makePdfJob extends BaseJob
{
public function fire(Job $job, array $job_data)
{
// 有些消息在到达消费者时,可能已经不再需要执行了
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($job_data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$boolean = $this->executeJob($job_data);
if ($boolean) {
//执行成功 删除任务
$job->delete();
} else {
//重试次数大于2次则删除任务
if ($job->attempts() > 3) {
$job->delete();
$message = '任务3次仍然执行失败,删除任务,【任务】:';
$this->log($job, $job_data, $message);
}
}
// sleep(2);
}
/**
* 检车数据库是否需要被执行
* @param $job_data
* @return bool true 继续往下执行 fasle 不执行删除本任务
*/
protected function checkDatabaseToSeeIfJobNeedToBeDone(array $job_data)
{
return true;
}
/**
* 执行任务的逻辑
* @param $job_data
*/
protected function executeJob(array $job_data)
{
$pdf = new Pdf;
$return = $pdf->makePdfByJob($job_data);
if ($return) {
return true;
} else {
return false;
}
}
}
5、监听任务并执行
work和listen两种,具体的用法和可选参数可以输入命令加 --help 查看
php think queue:listen --queue pdfQueue
php think queue:work --daemon --queue pdfQueue
以上queue配置、发布、删除、执行、监听等一个简单的队列完成。实际生产中,我们需要用常驻进程的方式运行,防止因为服务器出现问题导致监听退出手动再次启动,这就得用的我们重点提到的Supervisor。Supervisord 是用 Python 实现的一款的进程管理工具,supervisord 要求管理的程序是非 daemon 程序,supervisord 会帮你把它转成 daemon 程序,因此如果用 supervisord 来管理进程,进程需要以非daemon的方式启动。
6、安装Supervisor
yum install -y supervisor
7、查看Supervisor安装位置
supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序。
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]#
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]#
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisord
supervisord: /usr/bin/supervisord /etc/supervisord.d /etc/supervisord.conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis echo_supervisord_conf
echo_supervisord_conf: /usr/bin/echo_supervisord_conf
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]#
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]# whereis supervisorctl
supervisorctl: /usr/bin/supervisorctl
[root@iZ2ze7bmgk6rb7s77apcr8Z supervisord.d]#
8、修改配置文件 vi /etc/supervisord.conf
[include]
files = supervisord.d/*.ini
9、自定义待守护进程配置文件
在 /etc/supervisord.d 下创建以.ini 后缀的文件。编辑如下配置:
[program:pdfQueue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/templet.tik-im.com/think queue:work --daemon --queue pdfQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/templet.tik-im.com/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/templet.tik-im.com/worker.log ;输出日志文件
10、Surpervisor的启动
# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update
网友评论