使用方式
composer require topthink/think-queue:1.1.6 // 官方版
composer require sorry510/think-queue // 修复bug版,断线重连
注意事项
redis 服务端
设置超时时间为 0 或一个尽量大的数,否则队列的某个任务执行时间过程,会导致服务器端主动断开连接
timeout 300
redis 客户端
application\extra\queue.php, timeout 为 0,永不超时
<?php
use think\Env;
return [
//Redis驱动
'connector' => 'redis',
"expire" => 60, //任务过期时间默认为秒,禁用为null
"default" => "default", //默认队列名称
"host" => ENV::get('redis.host', '127.0.0.1'), //Redis主机IP地址
"port" => ENV::get('redis.port', '6379'), //Redis端口
"password" => ENV::get('redis.password', ''), //Redis密码
"select" => ENV::get('redis.index', 0), //Redis数据库索引
"timeout" => 0, //Redis连接超时时间
"persistent" => false, //是否长连接
];
使用方式
- listen 模式,每次调用都会开启新的进程,实时调用最新的代码,部署不推荐,特别消耗 cpu 资源
php think queue:listen --queue=xxx
- work 模式,常驻内存模式,无法调用到修改后的代码,需要重启
php think queue:work --daemon --queue=xxx
work 实现原理
redis 数据
2 种数据类型
list
存储将要执行的任务,key 为
queues:队列名称
, 例如,投递一个任务queue($job, '', 0, 'test_queue')
,就会生成一个 key 为queues:test_queue
的 list 数据, 数据元素结构如下:
{
job:"app\job\JobTest"
data:""
id:"CW03tlfol59ClkE7BsC6SuHTIiMkzcVM"
attempts:1
}
sorted set
- 执行中的任务
key 为
queues:队列名称:reserved
, score为当前时间 + 过期时间(--expire)
,数据内容与 list 一致
- 投递的延时的任务
key 为
queues:队列名称:delayed
, score为当前时间 + 延时时间(--delay)
,数据内容与 list 一致
- 执行失败时的重新发布的任务
key 为
queues:队列名称:delayed
, score为当前时间 + 延时时间(--delay)
,数据内容与 list 一致
运行示意图
tp5.0 redis 队列.jpg运行方式
- 无延时投递任务时时以 left 方式 push 到 list,延时时写入到 sorted set 中
- 运行时使用轮训(查看代码
vendor\topthink\think-queue\src\queue\command\Work.php
的 107 行while (true) {
)方式运行,不断循环执行,每次执行后都会睡眠 3 秒钟(可以配置 --sleep 1 修改为 1 秒) - 执行时首先会查询延迟任务(key 为
queues:队列名称:delayed
),如果延时到期了,就从set中删除此任务移动到list中(right push) - 然后查询执行失败的任务,从 set 移动到 list(right push),并记录执行次数+1
- 之后开始正式的执行环节,首先会从 list 中 right pop 最新的记录,移动到 sorted set(执行中的任务) 中然后开始执行 job
- 执行时,首先会判断执行次数是否大于尝试次数(可以配置 --tries 3 设置为3,默认为 0 永久重试),如果大于尝试次数,就从 sorted set 中删除对应数据,并执行 job 的 failed 方法,反之,就正常执行 job 的 fire 方法,成功之后删除 set 中的对应数据,等待休眠,进行下一次循环中,如果 job 的 fire 方法抛出异常且没有被删除(手动调用 $job->delete() 方法),就重新发布 job
- 发布逻辑为从 set 中删除之前的对应数据,重新添加一条数据,
queues:队列名称:delayed
, score 为当前时间 + 延时时间(--delay)
,并记录执行次数+1 - 下面为主体执行代码逻辑,详情请查看源码
vendor\topthink\think-queue\src\queue\Worker.php
文件
public function process(Job $job, $maxTries = 0, $delay = 0)
{
if ($maxTries > 0 && $job->attempts() > $maxTries) {
return $this->logFailedJob($job);
}
try {
$job->fire();
return ['job' => $job, 'failed' => false];
} catch (Exception $e) {
if (!$job->isDeleted()) {
$job->release($delay);
}
throw $e;
}
}
网友评论