美文网首页
Redis消息队列

Redis消息队列

作者: 阳公子_ | 来源:发表于2020-07-23 11:33 被阅读0次

    一、概述

    消息队列,Message Queue,常用于解决并发系统中的资源一致性问题,提升峰值的处理能力,同时保证消息的顺序性、可恢复性、必送达性,对应用进行解耦,或者实现异步通讯等。市面上的 MQ应用有很多(例如:Kafka,RabbitMQ,Disque),同时也可以基于 Redis 来实现,比较典型的方案有:

    • 基于List的 LPUSH+BRPOP 的实现
    • PUB/SUB,订阅/发布模式
    • 基于Sorted-Set的实现
    • 基于Stream类型的实现
      在消息队列使用中,有生产者producter和消费者consumer。生产者负责生成消息,消费者负责使用处理消息。生产,指的是将消息放入消息队列。 消费,指的是读取并处理消息。通常一个消息再被消费后,就应该从消息队列中删除。


    二、实现

    1、基于List的LPUSH+BRPOP的实现
    LPUSH,将消息放入消息队列(生产者)
    BRPOP,从队列中取出消息,阻塞模式(消费者)
    

    TBase中不支持BRPOP,只支持RPOP,BRPOP是RPOP的阻塞版本
    该模式优点:

    • 实现简单
    • Reids支持持久化消息,意味着消息不会丢失,可以重复查看(注意不是消费,只看不用,LRANGE类的指令)。
    • 可以保证顺序,保证使用LPUSH命令,可以保证消息的顺序性
    • 使用RPUSH,可以将消息放在队列的开头,达到优先消息的目的,可以实现简易的消息优先队列。
      该模式缺点:
    • 做消费确认ACK比较麻烦,就是不能保证消费者在读取之后,未处理后的宕机问题。导致消息意外丢失。通常需要自己维护一个Pending列表,保证消息的处理确认。
    • 不能做广播模式,例如典型的Pub/Discribe模式。
    • 不能重复消费,一旦消费就会被删除
    • 不支持分组消费,需要自己在业务逻辑层解决
    2、PUB/SUB,订阅/发布模式
    SUBSCRIBE,用于订阅信道
    PUBLISH,向信道发送消息
    UNSUBSCRIBE,取消订阅
    

    生产者和消费者通过相同的一个信道(Channel)进行交互。信道其实也就是队列。通常会有多个消费者。多个消费者订阅同一个信道,当生产者向信道发布消息时,该信道会立即将消息逐一发布给每个消费者。可见,该信道对于消费者是发散的信道,每个消费者都可以得到相同的消息。典型的对多的关系。
    该模式优点:

    • 典型的广播模式,一个消息可以发布到多个消费者
    • 多信道订阅,消费者可以同时订阅多个信道,从而接收多类消息
    • 消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息
      该模式缺点:
    • 消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回
    • 不能保证每个消费者接收的时间是一致的
    • 若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。通常发生在消息的生产远大于消费速度时
      Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。
    3、基于SortedSet有序集合的实现
    ZADD KEY score member,压入集合
    ZRANGEBYSCORE,依据score获取成员
    

    有序集合的方案是在自己确定消息顺ID时比较常用,使用集合成员的Score来作为消息ID,保证顺序,还可以保证消息ID的单调递增。通常可以使用时间戳+序号的方案。确保了消息ID的单调递增,利用SortedSet的依据Score排序的特征,就可以制作一个有序的消息队列了。
    和上面的方案相比,优点就是可以自定义消息ID,在消息ID有意义时,比较重要。缺点也明显,不允许重复消息(以为是集合),同时消息ID确定有错误会导致消息的顺序出错。

    4、基于stream实现

    TBase还不支持该数据结构
    Redis5.0中发布的Stream类型,也用来实现典型的消息队列。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

    • 消息ID的序列化生成
    • 消息遍历
    • 消息的阻塞和非阻塞读取
    • 消息的分组消费
    • 未完成消息的处理
    • 消息队列监控
      追加新消息,XADD,生产消息
      XADD,命令用于在某个stream(流数据)中追加消息,演示如下:
    127.0.0.1:6379> XADD memberMessage * user kang msg Hello
    "1553439850328-0"
    127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
    "1553439858868-0"
    

    语法格式为:

    XADD key ID field string [field string ...]
    

    需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。 ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。field string [field string]就是当前消息内容,由1个或多个key-value构成。
    上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此时,消息队列中就有一个消息可用了。

    从消息队列中获取消息,XREAD,消费消息
    XREAD,从Stream中读取消息,演示如下:

    127.0.0.1:6379> XREAD streams memberMessage 0
    1) 1) "memberMessage"
       2) 1) 1) "1553439850328-0"
             2) 1) "user"
                2) "kang"
                3) "msg"
                4) "Hello"
          2) 1) "1553439858868-0"
             2) 1) "user"
                2) "zhong"
                3) "msg"
                4) "nihao"
    

    语法格式为:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    
    • [COUNT count] 用于限定获取的消息数量
    • [BLOCK milliseconds] 用于设置XREAD为阻塞模式,默认为非阻塞模式
    • ID 用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用,表示最新的消息ID。(在非阻塞模式下无意义)。
      XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

      Pending 等待列表
      为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:
    127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
    1) (integer) 5 # 5个已读取但未处理的消息
    2) "1553585533795-0" # 起始ID
    3) "1553585533795-4" # 结束ID
    4) 1) 1) "consumerA" # 消费者A有3个
          2) "3"
       2) 1) "consumerB" # 消费者B有1个
          2) "1"
       3) 1) "consumerC" # 消费者C有1个
          2) "1"
    ​
    127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
    1) 1) "1553585533795-0" # 消息ID
       2) "consumerA" # 消费者
       3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
       4) (integer) 5 # 消息被读取了5次,delivery counter
    2) 1) "1553585533795-1"
       2) "consumerA"
       3) (integer) 1654355
       4) (integer) 4
    # 共5个,余下3个省略 ...
    ​
    127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
    1) 1) "1553585533795-0"
       2) "consumerA"
       3) (integer) 1641083
       4) (integer) 5
    # 共3个,余下2个省略 ...
    

    每个Pending的消息有4个属性:

    • 消息ID
    • 所属消费者
    • IDLE,已读取时长
    • delivery counter,消息被读取次数
      上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令XACK 完成告知消息处理完成,演示如下:
    127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
    (integer) 1
    ​
    127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
    1) (integer) 4 # 已读取但未处理的消息已经变为4个
    2) "1553585533795-1"
    3) "1553585533795-4"
    4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
          2) "2"
       2) 1) "consumerB"
          2) "1"
       3) 1) "consumerC"
          2) "1"
    

    有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。
    此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。

    消息转移
    消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

    # 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
    127.0.0.1:6379> XPENDING mq mqGroup - + 10
    1) 1) "1553585533795-1"
       2) "consumerA"
       3) (integer) 15907787
       4) (integer) 4
    ​
    # 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
    127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
    1) 1) "1553585533795-1"
       2) 1) "msg"
          2) "2"
    ​
    # 消息1553585533795-1已经转移到消费者B的Pending中。
    127.0.0.1:6379> XPENDING mq mqGroup - + 10
    1) 1) "1553585533795-1"
       2) "consumerB"
       3) (integer) 84404 # 注意IDLE,被重置了
       4) (integer) 5 # 注意,读取次数也累加了1次
    

    以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。

    127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
    127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
    

    这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。

    坏消息问题,Dead Letter,死信问题
    正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

    # 删除队列中的消息
    127.0.0.1:6379> XDEL mq 1553585533795-1
    (integer) 1
    # 查看队列中再无此消息
    127.0.0.1:6379> XRANGE mq - +
    1) 1) "1553585533795-0"
       2) 1) "msg"
          2) "1"
    2) 1) "1553585533795-2"
       2) 1) "msg"
          2) "3"
    

    注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!

    信息监控,XINFO
    Stream提供了XINFO来实现对服务器信息的监控,可以查询、查看队列信息:

    127.0.0.1:6379> Xinfo stream mq
     1) "length"
     2) (integer) 7
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "groups"
     8) (integer) 1
     9) "last-generated-id"
    10) "1553585533795-9"
    11) "first-entry"
    12) 1) "1553585533795-3"
        2) 1) "msg"
           2) "4"
    13) "last-entry"
    14) 1) "1553585533795-9"
        2) 1) "msg"
           2) "10"
    

    消费组信息:

    127.0.0.1:6379> Xinfo groups mq
    1) 1) "name"
       2) "mqGroup"
       3) "consumers"
       4) (integer) 3
       5) "pending"
       6) (integer) 3
       7) "last-delivered-id"
       8) "1553585533795-4"
    

    消费者组成员信息:

    127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
    1) 1) "name"
       2) "consumerA"
       3) "pending"
       4) (integer) 1
       5) "idle"
       6) (integer) 18949894
    2) 1) "name"
       2) "consumerB"
       3) "pending"
       4) (integer) 1
       5) "idle"
       6) (integer) 3092719
    3) 1) "name"
       2) "consumerC"
       3) "pending"
       4) (integer) 1
       5) "idle"
       6) (integer) 23683256
    


    命令一览

    命令 说明
    XACK 结束Pending
    XADD 生成消息
    XCLAIM 消息转移
    XDEL 删除消息
    XGROUP 消费组管理
    XINFO 得到消费组信息
    XLEN 消息队列长度
    Pending列表 Pending列表
    XRANGE 获取消息队列中消息
    XREAD 消费消息
    XREADGROUP 分组消费消息
    XREVRANGE 逆序获取消息队列中消息
    XTRIM 消息队列容量

    Reference

    [1] 基于Redis实现消息队列典型方案
    [2] Stream 类型

    相关文章

      网友评论

          本文标题:Redis消息队列

          本文链接:https://www.haomeiwen.com/subject/wwiclktx.html