美文网首页
redis-stream

redis-stream

作者: zhangsanzhu | 来源:发表于2018-12-29 09:16 被阅读0次

    redis-stream

    原理

    • 底层维护了一个 radix tree,每个node存储了一个listpack 存储100~1000条msg,按节点删除比按msg删除效率高.
    • listpack是一块连续的内存block,用于序列化msg entry及相关信息,如msg i,用于节省内存是ziplist的升级版.
    • 如果xadd每次添加的<field,value>对中的field是一样的,那么field不会重复存储.
    • 对于简单的msg(十几~几十个字节),100mb的内存大约可以存储数百万个msg.
    • listpack在内存和disk上的存储结构是一样的,所以stream数据在做RDB的效率非常高,0.3s,500w entries
    • listpack中预留了delete flag,未来会支持从中间删除msg

    group

    • 每个group有唯一的名字,每个group内的consumer有唯一的名字
    • 按添加顺序想Consumer返回msg
    • 支持从历史位点读取msg
    • 不需要预先定义路由规则,即任意client中断后重新获取之前未ACK的msg(在客户端未处理)
    • 尽可能保证 "至少提交一次"语义,但是由于redis本身的一致性限制,可能会消息丢失
    • 也可能投递多次,所以业务端逻辑不是幂等的,需要doublecheck

    对比

    List,pub/sub,Zset redis Stream
    List不能高效的从中间获取成员,O(N) 可以,即使是亿级别的member,O(logN)
    List没有offset的概念,如果成员发生evict,无法确定最新的成员,也没办法指定到哪个成员进行消费 每个msg都有一个唯一的id,老的成员被淘汰,id不变
    Pub/sub不能持久化消息 可以保存在RDB和AOF中
    Pub/Sub没有consumergroup的概念 有,更贴近真是的业务场景(后面有介绍)
    Pub/Sub的性能和订阅某个频道的client的数量正相关 不存在
    Zset不能添加重复的成员,也不支持成员淘汰和block的操作,内存开销大 允许添加重复的成员,支持按时间线来淘汰历史数据,支持block操作,基于radix tree和listspack,内存开销低
    Zset需要支持删除任意元素 不支持从中间删除(log属性),more compact and memory efficent

    使用

    xadd

    // `*` 表示让redis自动生产新增消息的ID,同一个message中的id是递增的
    
    127.0.0.1:6379> xadd mystream * r1 v1
    "1545961771832-0"
    127.0.0.1:6379> xadd mystream * r2 v2
    "1545961777562-0"
    127.0.0.1:6379> xadd mystream * r2 v3
    "1545961797242-0"
    
    127.0.0.1:6379> MULTI
    OK
    127.0.0.1:6379> xadd mystream * r4 r4 
    QUEUED
    127.0.0.1:6379> xadd mystream * r5 r5
    QUEUED
    127.0.0.1:6379> exec
    1) "1545961881978-0"
    2) "1545961881978-1"
    127.0.0.1:6379>
    

    xrange

    // 这个就是获取一条
    127.0.0.1:6379> XRANGE mystream 1545961881978-0 1545961881978-0
    1) 1) "1545961881978-0"
       2) 1) "r4"
          2) "r4"
    // 这个就是获取id范围内的
    127.0.0.1:6379> XRANGE mystream 1545961881978-0 1545961881978-1
    1) 1) "1545961881978-0"
       2) 1) "r4"
          2) "r4"
    2) 1) "1545961881978-1"
       2) 1) "r5"
          2) "r5"
    127.0.0.1:6379> XRANGE mystream  1545961771832-0  1545961881978-1
    1) 1) "1545961771832-0"
       2) 1) "r1"
          2) "v1"
    2) 1) "1545961777562-0"
       2) 1) "r2"
          2) "v2"
    3) 1) "1545961797242-0"
       2) 1) "r2"
          2) "v3"
    4) 1) "1545961881978-0"
       2) 1) "r4"
          2) "r4"
    5) 1) "1545961881978-1"
       2) 1) "r5"
          2) "r5"
          
    // count 限定结果集合的大小,`-`表示最小ID,`+`表示最大的ID
    127.0.0.1:6379> xrange mystream - + count 2
    1) 1) "1545961771832-0"
       2) 1) "r1"
          2) "v1"
    2) 1) "1545961777562-0"
       2) 1) "r2"
          2) "v2"
    

    xread

    1. xrage 类似于lrange ,一次性获取
    2. xread,类似于blpop,可阻塞,流式获取
    27.0.0.1:6379> XADD mystream * msm ks
    "1545976502940-0"
    127.0.0.1:6379> XADD otherstream * msm ks
    "1545976589453-0"
    

    阻塞读

    127.0.0.1:6379> XREAD block 50000 streams mystream $
    
    1) 1) "mystream"
       2) 1) 1) "1545976502940-0"
             2) 1) "msm"
                2) "ks"
    (4.94s)
    
    127.0.0.1:6379> XREAD block 50000 streams mystream  otherstream  $ $
    1) 1) "otherstream"
       2) 1) 1) "1545976589453-0"
             2) 1) "msm"
                2) "ks"
    (3.18s)
    

    drop old messages

    没有专门的命令,可以在xadd的时候指定maxlen参数,每次执行命令都要加上

    xadd mystream maxlen 23  * max2 max2
    // 使用 `~` 表示按照节点删除
    xadd mystream maxlen ~ 100000  * max2 max2
    

    参考

    参考 1
    参考 2

    相关文章

      网友评论

          本文标题:redis-stream

          本文链接:https://www.haomeiwen.com/subject/eefulqtx.html