队列
Laravel 队列为不同的后台队列服务提供统一的 API,这些队列介质可以是 Beanstalk、Redis,也关系型数据库。使用队列的主要目的是将耗时的任务延时处理,比如发送邮件、发送短信,从而大幅度缩短 Web 请求和响应的时间。
队列特点:
- 异步(延时);
- 削峰(并发);
- 重试(失败);
队列配置文件存放在 config/queue.php 文件中,其中包含了同步(本地使用)驱动和 null 队列驱动,null 队列主要用于那些放弃队列的任务。
驱动介质
Database
// 创建队列数据库迁移表
php artisan queue:table
// 创建队列失败的迁移表
php artisan queue:failed-table
// 运行迁移命令
php artisan migrate
Redis
如果你的 Redis 队列驱动使用了 Redis 集群,你的队列名必须包含一个 key hash tag 。这是为了确保所有的 Redis 键对于一个队列都被放在同一哈希中。
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
]
相关概念
在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。
- 连接:config/queue.php 文件中 connections 配置选项;
- 队列:可以被认为是不同的栈或者大量的队列任务;
创建队列
在你的应用程序中,队列的任务类都默认放在 app/Jobs 目录下。如果这个目录不存在,那当你运行 make:job Artisan 命令时目录就会被自动创建。
$ php artisan make:job ProcessPodcast
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* 应该处理任务的队列连接
*
* @var string
*/
public $connection = 'sqs';
/**
* 任务可以尝试的最大次数
*
* @var int
*/
public $tries = 5;
/**
* 任务可以执行的最大时间(注意retry_after)
*
* @var int
*/
public $timeout = 60;
/**
* 队列运行时需要的参数
*
* @var array
*/
public $params = [];
/**
* 创建队列实例
*
* @return void
*/
public function __construct($params)
{
$this->params = $params;
}
/**
* 执行队列任务
*
* @return void
*/
public function handle()
{
// Redis::throttle('key')->block(0)->allow(1)->every(5)->then(
// function () {
// info('Lock obtained...');
// // 处理队列
// },
// function () {
// // 无法获取锁
// return $this->release(5);
// }
// );
$result = app('App\Services\CollectService')->handlerCollectData($this->params);
if ($result < 0) {
if ($this->attempts() < $this->tries) {
$this->release(5);
}
}
return $result;
}
/**
* 任务失败的处理过程
*
* @param Exception $exception
* @return void
*/
public function failed(\Exception $exception)
{
info('运行出现错误:' . $exception->getMessage() . ',参数是:', $this->params);
}
// /**
// * 运行队列中间件
// *
// * @return array
// */
// public function middleware()
// {
// return [new RateLimited];
// }
//
// /**
// * 定义任务超时时间(在给定的时间范围内,任务可以无限次尝试)
// *
// * @return \DateTime
// */
// public function retryUntil()
// {
// return now()->addSeconds(5);
// }
}
队列中间件
/**
* 执行队列任务
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(
function () {
info('Lock obtained...');
// 处理队列
},
function () {
// 无法获取锁
return $this->release(5);
}
);
}
虽然这段代码有效,但 handle 方法的结构变得有噪声,因为它与 Redis 速率限制逻辑混杂在一起。此外,对于我们要进行速率限制的任何其他任务,必须复制此速率限制逻辑。
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理队列中的任务.
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () use ($job, $next) {
// 锁定
$next($job);
}, function () use ($job) {
// 无法获取锁
$job->release(5);
});
}
}
分发队列
// 分发到默认队列
ProcessPodcast::dispatch($podcast);
// 延迟分发队列
ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));
// 同步执行队列
ProcessPodcast::dispatchNow($podcast);
// 发送到指定队列
ProcessPodcast::dispatch($podcast)->onQueue('emails');
// 分发到指定连接
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
// 指定连接和队列
ProcessPodcast::dispatch($podcast)->onConnection('sqs')->onQueue('processing');
// 队列任务链:任务链允许你具体定义一个按序列执行队列任务的列表,一旦序列中的任务失败了,剩余的工作将不会执行。
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
// 使用函数分发队列
dispatch((new Job)->onQueue('high'));
执行队列:
$ php artisan queue:work --queue=high,low,default
$ php artisan queue:restart
$ php artisan queue:work --timeout=60
supervisor
sudo apt-get install supervisor
Supervisor 的配置文件通常位于 /etc/supervisor/conf.d 目录下。在该目录中,你可以创建任意数量的配置文件,用来控制 supervisor 将如何监控你的进程。
; supervisor config file
[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)
[supervisord]
logfile=/data/wwwlogs/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/data/wwwlogs ; ('AUTO' child log dir, default $TEMP)
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket
[include]
files = /etc/supervisor/conf.d/*.conf
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start laravel-worker:*
相关命令
// 手动重试失败队列
$ php artisan queue:failed
// 重试任务指定队列
$ php artisan queue:retry 5
// 重试所有失败的任务
$ php artisan queue:retry all
// 删除某个失败的任务
$ php artisan queue:forget 5
// 清空所有失败的任务
$ php artisan queue:flush
注意事项
- Laravel 队列在执行异常时会自动被释放回队列并再次尝试运行,直到成功或者达到所设定的最大失败重试次数。
- 如果通过 --queue 命令行或 onQueue 方法指定了队列优先级,没有将指定队列和默认队列添加到优先级中,那么你的队列任务将永远也不会执行。
- 由于队列处理器都是常驻进程,如果代码改变而队列处理器没有重启,是不能应用新代码的,最简单的方式就是重新部署过程中要重启队列处理器。
- --delay 延迟执行队列任务。
- --timeout 设置每个任务被允许运行秒数,0 代表不限制时间,--timeout 的值应该比你在 retry_after 中配置的值至少短几秒。
- --sleep 指定队列要等待多少秒后才能取新的任务来运行。注意:队列在没有任务的状态下才会休眠,如果已经有多个任务在这个队列上等待执行,那么它会持续运行而不休眠。
网友评论