美文网首页我爱编程
读《Redis实战》有感

读《Redis实战》有感

作者: 我就是要皮 | 来源:发表于2017-08-02 21:04 被阅读0次

    一、redis的优势

    1、redis简介

    redis是速度非常快的非关系型数据库,是内存数据库,可以以key-value的形式存储String、set、zset、list、hash这5种数据结构,还能作为轻量级的消息队列(功能和rabbitmq等消息队列差不多,不过当数据很大时入队很慢,不推荐使用)。

    2、redis和其他数据库的对比

    名称 类型 数据存储选项 查询类型 附加功能
    redis 使用内存存储的非关系型数据库 字符串、列表、集合、散列表、有序集合 各种数据类型都有自己的专属命令,还有批量操作和不完全的事务支持 发布订阅、主从复制、持久化
    memcached 使用内存存储的键值缓存 键值之间的映射 创建命令、读取命令、更新命令、删除命令和其他几个命令 为提升性能而设的多线程服务器
    mysql 关系数据库 select、update、insert、delete、函数、存储过程 支持ACID、主从复制和主主复制

    3、使用redis的理由

    • redis的数据类型相比于memcached等内存缓存更加丰富,能解决遇到的大多数问题。
    • redis的效率和简单比关系型数据库要好得多
    • 对于大多数关系数据库来说,插入行是很快的(顺序写硬盘比随机写内存要快),但是更新很慢(随机写硬盘),而对redis来说更新很快

    二、redis命令

    1、字符串、列表、集合命令

    1.1 字符串

    incr key ## key的值++ 
    decr key ## key的值--
    incrby key incremental ## key的值加上整数incremental
    decrby key decrease ## key的值减去整数decrease
    incrbyfloat key float ## key的值加上浮点数float
    append key value ## key的值某位加上字符串value
    getrange key start end ## 截取key对应的字符串的子字符串(从start到end,含头含尾)
    setrange key offset value ## key对应的字符串从offset开始设置为给定的值
    getbit key offset ## 将key的值看成一个二进制位串,并返回串中偏移量为offset的二进制位值
    setbit key offset value ## 字符串中偏移量为offset的二进制位的值设置成value
    bitcount key [start,end] ##统计二进制字符串中值为1的二进制位的数量
    bitop operation dest-key key [key2...] ## 对一个或者多个二进制位串执行按位运算,并把的出来的结果放在dest-key中
    
    

    1.2 列表

    
    rpush key value ## 右边入列表
    lpush key value ## 左边入列表
    rpop key ## 移除并返回最右边的元素
    lpop key ## 移除并返回最左边的元素
    lindex key offset ## 返回列表中偏移量为offset的元素
    lrange key start end ## 返回列表中偏移量start到end范围内的所有元素
    ltrim key start end ## 只保留列表中偏移量从start到end范围内的元素(头尾都保留)
    blpop key [key2...] timeout ## 从第一个非空队列中弹出位于最左端的元素,或者在timeout秒之内阻塞并等待可弹出元素出现
    brpop key [key2...] timeout ## 从第一个非空队列中弹出位于最右端的元素,或者在timeout秒之内阻塞并等待可弹出元素出现
    rpoplpush source-key dest-key ## 从列表source-key最右端弹出,从列表dest-key最左端进入
    brpoplpush source-key dest-key timeout## 从列表source-key最右端弹出,从列表dest-key最左端进入,如果source-key为空,那么阻塞等timeout秒
    
    

    1.3 集合

    
    sadd key value [value2...] ## 向集合中加入一个或多个元素,返回被添加的个数
    srem key value [value2...] ## 删除集合中的一个或多个元素,返回被删除的个数
    sismember key value ## 返回该value是够存在于集合key中(布尔值)
    scard key ## 返回集合key中元素个数
    smembers key ## 返回集合中所有元素
    srandmember key [count] ## 从集合中随机返回一个或多个元素,当count为正数时,返回的元素不会重复;当count为负数时,返回的元素可能会重复
    spop key ## 随机返回并移除集合key中的一个元素
    smove source-key dest-key value ## 如果source-key中含有value,那么把value从source-key移到dest-key中并返回1;否则返回0
    sdiff key [key2...] ## 返回那些存在于第一个集合并不存在于其他集合中的元素
    sdiffstore dest-key key [key2] ## 将那些那些存在于第一个集合并不存在于其他集合中的元素放入集合des-key中
    sinter key [key2...] ## 返回那些同时存在于所有集合中的元素
    sinterstore dest-key key [key2...] 将那些同时存在于所有集合中的元素放入集合dest-key中
    sunion key [key2...] ## 返回那些至少存在于一个集合中的元素
    sunionstore dest-key key [key2...] ## 将那些至少存在于一个集合中的元素放入集合dest-key中
    
    

    2、散列、有序集合命令

    2.1 散列表

    
    hget field key ## 取得field下key对应的值
    hset field key value ## 将field下key的值设为value
    hdel field key ## 删除field下key对应的映射
    hlen field ## 返回散列表field下的键值对的个数
    hexists field key ## 返回field下key是否存在(布尔值)
    hkeys field ## 返回field下所有的键值对
    hvals field ## 返回散列field中所有的值
    hgetall field ## 返回散列field中所有的键值对
    hincrby field key increment ## 将键key存储的值加上整数increment
    hincrbyfloat field key increment ## 将键key存储的值加上浮点数increment
    
    

    2.2 有序集合

    
    zadd key score member [score2 member2...] ## 在有序集合key中加入member和它的分值
    zrem key member [member2...] ## 有序集合key中删除一个或多个成员
    zcard key ## 返回有序集合中成员个数
    zincrby key increment member ## 将有序集合key中成员member的分值加上increment
    zcount key min max ## 返回有序集合key中分值介于min和max之间的成员数量
    zrank key member ## 返回有序集合key中成员member的排名
    zscore key member ## 返回member的分值
    zrange key start end [withscore] ## 返回排名在start和end之间的member,有了withscore就分值一并返回
    zrevrank key member ## 返回有序集合中成员member的排名,从分值从大到小排序
    zrevrange key start end [withscore] ## 返回有序集合中给定排名start到end之间的元素,排名从分值从大到小排序
    zrangebyscore key min max [withscore] ## 返回有序集合中分值介于min和max的元素
    zrevrangescore key min max [withscore] ## 返回有序集合中分值介于min和max的元素,返回顺序是分值从大到小
    zremrangebyrank key start end ## 移除有序集合中排名从start到end的元素
    zremrangebyscore key min max ## 移除有序集合中分值从min到max的元素
    zinterstore dest-key key-count key [key2...] [weights weight weight2...] [aggregate sum|min|max] ##  取key,key2...之间的交集并放入到有序集合dest-key中,其中weight选项是给key、key2...的权重(可填可不填),
    zunionstore dest-key key-count key [key2...] ## 取key,key2...之间的并集并放入到有序集合dest-key中
    
    

    3、发布订阅命令

    3.1 介绍

    订阅者负责订阅频道,发送者负责向频道发送二进制字符串消息。每当有消息发送至给定频道时,频道的所有订阅者都会收到消息。

    3.2 命令

    subscribe channel [channel2...] ## 订阅一个或多个模式
    unsubscribe [channel...] ## 退订给定的一个或者多个频道
    publish channel message ## 向给定频道发送消息
    psubscribe pattern [pattern2...] ## 订阅与给定模式相匹配的频道
    punsubscribe [pattern...] ## 退订给定的模式,要是没给定模式则退订所有模式
    

    3.3 流程

    新消息发布(publish)到频道(channel):

    imageimage

    消息发送给订阅该频道的客户端:

    imageimage

    3.4 java实现发布订阅

    3.4.1 用Jedis发布订阅

    初始化bean:

    <!-- 连接池配置 -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="${redis.maxTotal}"></property>
        <property name="maxIdle" value="${redis.maxIdle}"></property>
        <property name="numTestsPerEvictionRun" value="${redis.numTestsPerEvictionRun}"></property>
        <property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}"></property>
        <property name="minEvictableIdleTimeMillis" value="${redis.minEvictableIdleTimeMillis}"></property>
        <property name="softMinEvictableIdleTimeMillis" value="${redis.softMinEvictableIdleTimeMillis}"></property>
        <property name="maxWaitMillis" value="${redis.maxWaitMillis}"></property>
        <property name="testOnBorrow" value="${redis.testOnBorrow}"></property>
        <property name="testWhileIdle" value="${redis.testWhileIdle}"></property>
        <property name="blockWhenExhausted" value="${redis.blockWhenExhausted}"></property>
    </bean>
    <!-- 初始化连接池 -->
    <bean id="jedisPool" class="redis.clients.jedis.JedisPool">
        <constructor-arg index="0" ref="jedisPoolConfig" />
        <constructor-arg index="1" value="${redis.host}" />
        <constructor-arg index="2" value="${redis.port}" />
        <constructor-arg index="3" value="3000" />
        <constructor-arg index="4" value="${redis.password}" />
    </bean>
    

    发布:

    /**
     * jedis池
     */
    @Autowired
    private JedisPool jedisPool;
    
    public String pub(GlobalDTO globalDTO, String channel, String message) throws BusiException {
        Jedis jedis = null;
        try {
            jedis = this.jedisPool.getResource();
            jedis.publish(channel, message);
        } catch (Exception e) {
            LOGGER.error("{}publish error 发布数据channel = {}, message = {}",
                    new Object[] { JSON.toJSONString(globalDTO), channel, message});
        } finally {
            this.jedisPool.returnResourceObject(jedis);
        }
        return null;
    }
    

    订阅:
    JedisPubSub类的重写:

    package com.awifi.vp.assist.utils.redis;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import com.alibaba.fastjson.JSONObject;
    import com.awifi.vp.assist.config.PropertiesUtil;
    import com.awifi.vp.assist.constant.Prefix;
    import com.awifi.vp.assist.http.HttpClient;
    import com.awifi.vp.assist.utils.EncryptUtil;
    
    import redis.clients.jedis.JedisPubSub;
    
    /**
     * @Description: redis订阅到信息后执行操作
     * @Title: Subscriber.java 
     * @Package: com.awifi.vp.assist.utils.redis 
     * @author ffn 
     * @date 2017年6月20日
     */
    public class Subscriber extends JedisPubSub {
        public Subscriber() {}
    
        /**
         * redis订阅到信息后执行
         * @param channel 主题
         * @param message 接收到的订阅信息
         * @author ffn 
         * @date 2017年6月20日
         */
        public void onMessage(String channel, String message) {
            //对接收到的消息进行处理
        }
    
        /**
         * 继承自JedisPubSub
         * @param channel 主题
         * @param subscribedChannels  ..
         */
        public void onSubscribe(String channel, int subscribedChannels) {
        }
    
        /**
         * 继承自JedisPubSub
         * @param channel 主题
         * @param subscribedChannels ..
         */
        public void onUnsubscribe(String channel, int subscribedChannels) {
        }
    }
    
    

    订阅线程:

    /**
     * @Description: redis订阅线程
     * @Title: SubThread.java 
     * @Package: com.awifi.vp.assist.utils.redis 
     * @author ffn 
     * @date 2017年6月20日
     */
    public class SubThread extends Thread {
        
        /**
         * redis连接池
         */
        private JedisPool jedisPool;
    
        /**
         * redis订阅操作类
         */
        private final Subscriber subscriber = new Subscriber();
    
        /**
         * 订阅主题
         */
        private String channel;
    
        public SubThread(String channel,JedisPool jedisPool) {
            this.jedisPool = jedisPool;
            this.channel = channel;
        }
    
        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.subscribe(subscriber, channel);
            } catch (Exception e) {
                
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }
    

    启动订阅线程:

    new SubThread(channel,jedisPool).start();
    

    3.4.2 使用监听器

    bean初始化:

    <!-- spring data redis -->
    <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
        <property name="usePool" value="true"></property>
        <property name="hostName" value="${redis.host}" />
        <property name="port" value="${redis.port}" />
        <property name="password" value="${redis.pass}" />
        <property name="timeout" value="${redis.timeout}" />
        <property name="database" value="${redis.default.db}"></property>
        <constructor-arg index="0" ref="jedisPoolConfig" />
    </bean>
    
    <!-- jedis pool配置 -->
    <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="${redis.maxActive}" />
        <property name="maxIdle" value="${redis.maxIdle}" />
        <property name="maxWaitMillis" value="${redis.maxWait}" />
        <!--
        <property name="testOnBorrow" value="${redis.testOnBorrow}" />
        -->
    </bean>
    
    
    <!-- Bean Configuration -->
    <bean id="messageListener"
        class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.redis.MyMessageListener" />
        </constructor-arg>
    </bean>
    
    <bean id="redisContainer"
        class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="jedisConnectionFactory" />
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <list>
                        <bean class="org.springframework.data.redis.listener.ChannelTopic">
                            <constructor-arg value="springtv" />
                        </bean>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="hello*" />
                        </bean>
                        <bean class="org.springframework.data.redis.listener.PatternTopic">
                            <constructor-arg value="tv*" />
                        </bean>
                    </list>
                </entry>
            </map>
        </property>
    </bean>
    
    package com.redis;
    
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            System.out.println("接收到的消息主题:" + message.getChannel()
                + ",接收到的消息为:" + message.getBody());
        }
    
    }
    
    

    3.5 小结

    • redis的发布订阅多用于实时性较高的消息推送,并不保证可靠。
    • 要是redis某个客户端订阅了某个频道,但是它读取的速度不够快,那么积压的消息可能会使缓存区越来越大,甚至把内存占满。
    • 传输的时候万一断线,那么客户端将会丢失断线期间所有的数据。

    4、其他命令

    4.1 排序

    sort source-key [by pattern] [limit offset count] [get pattern...] [asc|desc] [store dest-key] ## 根据给定的选项,对输入的列表、集合或者有序集合进行排序
    

    4.2 基本的redis事务

    4.2.1 介绍

    redis事务可以让redis客户端在不被其他客户端打断的情况下执行多个命令。和关系数据库中可以回滚的事务不同,redis事务会用multi和exec包围要执行的命令,当事务执行完之后,redis才会处理其他客户端的命令。

    4.2.2 示例

    /**
     * redis连接池
     */
    private JedisPool jedisPool;
    
    public void test(){
        Jedis jedis = this.jedisPool.getResource();
        Transaction tx = jedis.multi();
        tx.zadd("market:", "1");
        tx.exec();
    }
    

    4.3 键的过期时间

    persist key ## 移除键的过期时间
    ttl key ## 查看键的过期时间,单位是秒
    expire key seconds ## 让给定的键在指定秒数后过期
    expireat key timestamp ## 将给定的key的过期时间设置为unix时间戳,单位是秒
    pttl key ## 查看键的过期时间,单位是毫秒
    pexpire key millisseconds ## 让给定的键在指定毫秒数后过期
    pexpireat key timestamp ## 将给定的key的过期时间设置为unix时间戳,单位是毫秒
    

    三、数据安全和性能保障

    1.持久化

    1.1 介绍

    将redis数据存储到硬盘里。目的是为了在之后重用数据,或者是为了防止系统故障而将数据备份到一个远程位置。
    在redis-list写缓存队列时,持久化能保存队列中的信息,防止系统崩溃队列中的信息丢失(pub/sub会丢失部分数据),不过这种方式违背了redis持久化的初衷,所以不推荐使用。

    1.2 持久化选项(redis.conf中配置)

    快照持久化:

    save 60 1000 ## 60s内至少有10000个key有变化(可以配置多个,满足任何一个就会触发)
    stop-writes-on-bgsave-error yes/no ## 指定redis在后台dump磁盘出错时的行为,默认为yes,表示若后台dump出错,则RedisServer拒绝新的写入请求,通过这种方式来引起用户警觉,避免因用户未发现异常而引起更大的事故。
    rdbcompression yes/no ## RDB文件是否压缩存储,若为yes,会在压缩时消耗一点CPU,但省磁盘空间。
    dbfilename dump.rdb ## 指定RDB文件名,默认为dump.rdb
    

    AOF持久化:

    appendonly yes/no ## 配置是否启用AOF持久化,默认为no
    appendfsync no/always/everysec
    ## 配置aof文件的同步方式,Redis支持3种方式:
    ##  a. no => redis不主动调用fsync,何时刷盘由OS来调度;
    ##  b. always => redis针对每个写入命令均会主动调用fsync刷磁盘;
    ##  c. everysec => 每秒调一次fsync刷盘。
    no-appendfsync-on-rewrite  yes/no ##指定是否在后台aof文件rewrite期间调用fsync,默认为no,表示要调用fsync(无论后台是否有子进程在刷盘)
    auto-aof-rewrite-percentage 100 ## 指定Redis重写aof文件的条件,默认为100,表示与上次rewrite的aof文件大小相比,当前aof文件增长量超过上次afo文件大小的100%时,就会触发background rewrite。若配置为0,则会禁用自动rewrite。
    auto-aof-rewrite-min-size 64mb ##  指定触发rewrite的aof文件大小。若aof文件小于该值,即使当前文件的增量比例达到auto-aof-rewrite-percentage的配置值,也不会触发自动rewrite。即这两个配置项同时满足时,才会触发rewrite。
    

    共享选项:

    dir ./ ## 指定快照文件和aof文件的存放路径
    

    1.3 快照持久化

    redis通过创建快照来获取存储在内存里的数据在某个时间点上的副本。以便数据丢失时恢复数据。
    如果新的快照文件创建完毕之前,Redis、系统或硬件之间任意一个崩溃,那么redis将丢失最近一次创建快照之后写入的所有数据。

    创建快照的方式:

    • 客户端向redis发送bgsave命令(windows不支持,因为windows不支持fork)来主动创建一个快照。redis会调用fork来创建一个子进程,然后子进程负责将快照写入磁盘,父进程继续执行别的请求。
    • 客户端向redis发送save命令来主动创建快照。这种方式服务器会阻塞,不再响应别的请求。
    • 用户配置了save选项,满足了会自动创建快照。(比如前面的save 60 1000)
    • 当redis通过shutdown命令接收到关闭服务器请求时,或者接收到标准term信号时,会执行一个save命令,阻塞所有客户端。
    • 当一个redis服务器连接到另一个redis服务器时,并向对方发送sync命令来开始一次复制操作的时候,如果主服务器没有执行bgsave命令或者并非刚刚执行bgsave命令,那么主服务器会调用一次bgsave。

    从上面的介绍中可以看出,快照持久化因为要将所有的数据都保存到硬盘,所以执行时间会比较慢,这就无法保证数据的实时性。(过于频繁会浪费资源,过于稀少会丢失大量数据)。

    1.4 AOF持久化

    将被执行的写命令写到aof文件的末尾,以此来记录数据发生的变化。并不是像快照持久化一样每次都完整的执行一次持久化。

    重写/亚索AOF文件:
    aof文件既可以将损失数据的时间降至1秒(甚至不丢失任何数据),又可以在极短的时间内完成持久化。但是aof文件会越来越大(甚至会占满整个硬盘),因为aof文件太大,还原操作也会消耗大量的时间。所以,重写/压缩aof文件刻不容缓。

    重写/压缩aof文件方式:
    auto-aof-rewrite-percentage 100
    auto-aof-rewrite-min-size 64mb
    (要同时满足这两个条件才会重写/压缩aof文件,具体含义参考前面的持久化配置选项)

    2.主从复制

    2.1 对redis的复制相关选项进行配置

    slaveof host post ## 从服务器连接主服务器
    slaveof no one ## 让服务器终止复制操作,不再接受主服务器的数据更新
    

    从服务器连接主服务器时的步骤:

    步骤 主服务器操作 从服务器操作
    1 (等待命令接入) 连接主服务器,发送sync命令
    2 开始执行bgsave,并使用缓冲区记录bgsave之后执行的所有写命令 根据配置选项来决定是继续使用现有的数据来处理客户端的请求,还是向发送请求的客户端返回错误
    3 bgsave执行完毕,向从服务器发送快照文件,并在发送期间继续使用缓冲区记录被执行写命令 丢弃所有旧数据,开始载入主服务器发来的快照文件
    4 快照文件发送完毕,开始向从服务器发送缓冲区里面的写命令 完成对快照文件的解释操作,想往常一样开始接受命令请求
    5 缓冲区存储的写命令发送完毕;从现在开始,每执行一个写命令,就向从服务器发送相同的写命令 执行从服务器发来的所有存储在缓冲区里面的写命令;从现在开始,接受并执行主服务器传来的每个写命令。

    2.2 主从链

    从服务器拥有自己的从服务器,并由此形成了主从链。
    当读请求的重要性明显高于写请求的重要性,并且读请求的数量远远超出一台redis服务器可以处理的范围时,用户就需要添加新的从服务器来处理读请求。
    随着负载不断上升,主服务器可能会无法快速地更新所有的从服务器,为了缓解这个问题,用户可以创建一个redis主从节点组成的中间层来分担主服务器的复制工作。

    3.redis事务

    在多个客户端同时处理相同的数据时,不谨慎的操作将会导致数据出错。redis事务和关系数据库的事务不一样,redis事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制, 并且事务在执行的期间不会主动中断 —— 服务器在执行完事务中的所有命令之后, 才会继续处理其他客户端的其他命令。

    悲观锁:
    在访问写入为目的的数据的时候,关系数据库会对被访问的数据行进行加锁,直到事务被提交为止。如果有其他客户端试图对被加锁的数据行进行写入,那么该客户端将被阻塞,直到第一个事务执行完为止。缺点是,持有锁的客户端运行越慢,等待解锁的客户端被阻塞的时间就越长。
    乐观锁:
    redis为了减少客户端的等待时间,并不会在执行watch命令时对数据进行加锁。redis只会在数据已经被其他客户端修改的情况下,通知执行了watch命令的客户端。

    4.非事务型流水线

    在需要执行大量命令的情况下,即使命令实际上并不需要放在事务里面执行,但是为了通过一次发送所有命令来减少通信次数并降低延迟值,会用到非事务型流水线。(Jediscluster不支持管道模式)

    Jedis jedis = this.jedisPool.getResource();
    Pipeline pipeline = jedis.pipelined();
    pipeline.zremrangeByScore(semname, 0, now-timeout);
    pipeline.zadd(semname,now, identifier);
    Long id = pipeline.zrank(semname, identifier).get();
    pipeline.zrem(semname, identifier);
    pipeline.exec();
    

    5.分布式锁

    5.1 介绍

    不同机器上的不同Redis客户端先获取锁来得到对数据行排他性访问的能力,然后对数据行进行操作,最后释放锁。

    5.2 使用Redis构建锁

    构建锁之前,首先介绍一个redis命令:

    SETNX key value 
    将 key 的值设为 value ,当且仅当 key 不存在。
    若给定的 key 已经存在,则 SETNX 不做任何动作。
    SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
    

    5.2.1 获取锁(带有超时时间)

    Jedis jedis = this.jedisPool.getResource();
    //设置锁的名字
    String lockName = "lock:";
    //锁的值
    String identifier = "identifier";
    //设置锁的过期时间是当前时间过后60s
    Long currentTime = System.currentTimeMillis()/1000;
    Long timeOut = currentTime + 60;
    Long end = currentTime + 10;
    //10s内一直获取,直到获取到了锁或者超时了才罢休
    while(System.currentTimeMillis()/1000 < end) {
        //获取到了锁
        if(jedis.setnx(lockName,identifier) == 1){
            jedis.exprie(lockTime,timeOut);
            return identifier;
        //没有设置超时时间,则为其设置超时时间
        }else if(-1 == jedis.ttl(lockName)){
            jedis.exprie(lockTime,timeOut);
        }
    }
    return null;
    

    5.2.2 释放锁(带有超时时间)

    Jedis jedis = this.jedisPool.getResource();
    Pipeline pipeline = jedis.pipelined();
    //锁的名字
    String lockName = "lock:";
    //锁的值
    String identifier = "identifier";
    while(true){
        pipeline.watch(lockName);
        if(identifier == pipeline.get(lockName)){
            pipeline.multi();
            pipeline.del(lockName);
            pipeline.exec();
            return true;    
        }
    }
    

    四、分片

    1、概念

    分片(partitioning)就是将你的数据拆分到多个 Redis 实例的过程,这样每个实例将只包含所有键的子集。

    2、作用

    • 允许使用很多电脑的内存总和来支持更大的数据库。没有分片,你就被局限于单机能支持的内存容量。
    • 允许伸缩计算能力到多核或多服务器,伸缩网络带宽到多服务器或多网络适配器。

    3、分片基础

    3.1. 范围分片

    先进入实例的用户在前面的片,前面的片满了再进入后面的片
    缺点:需要一个映射范围到实例的表格。这张表需要管理,不同类型的对象都需要一个表, 所以范围分片在 Redis 中常常并不可取,因为这要比替他分片可选方案低效得多。
    3.2 哈希分片

    • 使用一个哈希函数(例如,crc32 哈希函数) 将键名转换为一个数字。例如,如果键是 foobar, crc32(foobar)将会输出类似于 93024922 的东西。
    • 对这个数据进行取模运算,以将其转换为一个 0 到 3 之间的数字,这样这个数字就可以映射到我的 4 台 Redis 实例之一。93024922 模 4 等于 2,所以我知道我的键 foobar 应当存储到 R2 实例。注意:取模操作返回除法操作的余数,在许多编程语言总实现为%操作符。

    4、分片的不同实现

    • 客户端分片(Client side partitioning)意味着,客户端直接选择正确的节点来写入和读取指定键。许多 Redis 客户端实现了客户端分片。
    • 代理协助分片(Proxy assisted partitioning)意味着,我们的客户端发送请求到一个可以理解 Redis 协议的代理上,而不是直接发送请求到 Redis 实例上。代理会根据配置好的分片模式,来保证转发我们的请求到正确的 Redis 实例,并返回响应给客户端。Redis 和 Memcached 的代理 Twemproxy 实现了代理协助的分片。
    • 查询路由(Query routing)意味着,你可以发送你的查询到一个随机实例,这个实例会保证转发你的查询到正确的节点。Redis 集群在客户端的帮助下,实现了查询路由的一种混合形式 (请求不是直接从 Redis 实例转发到另一个,而是客户端收到重定向到正确的节点)。

    5、分片的缺点

    • 涉及多个键的操作通常不支持。例如,你不能对映射在两个不同 Redis 实例上的键执行交集(事实上有办法做到,但不是直接这么干)。
    • 涉及多个键的事务不能使用。
    • 分片的粒度(granularity)是键,所以不能使用一个很大的键来分片数据集,例如一个很大的有序集合。
    • 当使用了分片,数据处理变得更复杂,例如,你需要处理多个 RDB/AOF 文件,备份数据时你需要聚合多个实例和主机的持久化文件。
    • 添加和删除容量也很复杂。例如,Redis 集群具有运行时动态添加和删除节点的能力来支持透明地再均衡数据,但是其他方式,像客户端分片和代理都不支持这个特性。但是,有一种称为预分片(Presharding)的技术在这一点上能帮上忙。

    相关文章

      网友评论

        本文标题:读《Redis实战》有感

        本文链接:https://www.haomeiwen.com/subject/zhmylxtx.html