美文网首页
通过redis的有序集合[zset] 实现延迟队列

通过redis的有序集合[zset] 实现延迟队列

作者: 骑蚂蚁上高速_jun | 来源:发表于2020-10-03 22:33 被阅读0次

    php使用redis的有序集合zset实现延迟队列

    我们通过redis的有序集合zset来实现简单的延迟队列,将消息数据序列化,作为zset的基本元素,把 消息生产时间戳 + 消息处理延迟时间戳 作为score,每次通过zRangeByScore获取一条消息进行处理,后通过zRem删除集合元素:相当于移除需要消费的 Job。

    优点:

    1. 实现简单,适合做中小型对延迟时间要求不高的业务场景。

    缺点:
    1.不适合延迟时间高的业务场景。延迟时间可能有几秒钟的误差。
    2.不适合大型项目 ,大型项目建议使用rabbitmq的延迟i消息队列

    下面是简单的实现demo

    /**
     * redis 通过有序集合 zset 实现延迟队列 发送定时邮件
     */
    class DelayQueue
    {
    
        private $zsetQueue = "delay:email";
    
        /**
         * 链接redis 服务器
         */
        public function connect():\Redis{
            try {
                $redis = new \Redis();
                $redis->connect("127.0.0.1",6379);
                $redis->auth("123456");
                return $redis;
            } catch (Exception $e) {
                throw new Exception("connect error");
            }
        }
    
        /**
         * 生产者
         * @param int $secreod 延迟的时间 单位:秒
         * @param mixed $data 投入到队列的数据,由于集合元素具有唯一性,故在调用生产消息时,在消息体$data中加入 uuid属性,确保消息体的唯一性
         * @return int|false 返回值=1的时候表示成功
         */
        public function produceTask(int $secreod,/* */ $data){
            $redis = $this->connect();
            // 向有序集合插入一个元素,元素关联一个数值,插入成功返回1,同时集合元素不可以重复, 如果元素已经存在返回 0
            return $redis->zAdd(
                $this->zsetQueue, // 集合队列名称
                time()+$secreod,// 相当于 score 
                json_encode($data,JSON_UNESCAPED_UNICODE) // 需要消费的消息
            );
        }
    
        /**
         * 消费者方法,在cli命令行中运行。建议最好在swoole中运行
         * 纯演示代码,模拟守护进程在cli中运行
         */
        public function consumeTask(){
            $redis = $this->connect();
            while (true) {
                sheep(3);
                try{
                    if($redis->ping() != "+PONG"){
                        $redis = $this->connect();
                    }
                }catch(Throwable $e){
                    return null;// 模拟跳出当前函数,退出当前子进程,swoole会重新拉起新的进程
                }
                // 取出排序值 score=0-当前时间戳的元素,一次取出一个元素
                $job = $redis->zRangeByScore($this->zsetQueue, 0, time(), ["limit" => [0, 1]]);
                if(!$job || empty($job[0])) continue;// 无消费数据,返回
                $json = $job[0];
                $redis->zRem($this->zsetQueue,$json); // 删除集合中的元素[移除队列中的job],删除成功返回1
                $std = json_decode($json);
                // ...处理你的业务逻辑
                // ... todo
            }
        }
    }
    
    $delayQueue = new DelayQueue();
    // 将消息加入zset 消息队列 ,延迟30s执行
    $delayQueue ->produceTask(30,["id"=>1,"msg"=>"....."]);
    // 将消息加入zset 消息队列 ,延迟2小时执行
    $delayQueue ->produceTask(2*3600,["id"=>2,"msg"=>"....."]);
    
    
    
    

    相关文章

      网友评论

          本文标题:通过redis的有序集合[zset] 实现延迟队列

          本文链接:https://www.haomeiwen.com/subject/wndguktx.html