美文网首页
redis stream中pending数据的处理

redis stream中pending数据的处理

作者: 跑马溜溜的球 | 来源:发表于2020-11-16 16:19 被阅读0次

    1. pending数据的产生

    在消费者组模式下,当一个消息被消费者取出,为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读(XREADGROUP)取但并未处理完毕(未ACK)的消息。

    2. 对pending数据的几种处理方式

    下面的讨论基于几点:

    1. 面向的场景为多个无差别消费者(每个消费者名子相同,功能相同)在同一group下消费任务。
    2. 我们要保证的是,每个任务至多只做一次。
    3. 代码实现是在使用redis stream实现队列服务一文的封装基础上实现的。

    2.1 无需处理

    如果你的处理逻辑是:

    getTask()
    delTask()
    yourProcessFuc();
    

    即不太关注任务的丢失,此时无需做什么特别处理。但一定记得delTask(),不然pending队列会越积越多,占用大量存储空间。

    2.2 从pending中按条件读取,放回原队列

        /*
         * 将pending队列中超时的数据重新放回队列
         * 
         * $idleTime: 超时时间, 毫秒
         * $perPage:每次从pending队列中取的任务数, 之所以分页是为防止队列太长,一下取出内存不够
         *
         * 注意:只能有一个进程执行pendingRestore
         *
         * 优点: consumer不需要做任何改动
         * 缺点: 
         * 先del再add, 成本上不划算,
         * 如果del和add中间断掉任务就丢了
         * 无法保留任务被重复投递的次数,不过如果你的任务只想重做一次,或者不关注此数据那就无所谓了。
         * 
         * return: restore的数量
         * */
        public function pendingRestore($idleTime = 5000, $perPage = 20){
            /**
             * 比较简单粗暴的取pending数据方式
             * 依赖
             * 1.每次从pending取走/删除超时数据
             * 2.id是按时间排序,小id未超时,大id一定未超时
             *
             */
            $restoreNum = 0;
            while(1){
                $thisNum = 0;
                $data = $this->getPending($perPage);
    
                foreach($data as $one){
                    $id = $one[0];
                    $duration = $one[2];
                    if ($duration > $idleTime){
                        $data = $this->getRange($id, $id);
                        $task = $data[$id];
    
                        $this->delTask($id);
                        $this->addTask($task);
    
                        $thisNum++;
                    }
                }
                
                $restoreNum += $thisNum;
                
                if ($thisNum < $perPage){
                    break;
                }
            }
    
            return $restoreNum;
        }
        
        /* 从pending队列中取任务
         */
        protected function getPending($count = 1, $start='-', $end='+', $consumer = null){
            if (!$consumer){
                return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count);
            }
    
            return $this->_mRedis->xPending($this->_mStream, $this->_mGroup, $start, $end, $count, $consumer);
        }
    
        /*
         * 取[$start, $end]范围内的数据, 注意是闭区间
         *
         * $count:条数,null时表示取全部
         * */
        protected function getRange($start = '-', $end = '+', $count = null){
            if(is_null($count)){
                return $this->_mRedis->xRange($this->_mStream, $start, $end);
            }else{
                return $this->_mRedis->xRange($this->_mStream, $start, $end, $count);
            }
        }
        
    

    2.3 使用claim

    将超时任务放入另一个名子的消费者pending队列中,然后从新的消费者历史数据中取出数据并处理。

        /*
         * 另一种恢复超时任务的方法
         * 思路:将超时任务放入newConsumer的pending中,后续可以从newConsume的历史中取出数据并处理
         *
         * 优点:
         * 恢复数据没有重复读,删,插,效率高
         * 任务投递次数会保留在新的pending中 
         *
         * 缺点:
         * consumer需要做改动,至少要改变consumer的名子
         * 只能用单进程从历史数据中读数据,然后处理。
         *
         *
         * $idleTime: 超时时间, 毫秒
         * $newConsumer: 之后处理pending任务的消费者名称
         * $perPage: 每次取pending任务的条数
         *
         * return: 满足条件且成功claim的条数
         * */
        public function pendingClaim($idleTime = 5000, $newConsumer=null, $perPage = 20){
            if (!$newConsumer){
                return false;
            }
        
            $info = $this->getPendingInfo();
            $startID = $info[1];
            $endID = $info[2];
        
            $claimNum = 0;
            /*
             * 使用startid, endid遍历pending列表
             * 因为getpending取的是[startid, endid]
             * 所以边界处的id可能被重复取出,但不影响结果的正确性
             * perPage越大/符合xclaim条件的id越多,重复的可能性越小
             * */
            while($startID != $endID){
                //var_dump([$startID, $endID]);
                $data = $this->getPending($perPage, $startID, $endID, $this->_mConsumer);
            
                foreach($data as $one){
                    $ids[] = $one[0];
                    $startID = $one[0];
                }
                
                //xClaim会根据条件自动过滤任务
                $res = $this->_mRedis->xClaim($this->_mStream, $this->_mGroup, $newConsumer, $idleTime, $ids, ['JUSTID']);
                
                $thisNum = count($res);   
                $claimNum += $thisNum;
                
                //id是按时间排列,小id未超时,则后面不会超时
                //在所有id都有相同的投递次数的基础上
                if ($thisNum < $perPage){
                    break;
                }
            }
    
            return $claimNum;
        }
    
    

    使用pendingClaim后,可以使用一个单独进程,通过下面方式获取超时任务并处理。

    $config = [
                'server' => '10.10.10.1:6379',
                'stream' => 'balltube',    
                'consumer' => 'pendingProcessor',//pendingClaim中的newConsumer
    ];
    
    $q = new RedisQueue($this->_config);
    $block = 1000;
    $num = 1;
    
    while(1){
        $d = $q->getTask($block, $num, 0);
        if (empty($d)){
            break;
        }
    
        $id = key($d);
        $data = $d[$id];
        $q->delTask($id);
        //处理任务逻辑
        yourTaskProcessFunc($data);
    }
    
    

    3. git代码库

    https://github.com/qmhball/redisQueue

    • RedisQueue.php 队列实现
    • RedisQueueTest.php 对应测试

    相关文章

      网友评论

          本文标题:redis stream中pending数据的处理

          本文链接:https://www.haomeiwen.com/subject/hchwsktx.html