美文网首页
Jedis源码分析

Jedis源码分析

作者: 小胖学编程 | 来源:发表于2020-12-16 13:25 被阅读0次

引入对应的依赖,Jedis的版本2.9.1,SpringBoot版本2.1.3.RELEASE

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

1. JAVA操作下Jedis源码分析

Redis客户端和服务端之间的通信是借助Socket来实现的。
socket通信—实现网络间的IO通信

Redis通信流程图.png
public class JedisPipeline {
    //密码xxx,使用1号库
    private static JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1",6379, 2000, "xxx", 1);

    public static void main(String args[]) {
        test1();
    }
    public static void test1(){
        //在连接池中拿出连接
        Jedis jedis = pool.getResource();
        //执行查询String结构的命令。
        jedis.get("CCC");
        jedis.close();
    }
}

1.1 获取到Jedis连接

1.1.1 创建socket连接

获取Jedis对象.png

创建Connection对象,实际上创建socket连接,并获取到输出流和输入流。

public class Connection implements Closeable {
  //装饰的socket的输出和输入流
  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;
...
  public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        socket.setReuseAddress(true);
        socket.setKeepAlive(true);
        socket.setTcpNoDelay(true); 
        socket.setSoLinger(true, 0);

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);
        ...
        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException("Failed connecting to host " 
            + host + ":" + port, ex);
      }
    }
  }
}

上面的RedisOutputStream和RedisInputStream是装饰流,装饰socket.getInputStream()

1.1.2 装饰socket的输出流

FilterOutputStream是装饰流类,目的就是装饰socket的outputStream流。

当使用pipeline传递多个命令时,可以确保os.write()方法只是将命令写入到buf数组中,直到执行flush()方法时,才会将命令通过socket发送给redis服务端。

public final class RedisOutputStream extends FilterOutputStream {
  protected final byte[] buf;
  ...
   //注意:实际上是写入到buf中,并不会写入到socket。
  public void write(final byte b) throws IOException {
    if (count == buf.length) {
      flushBuffer();
    }
    buf[count++] = b;
  }
 ...
   
  @Override
  public void flush() throws IOException {
    //此时才将信息刷新到socket中。
    flushBuffer();
    out.flush();
  }
  //将buf数组刷新到socket流中
  private void flushBuffer() throws IOException {
    if (count > 0) {
      out.write(buf, 0, count);
      count = 0;
    }
  }
}

1.2 jedis的get命令

public class Jedis extends BinaryJedis implements JedisCommands, MultiKeyCommands,
    AdvancedJedisCommands, ScriptingCommands, BasicCommands, ClusterCommands, SentinelCommands {
  @Override
  public String get(final String key) {
    //检查是否是批量或者管道操作
    checkIsInMultiOrPipeline();
    //组装os流。
    client.sendCommand(Protocol.Command.GET, key);
    //这个方法是真正进行socket通信,内部含有flush方法。
    return client.getBulkReply();
  }
}

1.2.1 将命令读取到outputStream的缓存

将命令读取到RedisOutputStream中的buf数组进行缓存。

public class Connection implements Closeable {
  protected Connection sendCommand(final Command cmd, final String... args) {
    //转换参数编码
    final byte[][] bargs = new byte[args.length][];
    for (int i = 0; i < args.length; i++) {
      bargs[i] = SafeEncoder.encode(args[i]);
    }
    return sendCommand(cmd, bargs);
  }
 
  protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      //因为在`pool.getResource();`中建立了socket连接
      connect();
      //发送命令
      Protocol.sendCommand(outputStream, cmd, args);
      //int +1 记录使用
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
      try {
        String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
        if (errorMessage != null && errorMessage.length() > 0) {
          ex = new JedisConnectionException(errorMessage, ex.getCause());
        }
      } catch (Exception e) {
        /*
         * Catch any IOException or JedisConnectionException occurred from InputStream#read and just
         * ignore. This approach is safe because reading error message is optional and connection
         * will eventually be closed.
         */
      }
      // Any other exceptions related to connection?
      broken = true;
      throw ex;
    }
  }
}

Protocol将命令拼装成AOF文件的格式,该方法进行拼接。

public final class Protocol {
  public static void sendCommand(final RedisOutputStream os, final Command command,
      final byte[]... args) {
    sendCommand(os, command.raw, args);
  }

  private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
   //组装的是AOF格式的命令,读取到byte[]数组中
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }
}

拼接后的格式:

*2
$3
GET
$3
CCC

1.2.2 将命令发送给Redis以及接收Redis返回值

  1. flush()方法中,将RedisOutputStream的buf数组的信息写入socket(即发送给redis中)
  2. readProtocolWithCheckingBroken(),读取socket信息(Redis返回的值)到内存中。

当Redis没有返回值,那么readProtocolWithCheckingBroken()方法将会阻塞。

public class Connection implements Closeable {

