前言
stream是redis最复杂的一个数据结构, 也是redis5.0的一个重要更新. 有很多值得学习的点. 这里会做个小系列, 从基础使用到源码解析.
什么是stream
imagestream实际上是一个消息发布订阅功能组件, 也就是消息队列. 这样的数据结构其实很常见, 比如腾讯云的cmq, 看名字也知道是对标RabbitMQ的. 当然还有kafka等.
除了更好的性能和内存利用率, 它最大的特性是可以获取历史消息, 可以持久化消息.
它主要由消息、生产者、消费者、消费组4部分组成. 这里消费组可能让人有些困惑, 其实就是消费组里面有多个消费者, 他们互相竞争, 当一个消费了某条消息, 消息会被放入待确认队列, 消息队列的迭代器就会前移, 下一个同组消费者不管是谁, 都不会再次消费这个消息, 而是下一个消息.
创建
xadd用来创建, 每个stream有一个唯一key, *意味着让系统给你返回id, id是由unix时间和从0开始下标组成, 也就是这一毫秒的第几个条目. 你可以自己设定, 但是要确保严格单调递增. 后面就是键值对, 也就是消息本身.
xadd mystream * str1 hello str2 world
image你可以用xlen查看信息数, 也可以用xinfo stream查看stream信息.
删除消息
image通过xdel可以删除消息, 但是注意, 其实没有删除, 只是设置了标志位.
遍历
imagexrange可以范围遍历, -代表最前, +代表最后, 你可以将其换成某个id, 因为是严格递增的, 所以, 遍历结果是一样的.
删除
del mystream
独立消费
image前面说到了, 有消费者的概念, 那么它就可以消费消息队列的任何消息. 每读一次, 迭代器都会前进一次.
或者你可以阻塞读, 这里block后面跟毫秒, 如果是0就是一直阻塞.
消费组消费
image首先可以建组, 这里最后是id, 0就代表从最前面开始获取消息, 可以写成$, 意味着或许新消息.
然后起一个消费者的名字, 比如这里的alice, > 意味着, 只把没有分发给别人的消息发给我,这也是最常用的方式.
当然你可以起名别的, 来获取之后的消息, 但是注意消费者是竞争关系, 迭代器会一直向前.
网友评论