美文网首页
Laravel 文件系统及队列处理

Laravel 文件系统及队列处理

作者: phpnet | 来源:发表于2019-12-19 10:58 被阅读0次

    队列

    Laravel 队列为不同的后台队列服务提供统一的 API,这些队列介质可以是 Beanstalk、Redis,也关系型数据库。使用队列的主要目的是将耗时的任务延时处理,比如发送邮件、发送短信,从而大幅度缩短 Web 请求和响应的时间。

    队列特点:

    • 异步(延时);
    • 削峰(并发);
    • 重试(失败);

    队列配置文件存放在 config/queue.php 文件中,其中包含了同步(本地使用)驱动和 null 队列驱动,null 队列主要用于那些放弃队列的任务。

    驱动介质

    Database

    // 创建队列数据库迁移表
    php artisan queue:table
    // 创建队列失败的迁移表
    php artisan queue:failed-table
    // 运行迁移命令
    php artisan migrate
    

    Redis

    如果你的 Redis 队列驱动使用了 Redis 集群,你的队列名必须包含一个 key hash tag 。这是为了确保所有的 Redis 键对于一个队列都被放在同一哈希中。

    'redis' => [
        'driver' => 'redis',
        'connection' => 'default',
        'queue' => '{default}',
        'retry_after' => 90,
    ]
    

    相关概念

    在开始使用 Laravel 队列前,弄明白 「连接」 和 「队列」 的区别是很重要的。

    • 连接:config/queue.php 文件中 connections 配置选项;
    • 队列:可以被认为是不同的栈或者大量的队列任务;

    创建队列

    在你的应用程序中,队列的任务类都默认放在 app/Jobs 目录下。如果这个目录不存在,那当你运行 make:job Artisan 命令时目录就会被自动创建。

    $ php artisan make:job ProcessPodcast
    
    <?php
    
    namespace App\Jobs;
    
    use Illuminate\Bus\Queueable;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Bus\Dispatchable;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Queue\SerializesModels;
    
    class ProcessPodcast implements ShouldQueue
    {
        use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
    
        /**
         * 应该处理任务的队列连接
         *
         * @var string
         */
        public $connection = 'sqs';
    
        /**
         * 任务可以尝试的最大次数
         *
         * @var int
         */
        public $tries = 5;
    
        /**
         * 任务可以执行的最大时间(注意retry_after)
         *
         * @var int
         */
        public $timeout = 60;
    
        /**
         * 队列运行时需要的参数
         *
         * @var array
         */
        public $params = [];
    
        /**
         * 创建队列实例
         *
         * @return void
         */
        public function __construct($params)
        {
            $this->params = $params;
        }
    
        /**
         * 执行队列任务
         *
         * @return void
         */
        public function handle()
        {
            // Redis::throttle('key')->block(0)->allow(1)->every(5)->then(
            //     function () {
            //         info('Lock obtained...');
            //         // 处理队列
            //     },
            //     function () {
            //         // 无法获取锁
            //         return $this->release(5);
            //     }
            // );
            $result = app('App\Services\CollectService')->handlerCollectData($this->params);
            if ($result < 0) {
                if ($this->attempts() < $this->tries) {
                    $this->release(5);
                }
            }
            return $result;
        }
    
        /**
         * 任务失败的处理过程
         *
         * @param Exception $exception
         * @return void
         */
        public function failed(\Exception $exception)
        {
            info('运行出现错误:' . $exception->getMessage() . ',参数是:', $this->params);
        }
    
        // /**
        //  * 运行队列中间件
        //  *
        //  * @return array
        //  */
        // public function middleware()
        // {
        //     return [new RateLimited];
        // }
        //
        // /**
        //  * 定义任务超时时间(在给定的时间范围内,任务可以无限次尝试)
        //  *
        //  * @return \DateTime
        //  */
        // public function retryUntil()
        // {
        //     return now()->addSeconds(5);
        // }
    }
    

    队列中间件

    /**
     * 执行队列任务
     *
     * @return void
     */
    public function handle()
    {
        Redis::throttle('key')->block(0)->allow(1)->every(5)->then(
            function () {
                info('Lock obtained...');
                // 处理队列
            },
            function () {
                // 无法获取锁
                return $this->release(5);
            }
        );
    }
    

    虽然这段代码有效,但 handle 方法的结构变得有噪声,因为它与 Redis 速率限制逻辑混杂在一起。此外,对于我们要进行速率限制的任何其他任务,必须复制此速率限制逻辑。

    <?php
    namespace App\Jobs\Middleware;
    
    use Illuminate\Support\Facades\Redis;
    
    class RateLimited
    {
        /**
         * 处理队列中的任务.
         *
         * @param  mixed  $job
         * @param  callable  $next
         * @return mixed
         */
        public function handle($job, $next)
        {
            Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () use ($job, $next) {
                // 锁定
                $next($job);
            }, function () use ($job) {
                // 无法获取锁
                $job->release(5);
            });
        }
    }
    

    分发队列

    // 分发到默认队列
    ProcessPodcast::dispatch($podcast);
    // 延迟分发队列
    ProcessPodcast::dispatch($podcast)->delay(now()->addMinutes(10));
    // 同步执行队列
    ProcessPodcast::dispatchNow($podcast);
    // 发送到指定队列
    ProcessPodcast::dispatch($podcast)->onQueue('emails');
    // 分发到指定连接
    ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    // 指定连接和队列
    ProcessPodcast::dispatch($podcast)->onConnection('sqs')->onQueue('processing');
    
    // 队列任务链:任务链允许你具体定义一个按序列执行队列任务的列表,一旦序列中的任务失败了,剩余的工作将不会执行。
    ProcessPodcast::withChain([
        new OptimizePodcast,
        new ReleasePodcast
    ])->dispatch();
    
    // 使用函数分发队列
    dispatch((new Job)->onQueue('high'));
    

    执行队列:

    $ php artisan queue:work --queue=high,low,default
    $ php artisan queue:restart
    $ php artisan queue:work --timeout=60
    

    supervisor

    sudo apt-get install supervisor
    

    Supervisor 的配置文件通常位于 /etc/supervisor/conf.d 目录下。在该目录中,你可以创建任意数量的配置文件,用来控制 supervisor 将如何监控你的进程。

    ; supervisor config file
    
    [unix_http_server]
    file=/var/run/supervisor.sock   ; (the path to the socket file)
    chmod=0700                      ; sockef file mode (default 0700)
    
    [supervisord]
    logfile=/data/wwwlogs/supervisord.log  ; (main log file;default $CWD/supervisord.log)
    pidfile=/var/run/supervisord.pid     ; (supervisord pidfile;default supervisord.pid)
    childlogdir=/data/wwwlogs           ; ('AUTO' child log dir, default $TEMP)
    
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
    
    [supervisorctl]
    serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket
    
    [include]
    files = /etc/supervisor/conf.d/*.conf
    
    [program:laravel-worker]
    process_name=%(program_name)s_%(process_num)02d
    command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
    autostart=true
    autorestart=true
    user=forge
    numprocs=8
    redirect_stderr=true
    stdout_logfile=/home/forge/app.com/worker.log
    
    $ sudo supervisorctl reread
    $ sudo supervisorctl update
    $ sudo supervisorctl start laravel-worker:*
    

    相关命令

    // 手动重试失败队列
    $ php artisan queue:failed
    // 重试任务指定队列
    $ php artisan queue:retry 5
    // 重试所有失败的任务
    $ php artisan queue:retry all
    // 删除某个失败的任务
    $ php artisan queue:forget 5
    // 清空所有失败的任务
    $ php artisan queue:flush
    

    注意事项

    • Laravel 队列在执行异常时会自动被释放回队列并再次尝试运行,直到成功或者达到所设定的最大失败重试次数。
    • 如果通过 --queue 命令行或 onQueue 方法指定了队列优先级,没有将指定队列和默认队列添加到优先级中,那么你的队列任务将永远也不会执行。
    • 由于队列处理器都是常驻进程,如果代码改变而队列处理器没有重启,是不能应用新代码的,最简单的方式就是重新部署过程中要重启队列处理器。
    • --delay 延迟执行队列任务。
    • --timeout 设置每个任务被允许运行秒数,0 代表不限制时间,--timeout 的值应该比你在 retry_after 中配置的值至少短几秒。
    • --sleep 指定队列要等待多少秒后才能取新的任务来运行。注意:队列在没有任务的状态下才会休眠,如果已经有多个任务在这个队列上等待执行,那么它会持续运行而不休眠。

    相关文章

      网友评论

          本文标题:Laravel 文件系统及队列处理

          本文链接:https://www.haomeiwen.com/subject/hykmnctx.html