使用redis stream实现队列服务

作者: 跑马溜溜的球 | 来源:发表于2020-11-16 16:17 被阅读0次

    1. stream类型

    Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

    • 消息ID的序列化生成
    • 消息遍历
    • 消息的阻塞和非阻塞读取
    • 消息的分组消费
    • 未完成消息的处理

    关于stream的具体介绍可以参见:

    2. 队列接口简介

    我们基于redis stream实现了一个基础的,类似beanstalk的队列服务。用于多个无差别的消费者从一个队列消费任务的情况。如果您对stream有所了解,那么我们其实是使用了stream+group当作了beanstalk的tube。

    提供最基础的功能:

    • addTask:添加任务。
    /*                                                         
     * 向流中添加任务                                          
     * $data: 数组形式的任务数据                                            
     * return: 任务id                                          
     */    
    addTask(array $data){}
    
    • getTask:获取任务。
     /*                                                         
      * 获取任务                                       
      * $block:阻塞时间,毫秒. null不阻塞                       
      * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block。
      * $start:'>'表示接受最新数据. 若设置id,则读取大于该id,且未被确认(ack)的历史任务
      * 普通使用时,只要设置$block即可。
      * 
      * return [
      *      'id1' => taskdata1,                                
      *      'id2' => taskdata2,                                
      *      ... ...                                            
      * ]
      *
      * 无数据返回[]                                            
      */    
    getTask($block=null, $count = 1, $start = '>'){}
    
    • delTask:删除任务
    /*
     * 根据id确认任务完成并从stream中删除该任务
     * $ids: 可以是单条taskid,也可以是数组形式的多条id
     *
     * 
     * 该方法其实完成了两个动作
     * ack:确认任务完成
     * del:stream中删除任务
     * 所以返回值中包括两个值,第一个为ack是否成功,第二个为del是否成功
     */
    delTask($ids){}
    

    3. 代码实现

    <?php
    /*
     * 需要redis-server5.0以上 
     * php-redis扩展版本要适配redis-5.0
     * 
     * 使用redis stream仿照beanstalk封装的队列服务
     */
    class RedisQueue{
        protected $_mRedis = null;
        protected $_mStream = '';
        protected $_mGroup = '';
        protected $_mConsumer = '';
    
        //默认0 不限制长度
        protected $_mMaxLength = 0;
    
        /* 
         * 创建队列, stream+group确认唯一队列
         * $config必须包括:
         * stream: stream名
         * server: 格式ip:port[:auth]
         * 
         * 可选参数:
         * maxLength:队列最大长度
         * group:分组名, 默认与stream相同. stream+group相当于beanstalk的tube
         * consumer:消费者名, 默认与stream相同. 
         * */
        public function __construct(array $config){
            if(!isset($config['stream'])){
                throw new Exception("you must config the stream");
            }
    
            $this->_mStream = $config['stream'];
    
            if(!isset($config['server'])){
                throw new Exception("you must config the server");
            }
    
            $tmp = explode(':', $config['server']);
            $host = $tmp[0];
            $port = $tmp[1];
            $auth = $tmp[2] ?? null;
    
            if ($host && $port){
                $this->_mRedis = new Redis();
                $this->_mRedis->connect($host,$port,1);
                if($auth){
                    $this->_mRedis->auth($auth);
                }
            }
            else{
                throw new Exception("can not get redis server conf");
            }
    
            if(isset($config['maxLength'])){
                $this->_mMaxLength = $config['maxLength'];
            }
    
            $this->_mGroup = $config['group'] ?? $config['stream'];       
            $this->_mConsumer = $config['consumer'] ?? $config['stream'];
    
            $this->creatGroup();
        }
    
        /*
         * 删除当前流(队列)
         * */
        public function destoryStream(){
            $this->_mRedis->del($this->_mStream);
        }
    
        /*
         * 向流中添加任务
         * $data: array
         * return: taskid
         * */
        public function addTask(array $data){
            return $this->_mRedis->xAdd($this->_mStream, "*", $data , $this->_mMaxLength);
        }
    
        /*
         * 从group中获取任务
         * $block:阻塞时间,毫秒. null不阻塞
         * $count:读取条数, 只要有数据,条数不够也会立刻返回,即使设置了block
         * $start:'>'接受最新数据. 若设置id,则读取大于该id,且未被ack的历史任务
         *
         * return [
         *      'id1' => taskdata1,
         *      'id2' => taskdata2,
         *      ... ...
         * ]
         *
         * 无数据返回[]
         * */
        public function getTask($block=null, $count = 1, $start = '>'){
            $d = $this->_mRedis->xReadGroup($this->_mGroup, $this->_mConsumer, [$this->_mStream => $start], $count, $block);
    
            if (is_array($d) && count($d) > 0){
                return $d[$this->_mStream];
            }
    
            return $d;
        }
        
        /*
         * ack任务--从pending中删除
         * 同时从stream中删除
         */
        public function delTask($ids){
            if(!is_array($ids)){
                $ids = array($ids);
            }
            $multi = $this->_mRedis->multi(Redis::PIPELINE);
    
            $multi->xAck($this->_mStream, $this->_mGroup, $ids);
            $multi->xDel($this->_mStream, $ids);        
            $res = $this->_mRedis->exec();        
            return $res;
        }
    
        protected function creatGroup($startID = 0){
            return $this->_mRedis->xGroup('CREATE', $this->_mStream, $this->_mGroup, $startID, true);
        }
    }
    

    git代码库
    https://github.com/qmhball/redisQueue

    • RedisQueue.php 队列实现
    • RedisQueueTest.php 对应测试

    4. 使用示例

    $config = [
            'server' => '10.10.10.1:6379:auth',
            'stream' => 'balltube', 
            'consumer' => 'normalprocessor'//可以不设置
        ];
    
    //创建队列
    $q = new RedisQueue($config);
    
    //添加任务
    $task = ['task'=>1];
    $q->addTask($task);
    
    //获取
    $timeout = 1000;
    $task = $q->getTask($timeout);
    
    //确认并删除
    $taskid = key($task);
    $q->delTask($taskid);
    

    5. 对于pending任务的处理

    当任务被取出且未被确认时,该任务处理pending状态。beanstalk中,对于这种任务可以设置一个超时时间timeout,当任务超过timeout未被确认,该任务会被还回队列中。对于stream,应该如何处理这种任务呢?请参见:

    《关于redis stream中pending数据的处理》

    6. beanstalk与redis的stream队列性能对比

    6.1 测试环境

    • 队列所在机器配置:4CPU, 6G内存。redis开启aof,每一秒钟持久化一次。
    • 压测机:8CPU,24G。

    6.2 测试结果

    在任务大小为1k和10k的时候,开启不同个数的进程进行10000次读/写操作,测试结果如下:

    任务大小为1k

    进程数 10 20 50
    redis万次读 1.64928s 0.864051s 0.542352s
    beanstalk万次读 1.702436s 0.915132s 0.503198s
    redis万次写 3.328083s 1.714555s 0.837429s
    beanstalk次写 3.402431s 1.702654s 0.9317s

    任务大小为10k

    进程数 10 20 50
    redis万次读 1.962591s 1.569581s 1.001159s
    beanstalk万次读 3.30333s 1.72248s 0.940097s
    redis万次写 3.360724s 1.77125s 0.921126s
    beanstalk次写 3.418932s 1.766198s 0.823796s

    7. redis stream队列与beanstalk队列整体比较

    stream beanstalk
    主从 支持 不支持
    性能 相当 相当
    任务持久化 支持 支持
    任务优先级 不支持 支持
    任务延迟 不支持 支持
    超时任务 额外处理 自动
    批量任务读写 支持 不支持

    相关文章

      网友评论

        本文标题:使用redis stream实现队列服务

        本文链接:https://www.haomeiwen.com/subject/xnkzbktx.html