美文网首页中间件一些收藏
SpringBoot2.x下Redis的Pipeline的使用

SpringBoot2.x下Redis的Pipeline的使用

作者: 小胖学编程 | 来源:发表于2020-12-17 15:56 被阅读0次

1. 简介

Redis Cluster(Redis集群模式)会将key经过hash后分片存储到不同的节点。每个Redis节点只能操作本节点下的key。故Redis Cluster不支持mget等的批量操作。

Redis Cluster不支持mget等命令,是因为一个命令含有多个key,而多个key可能在不同的节点,所以Redis Cluster是直接不支持的。

Jedis客户端访问Redis Cluster时,已经计算出key所在的slot(hash槽),并根据(slot-JedisPool)的缓存可以获取到对应节点的Connection对象。使用对应节点的Connection对象去对应节点查询数据。详细请见——JedisCluster源码分析

但是一个节点上支持发送多条Redis命令(即单节点支持pipeline操作)。

而SpringBoot下若是使用Jedis客户端,没有实现对Redis Cluster的pipeline操作。
而SpringBoot2.0以后的版本,使用的是lettuce客户端,Spring实现了对Redis Cluster的pipeline操作。

2. SpringBoot的使用

引入依赖,SpringBoot2.x版本2.1.3

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

使用方式:

@RestController
@Slf4j
public class TestPipelineRedisController {


    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @RequestMapping("/redis")
    public String redis1() {
        //得到的就是Redis返回的有序的结果,Spring已经帮助序列化。
        List<Object> originalDatas = stringRedisTemplate.executePipelined(new RedisCallback() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                connection.set("HH:AA".getBytes(),"我是中国人".getBytes());
                connection.get("HH:123".getBytes());
                connection.hGetAll("HH:HASH:cc".getBytes());
                return null;
            }
        });
        return JSON.toJSONString(originalDatas);
    }
}

关注事项:

  1. doInRedis()方法必须返回null,要不会出现异常;
  2. doInRedis()方法中,命令并不会被执行,故也不会返回结果;
  3. originalDatas的结果已经按照stringRedisTemplate配置的方式进行对key和value进行反序列化;
  4. originalDatas的结果顺序和命令是一致的,若命令出现异常,那么List<Object>对应节点存储的是异常对象,不会出现错位的情况。

SpringBoot的源码实现

public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperations<K, V>, BeanClassLoaderAware {

    @Override
    public List<Object> executePipelined(RedisCallback<?> action) {
        return executePipelined(action, valueSerializer);
    }

    /*
     * (non-Javadoc)
     * @see org.springframework.data.redis.core.RedisOperations#executePipelined(org.springframework.data.redis.core.RedisCallback, org.springframework.data.redis.serializer.RedisSerializer)
     */
    @Override
    public List<Object> executePipelined(RedisCallback<?> action, @Nullable RedisSerializer<?> resultSerializer) {

        return execute((RedisCallback<List<Object>>) connection -> {
            //将pipeline对象放入到connection对象中
            connection.openPipeline();
            boolean pipelinedClosed = false;
            try {
                //用户代码,使用pipeline进行操作,只是将命令保存到buf缓存中
                Object result = action.doInRedis(connection);
               //【注意】此处就是若用户不返回null,抛出异常的原因
                if (result != null) {
                    throw new InvalidDataAccessApiUsageException(
                            "Callback cannot return a non-null value as it gets overwritten by the pipeline");
                }
               //和Redis进行socket通信发送命令获取结果【此时才会得到Redis的结果】
                List<Object> closePipeline = connection.closePipeline();
                pipelinedClosed = true;
               //对返回的结果按照RedisTemple配置的方式进行反序列化
                return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);
            } finally {
                if (!pipelinedClosed) {
                    connection.closePipeline();
                }
            }
        });
    }
}

注意:代码使用JedisConnection进行的pipeline操作。Jedis对pipeline操作的原理以及源码分析可见:
Jedis源码分析
JedisCluster源码分析

相关文章

网友评论

    本文标题:SpringBoot2.x下Redis的Pipeline的使用

    本文链接:https://www.haomeiwen.com/subject/ywrngktx.html