美文网首页
redis基础

redis基础

作者: 坤坤坤坤杨 | 来源:发表于2022-02-21 23:24 被阅读0次

    1.数据结构

    1.1 结构类型简介

    首先对redis来说,所有的key(键)都是字符串。我们在谈基础数据结构时,讨论的是存储值的数据类型,主要包括常见的5种数据类型,分别是:String、List、Set、Zset、Hash。

    image
    结构类型 结构存储的值 读写能力
    String字符串 字符串、整数、浮点数 对整个字符串或字符串的一部分进行操作;对整数或浮点数进行自增或自减操作;
    List列表 一个链表,链表上的每个节点都包含一个字符串 对链表的两端进行push和pop操作,读取单个或多个元素;根据值查找或删除元素;
    Set集合 包含字符串的无序集合 字符串的集合,包含基础的方法有看是否存在添加、获取、删除;还包含计算交集、并集、差集等;
    Hash散列 包含键值对的无序散列表 包含方法有添加、获取、删除单个元素;
    Zset有序集合 和散列一样,用于存储键值对 字符串成员与浮点数分数之间的有序映射;元素的排列顺序由分数的大小决定;包含方法有添加、获取、删除单个元素以及根据分值范围或成员来获取元素

    1.2 数据结构详解

    String字符串:

    String是redis中最基本的数据类型,一个key对应一个value。

    String类型是二进制安全的,意思是 redis 的 string 可以包含任何数据。如数字,字符串,jpg图片或者序列化的对象。

    命令使用:

    命令 简述 使用
    GET 获取存储在给定键中的值 GET name
    SET 设置存储在给定键中的值 SET name value
    DEL 删除存储在给定键中的值 DEL name
    INCR 将键存储的值加1 INCR key
    DECR 将键存储的值减1 DECR key
    INCRBY 将键存储的值加上整数 INCRBY key amount
    DECRBY 将键存储的值减去整数 DECRBY key amount

    实战场景:

    1. 缓存: 经典使用场景,把常用信息,字符串,图片或者视频等信息放到redis中,redis作为缓存层,mysql做持久化层,降低mysql的读写压力。

    2. 计数器:redis是单线程模型,一个命令执行完才会执行下一个,同时数据可以一步落地到其他的数据源。

    3. session:常见方案spring session + redis实现session共享

    List列表:

    Redis中的List其实就是链表(Redis用双端链表实现List)。

    使用List结构,我们可以轻松地实现最新消息排队功能(比如新浪微博的TimeLine)。List的另一个应用就是消息队列,可以利用List的 PUSH 操作,将任务存放在List中,然后工作线程再用 POP 操作将任务取出进行执行,列表中的元素可以重复出现。

    命令使用:

    命令 简述 使用
    RPUSH 将给定值推入到列表右端 RPUSH key value
    LPUSH 将给定值推入到列表左端 LPUSH key value
    RPOP 从列表的右端弹出一个值,并返回被弹出的值 RPOP key
    LPOP 从列表的左端弹出一个值,并返回被弹出的值 LPOP key
    LRANGE 获取列表在给定范围上的所有值 LRANGE key 0 -1
    LINDEX 通过索引获取列表中的元素。你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 LINDEX key index

    使用列表的技巧:

    • lpush+lpop=Stack(栈)

    • lpush+rpop=Queue(队列)

    • lpush+ltrim=Capped Collection(有限集合)

    • lpush+brpop=Message Queue(消息队列)

    实战场景:

    • 微博TimeLine:有人发博微博,用lpush加入时间轴,展示新的列表信息。

    • 消息队列

    Set集合:

    Redis 的 Set 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。

    Redis 中集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。

    命令使用:

    命令 简述 使用
    SADD 向集合添加一个或多个成员 SADD key value
    SCARD 获取集合的成员数量 SCARD key
    SMEMBER 返回集合中的所有成员 SMEMBER key member
    SISMEMBER 判断member元素是否是集合的成员 SISMEMBER key member

    实战场景:

    • 标签:给用户添加标签,或者给用户消息添加标签,这样有同一标签的可以给推荐关注的事或者关注的人。

    • 点赞,收藏都可以放到set中实现。

    Hash散列:

    Redis hash 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。

    命令使用:

    命令 简述 使用
    HSET 添加键值对 HSET hash-key sub-key1 value1
    HGET 获取指定散列键的值 HGET hash-key key1
    HGETALL 获取散列中包含的所有键值对 HGETALL hash-key
    HDEL 如果给定键存在于散列中,那么就移除这个键 HDEL hash-key sub-key1

    实战场景:

    • 缓存:直观,相比String更节省空间的维护缓存信息,如用户信息、视频信息等。

    Zset有序集合:

    Redis 有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

    有序集合的成员是唯一的,但分数(score)却可以重复。集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。

    image

    命令使用:

    命令 简述 使用
    ZADD 将一个带有指定分值的成员添加到有序集合member中 ZADD key 100 member
    ZEANGE 根据元素在有序集合中所处的位置,从有序集合中获取多个元素 ZRANGE key 0-1 withccores
    ZREM 如果给定元素成员存在于有序集合中,那么就移除这个元素 ZREM key member1
    ZSCORE 获取有序集合中指定key的分值 ZSCORE member key

    实战场景:

    • 排行榜:有序集合的经典使用场景。例如小说视频等网站需要对用户上传的小说视频做排行榜,榜单可以按照用户关注数,更新时间,字数等打分,做排行。

    2. Stream

    Redis5.0 中还增加了一个数据结构Stream,从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

    基于Reids的消息队列实现有很多种,例如:

    PUB/SUB,订阅/发布模式

    • 但是发布订阅模式是无法持久化的,如果出现网络断开、Redis 宕机等,消息就会被丢弃;

    基于List LPUSH+BRPOP或者基于Sorted-Set

    • 支持了持久化,但是不支持多播,分组消费等

    上面的结构无法满足需求,那么如果我们期望设计一种数据结构来实现消息队列,最重要的就是要理解设计一个消息队列需要考虑什么? 初步可以想到:

    • 消息的产生

    • 消息的消费

      • 单播和多播(多对多)

      • 阻塞和非阻塞读取

    • 消息有序性

    • 消息持久化

    其他,借助美团技术团队的一篇文章, 消息队列设计精要 (opens new window)中的图

    image

    2.1 详解

    从以下几个大方向理解比较合适:

    • Stream的结构设计

    • 生产和消费

      • 基本的增删改查

      • 单一消费者的消费

      • 消费者组的消费

    • 监控状态

    Stream的结构

    每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

    image
    • Consumer Group : 消费组,使用XGROUP CREATE命令创建,一个消费组有多个消费者(Consumer),这些消费者之间是竞争关系。

    • last_delivered_id :游标,每个消费组都会有一个游标,任意一个消费者读取了消息都会使游标往前移动。

    • pending_ids :消费者的状态变量,作用是维护消费者的未确认的id。pending_ids记录了当前已经被客户端读取的消息,但是还没有ack(确认字符)。如果客户端没有ack,这个变量里面的消息id会越来越多,一旦某个消息被ack,他就开始减少,它用来确保客户端至少消费了消息一次。而不会在网络传输的中途丢失了没处理。

    消息id:消息ID的形式是 timestampInMillis-sequence,例如1527846880572-5 ,他表示当前的消息在毫秒时间戳1527846880572产生,并且是该毫秒内地第五条消息。消息id可以有服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且后面加入的消息的id要大于前面的消息id。

    增删改查:

    • XADD - 添加消息到末尾

    • XTRIM - 对流进行修剪,限制长度

    • XDEL - 删除消息

    • XLEN - 获取流包含的元素数量,即消息长度

    • XRANGE - 获取消息列表,会自动过滤已经删除的消息

    • XREVRANGE - 反向获取消息列表,ID 从大到小

    • XREAD - 以阻塞或非阻塞方式获取消息列表

    独立消费:

    我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计一个了单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组的存在,就好比一个Stream是一个普通的列表(list)。

    # 从Stream头部读取两条消息
    127.0.0.1:6379> xread count 2 streams codehole 0-0
    1) 1) "codehole"
       2) 1) 1) 1527851486781-0
             2) 1) "name"
                2) "laoqian"
                3) "age"
                4) "30"
          2) 1) 1527851493405-0
             2) 1) "name"
                2) "yurui"
                3) "age"
                4) "29"
    # 从Stream尾部读取一条消息,毫无疑问,这里不会返回任何消息
    127.0.0.1:6379> xread count 1 streams codehole $
    (nil)
    # 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
    127.0.0.1:6379> xread block 0 count 1 streams codehole $
    # 我们从新打开一个窗口,在这个窗口往Stream里塞消息
    127.0.0.1:6379> xadd codehole * name youming age 60
    1527852774092-0
    # 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
    # 而且还显示了一个等待时间,这里我们等待了93s
    127.0.0.1:6379> xread block 0 count 1 streams codehole $
    1) 1) "codehole"
       2) 1) 1) 1527852774092-0
             2) 1) "name"
                2) "youming"
                3) "age"
                4) "60"
    (93.11s)
    

    客户端如果想要使用xread进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息ID。下次继续调用xread时,将上次返回的最后一个消息ID作为参数传递进去,就可以继续消费后续的消息。

    block 0表示永远阻塞,直到消息到来,block 1000表示阻塞1s,如果1s内没有任何消息到来,就返回nil

    消费组消费:

    image

    命令

    • XGROUP CREATE - 创建消费者组 需要传递起始消息ID参数用来初始化last_delivered_id变量。

    • XREADGROUP GROUP - 读取消费者组中的消息

    • XACK - 将消息标记为"已处理"

    • XGROUP SETID - 为消费者组设置新的最后递送消息ID

    • XGROUP DELCONSUMER - 删除消费者

    • XGROUP DESTROY - 删除消费者组

    • XPENDING - 显示待处理消息的相关信息

    • XCLAIM - 转移消息的归属权

    • XINFO - 查看流和消费者组的相关信息

    • XINFO GROUPS - 打印消费者组的信息

    • XINFO STREAM - 打印流信息

    创建消费组:

    127.0.0.1:6379> xgroup create codehole cg1 0-0  #  表示从头开始消费
    OK
    # $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
    127.0.0.1:6379> xgroup create codehole cg2 $
    OK
    127.0.0.1:6379> xinfo stream codehole  # 获取Stream信息
     1) length
     2) (integer) 3  # 共3个消息
     3) radix-tree-keys
     4) (integer) 1
     5) radix-tree-nodes
     6) (integer) 2
     7) groups
     8) (integer) 2  # 两个消费组
     9) first-entry  # 第一个消息
    10) 1) 1527851486781-0
     2) 1) "name"
     2) "laoqian"
     3) "age"
     4) "30"
    11) last-entry  # 最后一个消息
    12) 1) 1527851498956-0
     2) 1) "name"
     2) "xiaoqian"
     3) "age"
     4) "1"
    127.0.0.1:6379> xinfo groups codehole  # 获取Stream的消费组信息
    1) 1) name
     2) "cg1"
     3) consumers
     4) (integer) 0  # 该消费组还没有消费者
     5) pending
     6) (integer) 0  # 该消费组没有正在处理的消息
    2) 1) name
     2) "cg2"
     3) consumers  # 该消费组还没有消费者
     4) (integer) 0
     5) pending
     6) (integer) 0  # 该消费组没有正在处理的消息
    

    消费组消费:

    Stream提供了xreadgroup指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除。

    # >号表示从当前消费组的last_delivered_id后面开始读
    # 每当消费者读取一条消息,last_delivered_id变量就会前进
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
    1) 1) "codehole"
     2) 1) 1) 1527851486781-0
     2) 1) "name"
     2) "laoqian"
     3) "age"
     4) "30"
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
    1) 1) "codehole"
     2) 1) 1) 1527851493405-0
     2) 1) "name"
     2) "yurui"
     3) "age"
     4) "29"
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
    1) 1) "codehole"
     2) 1) 1) 1527851498956-0
     2) 1) "name"
     2) "xiaoqian"
     3) "age"
     4) "1"
     2) 1) 1527852774092-0
     2) 1) "name"
     2) "youming"
     3) "age"
     4) "60"
    # 再继续读取,就没有新消息了
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
    (nil)
    # 那就阻塞等待吧
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
    # 开启另一个窗口,往里塞消息
    127.0.0.1:6379> xadd codehole * name lanying age 61
    1527854062442-0
    # 回到前一个窗口,发现阻塞解除,收到新消息了
    127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
    1) 1) "codehole"
     2) 1) 1) 1527854062442-0
     2) 1) "name"
     2) "lanying"
     3) "age"
     4) "61"
    (36.54s)
    127.0.0.1:6379> xinfo groups codehole  # 观察消费组信息
    1) 1) name
     2) "cg1"
     3) consumers
     4) (integer) 1  # 一个消费者
     5) pending
     6) (integer) 5  # 共5条正在处理的信息还有没有ack
    2) 1) name
     2) "cg2"
     3) consumers
     4) (integer) 0  # 消费组cg2没有任何变化,因为前面我们一直在操纵cg1
     5) pending
     6) (integer) 0
    # 如果同一个消费组有多个消费者,我们可以通过xinfo consumers指令观察每个消费者的状态
    127.0.0.1:6379> xinfo consumers codehole cg1  # 目前还有1个消费者
    1) 1) name
     2) "c1"
     3) pending
     4) (integer) 5  # 共5条待处理消息
     5) idle
     6) (integer) 418715  # 空闲了多长时间ms没有读取消息了
    # 接下来我们ack一条消息
    127.0.0.1:6379> xack codehole cg1 1527851486781-0
    (integer) 1
    127.0.0.1:6379> xinfo consumers codehole cg1
    1) 1) name
     2) "c1"
     3) pending
     4) (integer) 4  # 变成了5条
     5) idle
     6) (integer) 668504
    # 下面ack所有消息
    127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
    (integer) 4
    127.0.0.1:6379> xinfo consumers codehole cg1
    1) 1) name
     2) "c1"
     3) pending
     4) (integer) 0  # pel空了
     5) idle
     6) (integer) 745505
    

    信息监控:

    查看队列信息

    127.0.0.1:6379> Xinfo stream mq
     1) "length"
     2) (integer) 7
     3) "radix-tree-keys"
     4) (integer) 1
     5) "radix-tree-nodes"
     6) (integer) 2
     7) "groups"
     8) (integer) 1
     9) "last-generated-id"
    10) "1553585533795-9"
    11) "first-entry"
    12) 1) "1553585533795-3"
     2) 1) "msg"
     2) "4"
    13) "last-entry"
    14) 1) "1553585533795-9"
     2) 1) "msg"
     2) "10"
    

    消费组信息

    127.0.0.1:6379> Xinfo groups mq
    1) 1) "name"
     2) "mqGroup"
     3) "consumers"
     4) (integer) 3
     5) "pending"
     6) (integer) 3
     7) "last-delivered-id"
     8) "1553585533795-4"
    

    消费组成员信息

    127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
    1) 1) "name"
     2) "consumerA"
     3) "pending"
     4) (integer) 1
     5) "idle"
     6) (integer) 18949894
    2) 1) "name"
     2) "consumerB"
     3) "pending"
     4) (integer) 1
     5) "idle"
     6) (integer) 3092719
    3) 1) "name"
     2) "consumerC"
     3) "pending"
     4) (integer) 1
     5) "idle"
     6) (integer) 23683256
    

    2.2 深入理解

    Stream用在什么场景

    可用作实时通信等,大数据分析,异地数据备份等

    image

    客户端可以平滑扩展,提高处理能力

    image

    消息ID的设计是否考虑了时间回拨的问题?

    XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。

    可以通过multi批处理,来验证序号的递增

    127.0.0.1:6379> MULTI
    OK
    127.0.0.1:6379> XADD memberMessage * msg one
    QUEUED
    127.0.0.1:6379> XADD memberMessage * msg two
    QUEUED
    127.0.0.1:6379> XADD memberMessage * msg three
    QUEUED
    127.0.0.1:6379> XADD memberMessage * msg four
    QUEUED
    127.0.0.1:6379> XADD memberMessage * msg five
    QUEUED
    127.0.0.1:6379> EXEC
    1) "1553441006884-0"
    2) "1553441006884-1"
    3) "1553441006884-2"
    4) "1553441006884-3"
    5) "1553441006884-4"
    

    由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

    为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

    强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的。

    消费者崩溃会不会有消息丢失问题?

    为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

    127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
    1) (integer) 5 # 5个已读取但未处理的消息
    2) "1553585533795-0" # 起始ID
    3) "1553585533795-4" # 结束ID
    4) 1) 1) "consumerA" # 消费者A有3个
     2) "3"
     2) 1) "consumerB" # 消费者B有1个
     2) "1"
     3) 1) "consumerC" # 消费者C有1个
     2) "1"
    
    127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
    1) 1) "1553585533795-0" # 消息ID
     2) "consumerA" # 消费者
     3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
     4) (integer) 5 # 消息被读取了5次,delivery counter
    2) 1) "1553585533795-1"
     2) "consumerA"
     3) (integer) 1654355
     4) (integer) 4
    # 共5个,余下3个省略 ...
    
    127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
    1) 1) "1553585533795-0"
     2) "consumerA"
     3) (integer) 1641083
     4) (integer) 5
    # 共3个,余下2个省略 ...
    

    每个Pending的消息有4个属性:

    • 消息ID

    • 所属消费者

    • IDLE,已读取时长

    • delivery counter,消息被读取次数

    上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:

    127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
    (integer) 1
    
    127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
    1) (integer) 4 # 已读取但未处理的消息已经变为4个
    2) "1553585533795-1"
    3) "1553585533795-4"
    4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
     2) "2"
     2) 1) "consumerB"
     2) "1"
     3) 1) "consumerC"
     2) "1"
    127.0.0.1:6379>
    

    有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失

    消费者彻底宕机后如何转移给其他消费者处理?

    消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

    # 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
    127.0.0.1:6379> XPENDING mq mqGroup - + 10
    1) 1) "1553585533795-1"
     2) "consumerA"
     3) (integer) 15907787
     4) (integer) 4
    
    # 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
    127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
    1) 1) "1553585533795-1"
     2) 1) "msg"
     2) "2"
    
    # 消息1553585533795-1已经转移到消费者B的Pending中。
    127.0.0.1:6379> XPENDING mq mqGroup - + 10
    1) 1) "1553585533795-1"
     2) "consumerB"
     3) (integer) 84404 # 注意IDLE,被重置了
     4) (integer) 5 # 注意,读取次数也累加了1次
    

    以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。

    127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
    127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
    

    这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。

    坏消息问题,Dead Letter、死信问题

    如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

    # 删除队列中的消息
    127.0.0.1:6379> XDEL mq 1553585533795-1
    (integer) 1
    # 查看队列中再无此消息
    127.0.0.1:6379> XRANGE mq - +
    1) 1) "1553585533795-0"
     2) 1) "msg"
     2) "1"
    2) 1) "1553585533795-2"
     2) 1) "msg"
     2) "3"
    

    注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!

    3. 对象机制

    image

    Redis的每种对象其实都由对象结构(redisObject)对应编码的数据结构组合而成,而每种对象类型对应若干编码方式,不同的编码方式所对应的底层数据结构是不同的。

    所以,我们需要从几个个角度来着手底层研究:

    • 对象设计机制: 对象结构(redisObject)

    • 编码类型和底层数据结构: 对应编码的数据结构

    3.1 为什么会设计RedisObject对象

    在redis的命令中,用于对键进行处理的命令占了很大一部分,而对于键所保存的值的类型(键的类型),键能执行的命令又各不相同。如: LPUSHLLEN 只能用于列表键, 而 SADDSRANDMEMBER 只能用于集合键, 等等; 另外一些命令, 比如 DELTTLTYPE, 可以用于任何类型的键;但是要正确实现这些命令, 必须为不同类型的键设置不同的处理方式: 比如说, 删除一个列表键和删除一个字符串键的操作过程就不太一样。

    以上的描述说明, Redis 必须让每个键都带有类型信息, 使得程序可以检查键的类型, 并为它选择合适的处理方式.

    比如说, 集合类型就可以由字典和整数集合两种不同的数据结构实现, 但是, 当用户执行 ZADD 命令时, 他/她应该不必关心集合使用的是什么编码, 只要 Redis 能按照 ZADD 命令的指示, 将新元素添加到集合就可以了。

    这说明, 操作数据类型的命令除了要对键的类型进行检查之外, 还需要根据数据类型的不同编码进行多态处理.

    为了解决以上问题, Redis 构建了自己的类型系统, 这个系统的主要功能包括:

    • redisObject 对象.

    • 基于 redisObject 对象的类型检查.

    • 基于 redisObject 对象的显式多态函数.

    • 对 redisObject 进行分配、共享和销毁的机制

    3.2 redisObject数据结构

    /*
     * Redis 对象
     */
    typedef struct redisObject {
    
     // 类型
     unsigned type:4;
    
     // 编码方式
     unsigned encoding:4;
    
     // LRU - 24位, 记录最末一次访问时间(相对于lru_clock); 或者 LFU(最少使用的数据:8位频率,16位访问时间)
     unsigned lru:LRU_BITS; // LRU_BITS: 24
    
     // 引用计数
     int refcount;
    
     // 指向底层数据结构实例
     void *ptr;
    
    } robj;
    
    image

    其中type、encoding和ptr是最重要的三个属性

    • type记录了对象所保存的值的类型,它的值可能是以下常量中的一个:
    /*
    * 对象类型
    */
    #define OBJ_STRING 0 // 字符串
    #define OBJ_LIST 1 // 列表
    #define OBJ_SET 2 // 集合
    #define OBJ_ZSET 3 // 有序集
    #define OBJ_HASH 4 // 哈希表
    
    • encoding记录了对象所保存的值的编码,它的值可能是以下常量中的一个 :
    /*
    * 对象编码
    */
    #define OBJ_ENCODING_RAW 0 /* Raw representation */
    #define OBJ_ENCODING_INT 1 /* Encoded as integer */
    #define OBJ_ENCODING_HT 2      /* Encoded as hash table */
    #define OBJ_ENCODING_ZIPMAP 3  /* 注意:版本2.6后不再使用. */
    #define OBJ_ENCODING_LINKEDLIST 4 /* 注意:不再使用了,旧版本2.x中String的底层之一. */
    #define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
    #define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
    #define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
    #define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
    #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
    #define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
    
    • ptr是一个指针,指向实际保存值的数据结构,这个数据结构由type和encoding属性决定。举个例子, 如果一个redisObject 的type 属性为OBJ_LIST , encoding 属性为OBJ_ENCODING_QUICKLIST ,那么这个对象就是一个Redis 列表(List),它的值保存在一个QuickList的数据结构内,而ptr 指针就指向quicklist的对象

    • lru属性: 记录了对象最后一次被命令程序访问的时间

    空转时长:当前时间减去键的值对象的lru时间,就是该键的空转时长。Object idletime命令可以打印出给定键的空转时长

    如果服务器打开了maxmemory选项,并且服务器用于回收内存的算法为volatile-lru或者allkeys-lru,那么当服务器占用的内存数超过了maxmemory选项所设置的上限值时,空转时长较高的那部分键会优先被服务器释放,从而回收内存。

    3.3 命令的类型检查和多态

    当执行一个处理数据类型命令的时候,redis执行以下步骤

    • 根据给定的key,在数据库字典中查找和他相对应的redisObject,如果没找到,就返回NULL;

    • 检查redisObject的type属性和执行命令所需的类型是否相符,如果不相符,返回类型错误;

    • 根据redisObject的encoding属性所指定的编码,选择合适的操作函数来处理底层的数据结构;

    • 返回数据结构的操作结果作为命令的返回值。

    image

    3.4 对象共享

    redis一般会把一些常见的值放到一个共享对象中,这样可使程序避免了重复分配的麻烦,也节约了一些CPU时间。

    redis预分配的值对象如下

    • 各种命令的返回值,比如成功时返回的OK,错误时返回的ERROR,命令入队事务时返回的QUEUE,等等

    • 包括0 在内,小于REDIS_SHARED_INTEGERS的所有整数(REDIS_SHARED_INTEGERS的默认值是10000)

    image

    注意:共享对象只能被字典和双向链表这类能带有指针的数据结构使用。像整数集合和压缩列表这些只能保存字符串、整数等自勉之的内存数据结构

    为什么redis不共享列表对象、哈希对象、集合对象、有序集合对象,只共享字符串对象

    • 列表对象、哈希对象、集合对象、有序集合对象,本身可以包含字符串对象,复杂度较高。

    • 如果共享对象是保存字符串对象,那么验证操作的复杂度为O(1)

    • 如果共享对象是保存字符串值的字符串对象,那么验证操作的复杂度为O(N)

    • 如果共享对象是包含多个值的对象,其中值本身又是字符串对象,即其它对象中嵌套了字符串对象,比如列表对象、哈希对象,那么验证操作的复杂度将会是O(N的平方)

    如果对复杂度较高的对象创建共享对象,需要消耗很大的CPU,用这种消耗去换取内存空间,是不合适的

    3.5 引用计数以及对象的销毁

    redisObject中有refcount属性,是对象的引用计数,显然计数0那么就是可以回收。

    • 每个redisObject结构都带有一个refcount属性,指示这个对象被引用了多少次;

    • 当新创建一个对象时,它的refcount属性被设置为1;

    • 当对一个对象进行共享时,redis将这个对象的refcount加一;

    • 当使用完一个对象后,或者消除对一个对象的引用之后,程序将对象的refcount减一;

    • 当对象的refcount降至0 时,这个RedisObject结构,以及它引用的数据结构的内存都会被释放。

    小结

    • redis使用自己实现的对象机制(redisObject)来实现类型判断、命令多态和基于引用次数的垃圾回收;

    • redis会预分配一些常用的数据对象,并通过共享这些对象来减少内存占用,和避免频繁的为小对象分配内存;

    相关文章

      网友评论

          本文标题:redis基础

          本文链接:https://www.haomeiwen.com/subject/fgmokrtx.html