  public String getBulkReply() {
    //发送命令并接收redis返回的结果
    final byte[] result = getBinaryBulkReply();
    if (null != result) {
      //进行编码
      return SafeEncoder.encode(result);
    } else {
      return null;
    }
  }
  //执行
  public byte[] getBinaryBulkReply() {
    //发送命令【上面进行分析】
    flush();
    //管道的命令-1
    pipelinedCommands--;
    //通过socket获取结果
    return (byte[]) readProtocolWithCheckingBroken();
  }
}

2. pipeline源码分析

pipeline和上述代码原理一样,通过socket来完成通信,先接受多个命令缓存到outputStreambuf数组中,当执行sync()方法或syncAndReturnAll()方法时,将buf数组的内容输出到socket中。

public class JedisPipeline {
    //密码xxx,使用1号库
    private static JedisPool pool = new JedisPool(new JedisPoolConfig(), "127.0.0.1",6379, 2000, "xxx", 1);

    public static void main(String args[]) {
        test1();
    }
    public static void test3(){
        Jedis jedis = pool.getResource();
        Pipeline pipelined = jedis.pipelined();
        //多个命令
        pipelined.get("AAA");
        pipelined.get("CCC");
        pipelined.set("CC","CCTV1");
        //此时发送给Redis(无结果)
        //pipelined.sync();
        //此时发送给Redis(有结果)
        List<Object> objects = pipelined.syncAndReturnAll();
        System.out.println(JSON.toJSONString(objects));
        jedis.close();
    }
}

执行结果

[null,"CCTV","OK"]

Redis服务端是单线程,所以返回的结果值顺序就是命令的顺序。

Redis客户端为了保证命令的结果类型(String、Hash结果类型)和结果值一一对应,会按照顺序将结果类型放入到Queue中。

public abstract class PipelineBase extends Queable implements BinaryRedisPipeline, RedisPipeline {
  @Override
  public Response<String> get(final String key) {
    //将命令维护到OutputStream的buf缓存中
    getClient(key).get(key);
    //将命令的结果类型维护到缓存中
    return getResponse(BuilderFactory.STRING);
  }
}

存储命令的响应对象。

public class Queable {
  private Queue<Response<?>> pipelinedResponses = new LinkedList<Response<?>>();

  protected void clean() {
    pipelinedResponses.clear();
  }

  protected Response<?> generateResponse(Object data) {
    Response<?> response = pipelinedResponses.poll();
    if (response != null) {
      response.set(data);
    }
    return response;
  }

  protected <T> Response<T> getResponse(Builder<T> builder) {
    Response<T> lr = new Response<T>(builder);
    pipelinedResponses.add(lr);
    return lr;
  }

  protected boolean hasPipelinedResponse() {
    return !pipelinedResponses.isEmpty();
  }

  protected int getPipelinedResponseLength() {
    return pipelinedResponses.size();
  }
}
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      //将多个命令发送给redis,并接收到Redis的响应结果
      List<Object> unformatted = client.getAll();
      List<Object> formatted = new ArrayList<Object>();

      for (Object o : unformatted) {
        try {
          //`generateResponse`是Queable方法。响应结果按照结果类型进行组装,得到真正的返回结果。
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
}

推荐阅读

下一篇—JedisCluster源码分析

Jedis源码解析(Pipeline的实现)

相关文章

  • Jedis源码分析

    对于日常开发,Redis由于单线程的并发模型、丰富的数据结构和简单的API,深受广大程序员的喜爱。Redis提供了...

  • Jedis源码分析

    https://github.com/xetorthio/jedis 1:redis协议 2:基本的redis命令...

  • Jedis源码分析

    引入对应的依赖,Jedis的版本2.9.1,SpringBoot版本2.1.3.RELEASE。 1. JAVA操...

  • Jedis之Sharded源码分析

    1.概述 当业务的数据量非常庞大时,单实例Redis可能无法支撑,需要考虑将数据存储到多个缓存节点上,如何定位数据...

  • Redis——Sentinel 高可用读写分离

    客户端高可用 我们直接来分析jedis的源码,找到相应的构造函数 初始化sentinel节点 下面是MasterL...

  • jedis源码解析

    说明:本文的源代码是3.1.0版本 结论 总的来说,Jedis原理大概是这样的:a、首先创建Jedis创建时需要提...

  • jedis对象池原理和源码分析

    1. 什么是对象池? 我们都知道一个对象,比如car其生命周期大致可分为“创建”,“使用”, “销毁”三个阶段,如...

  • 你读过jedis源码吗?

    本文将对redis的java客户端jedis进行源码剖析,目的是让读者对jedis客户端有清楚的认识;代码结构如下...

  • Jedis使用教程完整版

    记录是一种精神,是加深理解最好的方式之一。 最近深入研究了Jedis的源码,对Jedis的使用进行深入理解,提笔记...

  • Jedis客户端分片--构建高可用客户端分片

    记录是一种精神,是加深理解最好的方式之一。 最近深入研究了Jedis的源码,对Jedis的使用进行深入理解,提笔记...

网友评论

      本文标题:Jedis源码分析

      本文链接:https://www.haomeiwen.com/subject/kuqbgktx.html