redis-stream
原理
- 底层维护了一个 radix tree,每个node存储了一个listpack 存储100~1000条msg,按节点删除比按msg删除效率高.
- listpack是一块连续的内存block,用于序列化msg entry及相关信息,如msg i,用于节省内存是ziplist的升级版.
- 如果
xadd
每次添加的<field,value>对中的field是一样的,那么field不会重复存储. - 对于简单的msg(十几~几十个字节),100mb的内存大约可以存储数百万个msg.
- listpack在内存和disk上的存储结构是一样的,所以stream数据在做RDB的效率非常高,0.3s,500w entries
- listpack中预留了delete flag,未来会支持从中间删除msg
group
- 每个group有唯一的名字,每个group内的consumer有唯一的名字
- 按添加顺序想Consumer返回msg
- 支持从历史位点读取msg
- 不需要预先定义路由规则,即任意client中断后重新获取之前未ACK的msg(在客户端未处理)
- 尽可能保证 "至少提交一次"语义,但是由于redis本身的一致性限制,可能会消息丢失
- 也可能投递多次,所以业务端逻辑不是幂等的,需要doublecheck
对比
List,pub/sub,Zset | redis Stream |
---|---|
List不能高效的从中间获取成员,O(N) | 可以,即使是亿级别的member,O(logN) |
List没有offset的概念,如果成员发生evict,无法确定最新的成员,也没办法指定到哪个成员进行消费 | 每个msg都有一个唯一的id,老的成员被淘汰,id不变 |
Pub/sub不能持久化消息 | 可以保存在RDB和AOF中 |
Pub/Sub没有consumergroup的概念 | 有,更贴近真是的业务场景(后面有介绍) |
Pub/Sub的性能和订阅某个频道的client的数量正相关 | 不存在 |
Zset不能添加重复的成员,也不支持成员淘汰和block的操作,内存开销大 | 允许添加重复的成员,支持按时间线来淘汰历史数据,支持block操作,基于radix tree和listspack,内存开销低 |
Zset需要支持删除任意元素 | 不支持从中间删除(log属性),more compact and memory efficent |
使用
xadd
// `*` 表示让redis自动生产新增消息的ID,同一个message中的id是递增的
127.0.0.1:6379> xadd mystream * r1 v1
"1545961771832-0"
127.0.0.1:6379> xadd mystream * r2 v2
"1545961777562-0"
127.0.0.1:6379> xadd mystream * r2 v3
"1545961797242-0"
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> xadd mystream * r4 r4
QUEUED
127.0.0.1:6379> xadd mystream * r5 r5
QUEUED
127.0.0.1:6379> exec
1) "1545961881978-0"
2) "1545961881978-1"
127.0.0.1:6379>
xrange
// 这个就是获取一条
127.0.0.1:6379> XRANGE mystream 1545961881978-0 1545961881978-0
1) 1) "1545961881978-0"
2) 1) "r4"
2) "r4"
// 这个就是获取id范围内的
127.0.0.1:6379> XRANGE mystream 1545961881978-0 1545961881978-1
1) 1) "1545961881978-0"
2) 1) "r4"
2) "r4"
2) 1) "1545961881978-1"
2) 1) "r5"
2) "r5"
127.0.0.1:6379> XRANGE mystream 1545961771832-0 1545961881978-1
1) 1) "1545961771832-0"
2) 1) "r1"
2) "v1"
2) 1) "1545961777562-0"
2) 1) "r2"
2) "v2"
3) 1) "1545961797242-0"
2) 1) "r2"
2) "v3"
4) 1) "1545961881978-0"
2) 1) "r4"
2) "r4"
5) 1) "1545961881978-1"
2) 1) "r5"
2) "r5"
// count 限定结果集合的大小,`-`表示最小ID,`+`表示最大的ID
127.0.0.1:6379> xrange mystream - + count 2
1) 1) "1545961771832-0"
2) 1) "r1"
2) "v1"
2) 1) "1545961777562-0"
2) 1) "r2"
2) "v2"
xread
- xrage 类似于lrange ,一次性获取
- xread,类似于blpop,可阻塞,流式获取
27.0.0.1:6379> XADD mystream * msm ks
"1545976502940-0"
127.0.0.1:6379> XADD otherstream * msm ks
"1545976589453-0"
阻塞读
127.0.0.1:6379> XREAD block 50000 streams mystream $
1) 1) "mystream"
2) 1) 1) "1545976502940-0"
2) 1) "msm"
2) "ks"
(4.94s)
127.0.0.1:6379> XREAD block 50000 streams mystream otherstream $ $
1) 1) "otherstream"
2) 1) 1) "1545976589453-0"
2) 1) "msm"
2) "ks"
(3.18s)
drop old messages
没有专门的命令,可以在xadd
的时候指定maxlen
参数,每次执行命令都要加上
xadd mystream maxlen 23 * max2 max2
// 使用 `~` 表示按照节点删除
xadd mystream maxlen ~ 100000 * max2 max2
网友评论