你读过jedis源码吗?

作者: 爪哇部落格 | 来源:发表于2019-04-24 18:03 被阅读13次

    本文将对redis的java客户端jedis进行源码剖析,目的是让读者对jedis客户端有清楚的认识;代码结构如下:

    源码地址:https://github.com/xetorthio/jedis

    代码结构

    jedis

    jedis 使用

    jedis的直接使用很简单,新建一个客户端便可直接使用

    public static void main(String[] args) {
            Jedis jedis = new Jedis("127.0.0.1", 6666);
            jedis.set("hello", "world");
            String value = jedis.get("hello");
            System.out.println("value:" + value);
            jedis.close();
    }
    

    jedis源码梳理

    jedis结构

    Jedis继承了 BinaryJedis ,并实现了诸多commands接口

    BinaryJedis

    BinaryJedis声明了三个成员变量分别是Client(客户端),Transaction(redis事务),Pipeline(redis批量执行)。BinaryJedis的构造函数创建了Client实例,以下为其中一种示例

    public BinaryJedis(final String host, final int port) {
        client = new Client(host, port);
      }
    

    我们再看具体的set命令实现

    public String set(final byte[] key, final byte[] value) {
        checkIsInMultiOrPipeline();
        client.set(key, value);
        return client.getStatusCodeReply();
    }
    

    由client去调用set方法,并返回client.getStatusCodeReply()结果;那么client是如何实现的呢,我们来看看client的类图


    client

    我们发现Client的父类也是继承了Connection类;我们从顶层向下梳理

    Connection

    原来是由Connection类实现了发送命令,建立连接,关闭连接;参照以下代码片段;

    /**建立连接**/
    public void connect() {
        if (!isConnected()) {
          try {
            socket = new Socket();
            socket.setReuseAddress(true);
            socket.setKeepAlive(true);
            socket.setTcpNoDelay(true);
            socket.setSoLinger(true, 0); 
            socket.connect(new InetSocketAddress(host, port), connectionTimeout);
            socket.setSoTimeout(soTimeout);
            if (ssl) {
              if (null == sslSocketFactory) {
                sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault();
              }
              socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true);
              if (null != sslParameters) {
                ((SSLSocket) socket).setSSLParameters(sslParameters);
              }
              if ((null != hostnameVerifier) &&
                  (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) {
                String message = String.format(
                    "The connection to '%s' failed ssl/tls hostname verification.", host);
                throw new JedisConnectionException(message);
              }
            }
            outputStream = new RedisOutputStream(socket.getOutputStream());
            inputStream = new RedisInputStream(socket.getInputStream());
          } catch (IOException ex) {
            broken = true;
            throw new JedisConnectionException(ex);
          }
        }
      }
    
    /**发送命令**/
    protected Connection sendCommand(final Command cmd, final byte[]... args) {
        try {
          connect();
         /**以协议的格式发送命令**/
          Protocol.sendCommand(outputStream, cmd, args);
          pipelinedCommands++;
          return this;
        } catch (JedisConnectionException ex) {
          try {
            String errorMessage = Protocol.readErrorLineIfPossible(inputStream);
            if (errorMessage != null && errorMessage.length() > 0) {
              ex = new JedisConnectionException(errorMessage, ex.getCause());
            }
          } catch (Exception e) {
          }
          broken = true;
          throw ex;
        }
      }
    
    /**获取响应 *Reply方法都是获取不同格式的响应 **/
    public String getStatusCodeReply() {
        flush();
        pipelinedCommands--;
        final byte[] resp = (byte[]) readProtocolWithCheckingBroken();
        if (null == resp) {
          return null;
        } else {
          return SafeEncoder.encode(resp);
        }
      }
    

    通过阅读Connection的源码,我们明确了它的职责是与Redis服务端建立连接,还可以向Redis服务端发送命令,以及获取Redis服务端的响应;我们再来看Connection的子类BinaryClient。

    /**重写父类的Connect方法,并加入auth与select的过程**/
    @Override
      public void connect() {
        if (!isConnected()) {
          super.connect();
          if (password != null) {
            auth(password);
            getStatusCodeReply();
          }
          if (db > 0) {
            select(Long.valueOf(db).intValue());
            getStatusCodeReply();
          }
        }
      }
    
    /**通过定义入参为byte数组的API方法,调用父类的sendCommand来发送命令**/
    public void set(final byte[] key, final byte[] value){
        sendCommand(Command.SET, key, value);
    }
    ....
    public void get(final byte[] key){
        sendCommand(Command.GET, key);
    }
    

    BinaryClient重写了connect方法,并实现了auth和select的过程,并声明了调用Redis的相关方法;最后是我们的目标类Client,它实现了定义了Redis常用API的Commands接口,而Client具体的实现方法则是调用BinaryClient之前声明的方法;

    @Override
      public void set(final String key, final String value) {
       /**调用BinaryClient的方法,将参数转换为byte[]**/
        set(SafeEncoder.encode(key), SafeEncoder.encode(value));
      }
    ....
    public void get(final String key) {
        get(SafeEncoder.encode(key));
      }
    

    至此我们明白Client是一个调用Redis相应API功能的客户端。

    我们来梳理一下,Connection是负责与Redis服务端的通讯的连接,Client是负责调用通讯的客户端,Jedis是给开发人员使用的客户端。我们知道了底层通讯是通过socket来实现的, 为了避免频繁的创建连接销毁连接,常用的办法是采用连接池技术,那么接下来我们一起来看看JedisPool相关的实现。

    JedisPool

    JedisPool使用

    JedisPool使用是先创建Pool实例,然后获取Jedis资源,使用结束后使用jedis.close()归还资源。

    public static void main(String[] args) {
            JedisPool jedisPool = new JedisPool("127.0.0.1", 6666);
            Jedis jedis = jedisPool.getResource();
            jedis.set("test", "jedis");
            String value = jedis.get("test");
            System.out.println("value:" + value);
            jedis.close();
        }
    

    JedisPool源码分析

    分析JedisPool源码,我们还是先看JedisPool的类图,了解其继承实现结构。


    JedisPool

    我们还是从上往下分析查看,先一起看看Pool的实现,以下是Pool的核心代码

    public abstract class Pool<T> implements Closeable {
     
      /**支持泛型的一般对象池,来自apache.common**/ 
     protected GenericObjectPool<T> internalPool;
      
     public Pool() {
      }
     /**Pool构造方法**/
     public Pool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        initPool(poolConfig, factory);
      }
    
     /**根据poolConfig初始化池**/
     public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        if (this.internalPool != null) {
          try {
            closeInternalPool();
          } catch (Exception e) {
          }
        }
        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
      }
    
     /**获取资源**/
     public T getResource(){
      try {
          return internalPool.borrowObject();
        } catch (NoSuchElementException nse) {
          if (null == nse.getCause()) { pool
            throw new JedisExhaustedPoolException(
                "Could not get a resource since the pool is exhausted", nse);
          }
          throw new JedisException("Could not get a resource from the pool", nse);
        } catch (Exception e) {
          throw new JedisConnectionException("Could not get a resource from the pool", e);
        }
    }
    
     /**归还资源**/
     protected void returnResourceObject(final T resource) {
      if (resource == null) {
          return;
        }
        try {
          internalPool.returnObject(resource);
        } catch (Exception e) {
          throw new JedisException("Could not return the resource to the pool", e);
        }
    }
    
     /**销毁资源**/
     public void destroy() {..省略代码..}
    
    }
    

    我们看到Pool的构造函数会调用initPool方法,而initPool实际上就是初始化的apache.common GenericObjectPool的实例internalPool,而获取资源则是从 internalPool.borrowObject(),归还则是 internalPool.returnObject(resource);所以Pool是依托GenericObjectPool来实现的;那么GenericObjectPool是如何构建对象池的呢,我们一起来看该类的构造方法。

    public GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig<T> config) {
            super(config, "org.apache.commons.pool2:type=GenericObjectPool,name=", config.getJmxNamePrefix());
            this.factoryType = null;
            this.maxIdle = 8;
            this.minIdle = 0;
            this.allObjects = new ConcurrentHashMap();
            this.createCount = new AtomicLong(0L);
            this.makeObjectCount = 0L;
            this.makeObjectCountLock = new Object();
            this.abandonedConfig = null;
            if (factory == null) {
                this.jmxUnregister();
                throw new IllegalArgumentException("factory may not be null");
            } else {
                this.factory = factory;
                this.idleObjects = new LinkedBlockingDeque(config.getFairness());
                this.setConfig(config);
            }
        }
    

    构造函数中有两个参数,一个是池对象工厂PooledObjectFactory factory,一个是对象池配置GenericObjectPoolConfig config;config用于配置最大连接数,最大空闲数,最小空闲数;factory则是用于创建,销毁,验证池对象,其实现我们后续查看,先了解其作用即可。

    public interface PooledObjectFactory<T> {
        /**创建池对象**/
        PooledObject<T> makeObject() throws Exception;
    
        /**销毁对象**/
        void destroyObject(PooledObject<T> var1) throws Exception;
    
        /**验证池对象**/
        boolean validateObject(PooledObject<T> var1);
    
        /**激活对象**/ 
        void activateObject(PooledObject<T> var1) throws Exception;
      
       /**冻结对象**/ 
        void passivateObject(PooledObject<T> var1) throws Exception;
    }
    

    我们接着了解从GenericObjectPool的borrowObject过程,我们解释核心部分;

    public T borrowObject(long borrowMaxWaitMillis) throws Exception {
            this.assertOpen();//对象池是否打开的断言
            //....省略部分代码...
            PooledObject<T> p = null;//声明池对象
            while(true) {
                boolean create;
                do {
                    do {
                        do {
                            //创建成功后返回对象
                            if (p != null) {
                                this.updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
                                return p.getObject();
                            }
    
                            create = false;
                            p = (PooledObject)this.idleObjects.pollFirst();
                            if (p == null) {
                               //创建池对象过程
                                p = this.create();
                                if (p != null) {
                                    create = true;
                                }
                            }
    
                            if (blockWhenExhausted) {
                                if (p == null) {
                                    if (borrowMaxWaitMillis < 0L) {
                                        p = (PooledObject)this.idleObjects.takeFirst();
                                    } else {
                                        p = (PooledObject)this.idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS);
                                    }
                                }
    
                                if (p == null) {
                                    throw new NoSuchElementException("Timeout waiting for idle object");
                                }
                            } else if (p == null) {
                                throw new NoSuchElementException("Pool exhausted");
                            }
    
                            if (!p.allocate()) {
                                p = null;
                            }
                        } while(p == null);
    
                        try {
                           //激活池对象
                            this.factory.activateObject(p);
                        } catch (Exception var13) {
                            try {
                                this.destroy(p);
                            } catch (Exception var12) {
                            }
    
                            p = null;
                            if (create) {
                                NoSuchElementException nsee = new NoSuchElementException("Unable to activate object");
                                nsee.initCause(var13);
                                throw nsee;
                            }
                        }
                    } while(p == null);
                } while(!this.getTestOnBorrow() && (!create || !this.getTestOnCreate()));
    
                boolean validate = false;
                Throwable validationThrowable = null;
    
                try {
                     //验证池对象
                    validate = this.factory.validateObject(p);
                } catch (Throwable var15) {
                    PoolUtils.checkRethrow(var15);
                    validationThrowable = var15;
                }
    
                if (!validate) {
                    try {
                        this.destroy(p);
                        this.destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (Exception var14) {
                    }
    
                    p = null;
                    if (create) {
                        NoSuchElementException nsee = new NoSuchElementException("Unable to validate object");
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    
    

    我们来梳理一下流程,首先声明对象,然后调用create,之后会在create方法中调用this.factory.makeObject()创建对象,激活对象,验证对象,对象不为空则返回对象。

    接着是JedisPoolAbstract,JedisPoolAbstract的代码比较简单,指定了泛型为Jedis,限制了池中资源是Jedis

    public class JedisPoolAbstract extends Pool<Jedis> {
    
      public JedisPoolAbstract() {
        super();
      }
    
      public JedisPoolAbstract(GenericObjectPoolConfig poolConfig, PooledObjectFactory<Jedis> factory) {
        super(poolConfig, factory);
      }
    
      @Override
      protected void returnBrokenResource(Jedis resource) {
        super.returnBrokenResource(resource);
      }
    
      @Override
      protected void returnResource(Jedis resource) {
        super.returnResource(resource);
      }
    }
    

    最后是JedisPool的代码

    public class JedisPool extends JedisPoolAbstract {
     /**丰富的构造方法-省略**/
    
     /**核心构造方法**/
    public JedisPool(final GenericObjectPoolConfig poolConfig, final URI uri,
          final int connectionTimeout, final int soTimeout) {
        //JedisFactory 是PooledObjectFactory实现类
        super(poolConfig, new JedisFactory(uri, connectionTimeout, soTimeout, null));
      }
    
     /**重写获取资源方法**/
     @Override
      public Jedis getResource() {
        Jedis jedis = super.getResource();
        jedis.setDataSource(this);
        return jedis;
      }
    
     /**重写归还异常资源方法,对象池会使其不可用并销毁**/
      @Override
      protected void returnBrokenResource(final Jedis resource) {
        if (resource != null) {
          returnBrokenResourceObject(resource);
        }
      }
    
     /**重写归还资源方法**/
      @Override
      protected void returnResource(final Jedis resource) {
        if (resource != null) {
          try {
            resource.resetState();
            returnResourceObject(resource);
          } catch (Exception e) {
            returnBrokenResource(resource);
            throw new JedisException("Resource is returned to the pool as broken", e);
          }
        }
      }
    }
    

    JedisPool也是重写了父类获取资源,归还资源的方法;其中我们需要注意的是JedisPool的构造方法调用了父类的构造方法,JedisFactory是PooledObjectFactory的实现类。

      @Override
      public PooledObject<Jedis> makeObject() throws Exception {
        final HostAndPort hostAndPort = this.hostAndPort.get();
        final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
            soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
    
        try {
          jedis.connect();
          if (password != null) {
            jedis.auth(password);
          }
          if (database != 0) {
            jedis.select(database);
          }
          if (clientName != null) {
            jedis.clientSetname(clientName);
          }
        } catch (JedisException je) {
          jedis.close();
          throw je;
        }
    
        return new DefaultPooledObject<Jedis>(jedis);
    
      }
    

    我们梳理一下整个JedisPool的使用流程;JedisPool利用GenericObjectPool实现了Jedis资源池化,其构造函数中的JedisFactory实现了PooledObjectFactory的接口,GenericObjectPool实例对象 internalPool 是 JedisPool 的父类 Pool的成员变量,初始化JedisPool 时会调用父类构造方法,初始化internalPool,在需要申请资源时JedisPool 实例调用 getResource方法,getResource调用父类实现,父类实现是调用internalPool 的borrowObject()完成资源的获取;归还资源的流程类似。

    相关文章

      网友评论

        本文标题:你读过jedis源码吗?

        本文链接:https://www.haomeiwen.com/subject/airggqtx.html