美文网首页Redisjava进阶干货
redis之管道应用场景及源码分析

redis之管道应用场景及源码分析

作者: jerrik | 来源:发表于2017-10-10 13:01 被阅读0次
    一、redis通信基础

    我们都知道,redis的通信是建立在tcp基础上的,也就是说每一次命令(get、set)都需要经过tcp三次握手,而且redis一般都是部署在局域网内,网络开销非常小,针对频次较低的操作,网络开销都是可以忽略的。

    二、什么情况下需要使用redis的管道?

    在redis通信基础中 我已经讲到了。每一次操作redis的时候我们都需要和服务端建立连接,针对量小的情况下网络延迟都是可以忽略的,但是针对大批量的业务,就会产生雪崩效应。假如一次操作耗时2ms,理论上100万次操作就会有2ms*100万ms延迟,中间加上服务器处理开销,耗时可能更多.对应客户端来讲,这种长时间的耗时是不能接受的。所以为了解决这个问题,redis的管道pipeline就派上用场了。 恰好公司的对账业务使用了redis的sdiff功能,数据量比较大,刚开始没有pipeline导致延迟非常严重。后来wireshark抓包分析原因确实发现不停的建立tcp连接(发送数据,接收数据)。使用pipeline后性能大幅度提升。

    三、使用实例
    1. 不使用pipeline的情况
       private static void withoutPipeline(int count){
            try {
                for(int i =0; i<count; i++){
                  CacheUtils.sadd("testWithout", "key_" + i);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    1. 使用pipeline的情况
        private static void usePipeline(int count) {
            try {
                Pipeline pipe = CacheUtils.pipelined();
                for (int i = 0; i < count; i++) {
                    pipe.sadd("test", "key_" + i);
                }
                pipe.sync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    1. 测试一下(循环操作1万次,看耗时情况)
    int count = 10000;
    long start = System.currentTimeMillis();
    withoutPipeline(count);
    long end = System.currentTimeMillis();
    System.out.println("withoutPipeline: " + (end - start));
    
    start = System.currentTimeMillis();
    usePipeline(count);
    end = System.currentTimeMillis();
    System.out.println("usePipeline: " + (end - start));
    
    output:
    操作10000次的结果:
    withoutPipeline: 9266
    usePipeline: 66
    
    操作1000000次的结果:
    withoutPipeline: 834535
    usePipeline: 4803
    

    可想而知,使用pipeline的性能要比不使用管道快很多倍。

    四、jredis pipeline()源码分析
     public Pipeline pipelined() {
        Pipeline pipeline = new Pipeline();
        pipeline.setClient(client);
        return pipeline;
      }
    
    //使用管道的sadd
    public Response<Long> sadd(String key, String... member) {
        getClient(key).sadd(key, member);
        return getResponse(BuilderFactory.LONG);//没有手动调用flush(),而是返回一个固定值。
      }
    
    //让我们来看看pipeline的sync()方法:
     public void sync() {
        if (getPipelinedResponseLength() > 0) {
          List<Object> unformatted = client.getAll();
          for (Object o : unformatted) {
            generateResponse(o);
          }
        }
      }
    
    //client.getAll()调用了getAll(0)
    public List<Object> getAll(int except) {
        List<Object> all = new ArrayList<Object>();
        flush();//也就是说在调用pipeline.sync()时手动触发的flush()方法,一次pipeline操作真正意思上只有一次tcp
        while (pipelinedCommands > except) {
          try {
            all.add(readProtocolWithCheckingBroken());
          } catch (JedisDataException e) {
            all.add(e);
          }
          pipelinedCommands--;
        }
        return all;
      }
    
    
    //未使用管道的sadd方法
    public Long sadd(final String key, final String... members) {
        checkIsInMulti();
        client.sadd(key, members);//这里的sadd将会执行下面的sendCommand发送指令
        return client.getIntegerReply();//客户端立即发送数据到服务端。客户端等待服务端返回
      }
    
    //所有的发送指令都要调用该方法,但是该方法并没有真正发送数据。
     protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
        try {
          connect();
          Protocol.sendCommand(outputStream, cmd, args);
          pipelinedCommands++;
          return this;
        } catch (JedisConnectionException ex) {
          // Any other exceptions related to connection?
          broken = true;
          throw ex;
        }
      }
    
     public Long getIntegerReply() {
        flush();//sendCommand方法调用后,还没有真正将数据写到服务端,当调用flush()后才真正触发发送数据
        pipelinedCommands--;
        return (Long) readProtocolWithCheckingBroken();
      }
    

    本文就先到这里了。。。

    相关文章

      网友评论

        本文标题:redis之管道应用场景及源码分析

        本文链接:https://www.haomeiwen.com/subject/lpjvyxtx.html