美文网首页
thinkphp5 的 redis 队列实现原理与bug修复

thinkphp5 的 redis 队列实现原理与bug修复

作者: sorry510 | 来源:发表于2021-11-25 11:05 被阅读0次

    使用方式

    composer require topthink/think-queue:1.1.6 // 官方版
    composer require sorry510/think-queue // 修复bug版,断线重连
    

    注意事项

    redis 服务端

    设置超时时间为 0 或一个尽量大的数,否则队列的某个任务执行时间过程,会导致服务器端主动断开连接

    timeout 300
    

    redis 客户端

    application\extra\queue.php, timeout 为 0,永不超时

    <?php
    use think\Env;
    return [
        //Redis驱动
        'connector' => 'redis',
        "expire" => 60, //任务过期时间默认为秒,禁用为null
        "default" => "default", //默认队列名称
        "host" => ENV::get('redis.host', '127.0.0.1'), //Redis主机IP地址
        "port" => ENV::get('redis.port', '6379'), //Redis端口
        "password" => ENV::get('redis.password', ''), //Redis密码
        "select" => ENV::get('redis.index', 0), //Redis数据库索引
        "timeout" => 0, //Redis连接超时时间
        "persistent" => false, //是否长连接
    ];
    

    使用方式

    • listen 模式,每次调用都会开启新的进程,实时调用最新的代码,部署不推荐,特别消耗 cpu 资源
    php think queue:listen --queue=xxx
    
    • work 模式,常驻内存模式,无法调用到修改后的代码,需要重启
    php think queue:work --daemon --queue=xxx
    

    work 实现原理

    redis 数据

    2 种数据类型

    list

    存储将要执行的任务,key 为 queues:队列名称, 例如,投递一个任务queue($job, '', 0, 'test_queue'),就会生成一个 key 为 queues:test_queue 的 list 数据, 数据元素结构如下:

    {
        job:"app\job\JobTest"
        data:""
        id:"CW03tlfol59ClkE7BsC6SuHTIiMkzcVM"
        attempts:1
    }
    
    sorted set
    • 执行中的任务

    key 为 queues:队列名称:reserved, score为当前时间 + 过期时间(--expire),数据内容与 list 一致

    • 投递的延时的任务

    key 为 queues:队列名称:delayed, score为当前时间 + 延时时间(--delay),数据内容与 list 一致

    • 执行失败时的重新发布的任务

    key 为 queues:队列名称:delayed, score为当前时间 + 延时时间(--delay),数据内容与 list 一致

    运行示意图

    tp5.0 redis 队列.jpg

    运行方式

    1. 无延时投递任务时时以 left 方式 push 到 list,延时时写入到 sorted set 中
    2. 运行时使用轮训(查看代码 vendor\topthink\think-queue\src\queue\command\Work.php 的 107 行 while (true) {)方式运行,不断循环执行,每次执行后都会睡眠 3 秒钟(可以配置 --sleep 1 修改为 1 秒)
    3. 执行时首先会查询延迟任务(key 为 queues:队列名称:delayed),如果延时到期了,就从set中删除此任务移动到list中(right push)
    4. 然后查询执行失败的任务,从 set 移动到 list(right push),并记录执行次数+1
    5. 之后开始正式的执行环节,首先会从 list 中 right pop 最新的记录,移动到 sorted set(执行中的任务) 中然后开始执行 job
    6. 执行时,首先会判断执行次数是否大于尝试次数(可以配置 --tries 3 设置为3,默认为 0 永久重试),如果大于尝试次数,就从 sorted set 中删除对应数据,并执行 job 的 failed 方法,反之,就正常执行 job 的 fire 方法,成功之后删除 set 中的对应数据,等待休眠,进行下一次循环中,如果 job 的 fire 方法抛出异常且没有被删除(手动调用 $job->delete() 方法),就重新发布 job
    7. 发布逻辑为从 set 中删除之前的对应数据,重新添加一条数据, queues:队列名称:delayed, score 为 当前时间 + 延时时间(--delay),并记录执行次数+1
    8. 下面为主体执行代码逻辑,详情请查看源码 vendor\topthink\think-queue\src\queue\Worker.php 文件
    public function process(Job $job, $maxTries = 0, $delay = 0)
    {
        if ($maxTries > 0 && $job->attempts() > $maxTries) {
            return $this->logFailedJob($job);
        }
    
        try {
            $job->fire();
    
            return ['job' => $job, 'failed' => false];
        } catch (Exception $e) {
            if (!$job->isDeleted()) {
                $job->release($delay);
            }
    
            throw $e;
        }
    }
    

    相关文章

      网友评论

          本文标题:thinkphp5 的 redis 队列实现原理与bug修复

          本文链接:https://www.haomeiwen.com/subject/xuitxrtx.html