美文网首页
vert.x源码解析之redis集成

vert.x源码解析之redis集成

作者: 穹柏 | 来源:发表于2021-06-16 14:45 被阅读0次

@[toc]

前言

bala bala...

vertx是什么

vert.x是一个全异步框架,通过内部的event bus跟event loop做到了处处皆异步。关于里面的细节,后面我会出一篇文章详细跟大家探讨...

为什么要用vertx

充分利用机器性能,提升应用的吞吐量上限。

vertx-redis-client是什么

通俗来讲,vertx-redis提供了一个全异步的可配置的redis客户端

vertx-redis-client有哪些特性

相对常规的jedis或者Lettuce来说,这里的特性其实就是vertx贯穿全局的特性:异步。当我们调用api发送一条redis指令的时候,可以不用等待redis的响应,只需要绑定一个回调函数,程序就可以继续往下执行。当拿到redis的响应之后,就会触发这个回调。当然,像这种编程方式,需要把业务依赖关系处理得很明确,因为我们必须把依赖这个redis响应的所有处理都移到这个回调函数里去。

怎么使用vertx-redis-client

添加vertx-redis-client依赖

//gradle
compile("io.vertx:vertx-redis-client:$version")
//maven
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-redis-client</artifactId>
  <vertion></version>
</dependency>

RedisConnection

RedisConnection是vertx-redis-client里暴露出来可操作redis的最基本的一组api了,使用如下

  Redis
    .createClient(Vertx.vertx(), "redis://ip:port")
    .connect(onConnect -> {
       //连接成功
       if (onConnect.succeeded()) {
           //获取连接实例
           RedisConnection conn = onConnect.result();
           //创建redis get指令. 
           Request command = Request.cmd(Command.GET);
           //添加指令参数aa. 最后到达redis的指令就是get aa
           command.arg("aa");
           //发送指令
           conn.send(command, resp -> {
               //执行成功
               if (resp.succeeded()) {
                   //拿到指令执行结果
                   Response result = resp.result();
               }
           });
       }
   });

这里就谈谈怎么使用吧,其实内部的实现就是通过netty包装的channel跟redis建立起连接,有兴趣的朋友可以点进去看看。

redisOptions

在介绍该redisClient各种模式之前,我们需要了解options是什么。
简单来说,redisOptions是我们用来定制该redisClient的一种方式,像之前我们通过==Redis.createClient(Vertx.vertx(), "redis://ip:port")== 这种仅仅提供一个地址的方式就建立起了一个redisClient,但是如果我们想要定制更复杂的redisClient呢?这个时候就需要用到redisOptions了

各项主要配置的含义

先大概看下redisOptions都有哪些属性:

public class RedisOptions {

  /**
   * The default redis endpoint = {@code redis://localhost:6379}
   */
  public static final String DEFAULT_ENDPOINT = "redis://localhost:6379";
  
  private RedisClientType type;
  private NetClientOptions netClientOptions;
  private List<String> endpoints;
  private int maxWaitingHandlers;
  private int maxNestedArrays;
  private String masterName;
  private RedisRole role;
  private RedisSlaves slaves;
  private String password;

  // pool related options
  private int poolCleanerInterval;
  private int maxPoolSize;
  private int maxPoolWaiting;
  private int poolRecycleTimeout;
}

type

该枚举用于指定redisClient以什么模式去连接server,这里的模式跟redis server的模式一一对应,我们在用的时候也必须注意,server是以什么模式部署的,client就以什么模式去配置。源码如下

public enum RedisClientType {

  /**
   * 默认值,单机模式
   */
  STANDALONE,

  /**
   * 哨兵
   */
  SENTINEL,

  /**
   * 集群
   */
  CLUSTER
}

这些配置决定了redisClient与server交互的方式,不同的类型会用不同的策略来应对,在我们调用Redis#creatClient的时候就会作出处理,源码如下

static Redis createClient(Vertx vertx, RedisOptions options) {
    switch (options.getType()) {
      case STANDALONE:
        return new RedisClient(vertx, options);
      case SENTINEL:
        return new RedisSentinelClient(vertx, options);
      case CLUSTER:
        return new RedisClusterClient(vertx, options);
      default:
        throw new IllegalStateException("Unknown Redis Client type: " + options.getType());
    }
  }

至于不同Client内部是怎么处理的,后面会一一介绍

masterName

这个只在Sentinel模式下有用,跟redisServer的redis_sentinel.conf里配置的masterName是一个东西,用来指明连接的是被哨兵监控的哪一个主从集群

role

这项配置只用于clientType为Sentinel的情况,主要用来指定当前的客户端到底是跟master连接还是只跟slave连接,如果配置为slave,则通过该客户端发起的所有指令都会被路由到slave节点,这意味着这个client对redisServer的所有操作都是只读的。
还有一个sentinel,这个意味着这个client只操作server端的sentinel,不会主动去获取到master或者slave的连接。按我目前看来,这个枚举值其实是给程序内部使用的,除非我们想手动实现跟哨兵的通信以及维护相应主从节点的一个failover情况,那么就可以使用这个配置。核心源码在RedisSentinelClient中,如下所示

private void createConnectionInternal(RedisOptions options, RedisRole role, Handler<AsyncResult<RedisConnection>> onCreate) {
    switch (role) {
      case SENTINEL:
        resolveClient(this::isSentinelOk, options, createAndConnect);
        break;

      case MASTER:
        resolveClient(this::getMasterFromEndpoint, options, createAndConnect);
        break;

      case SLAVE:
        resolveClient(this::getSlaveFromEndpoint, options, createAndConnect);
    }
  }

这里可以看到,根据不同的role,获取了不同类型的redis连接地址

private void getMasterFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
  final RedisURI uri = new RedisURI(endpoint);
  connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
    final RedisConnection conn = onCreate.result();
    final String masterName = options.getMasterName();
    
    // 根据masterName从sentinel获取相应主从集群的master节点信息
    conn.send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName), getMasterAddrByName -> {
      //bala bala...
    });
  });
private void getSlaveFromEndpoint(String endpoint, RedisOptions options, Handler<AsyncResult<String>> handler) {
  final RedisURI uri = new RedisURI(endpoint);
    connectionManager.getConnection(context, getSentinelEndpoint(uri), null, onCreate -> {
      
      final RedisConnection conn = onCreate.result();
      final String masterName = options.getMasterName();
      // 获取masterName指定主从的slave节点
      conn.send(cmd(SENTINEL).arg("SLAVES").arg(masterName), sentinelSlaves -> {
        //...
      });
    });
  }

slaves

这项配置用来指定在集群模式下对server的读操作的行为,源码如下

public enum RedisSlaves {

  /**
   * 读操作只落在master
   */
  NEVER,

  /**
   * 读操作会随机落在master跟slave
   */
  SHARE,

  /**
   * 读操作只落在slave上
   */
  ALWAYS
}

其核心原理是通过对slave节点执行readonly命令来开启slave的查询功能,因为默认集群模式下slave是不对外提供服务的。核心源码如下

private void connect(List<String> endpoints, int index, Handler<AsyncResult<RedisConnection>> onConnect) {
    //如果RedisSlaves的值不是NEVER,就执行readonly
    connectionManager.getConnection(context, endpoints.get(index), RedisSlaves.NEVER != options.getUseSlave() ? cmd(READONLY) : null, getConnection -> {
    }
}

poolCleanerInterval

连接池空闲连接清理间隔,每次扫描时会直接将闲置的连接关闭。-1表示不开启空闲连接清理,核心源码在RedisConnectionManager#start()

synchronized void start() {
    long period = options.getPoolCleanerInterval();
    //延迟period时间后执行checkExpired()进行空闲连接清理
    this.timerID = period > 0 ? vertx.setTimer(period, id -> checkExpired(period)) : -1;
  }

poolRecycleTimeout

这个是用来控制连接池中连接在执行完命令后还能存活的时间,超过这个时间连接就会被关闭,核心源码在RedisStandaloneConnection。如下

public boolean isValid() {
    return expirationTimestamp > 0 && System.currentTimeMillis() <= expirationTimestamp;
  }

  @Override
  public void close() {
    // recycle this connection from the pool
    expirationTimestamp = recycleTimeout > 0 ? System.currentTimeMillis() + recycleTimeout : 0L;
    listener.onRecycle();
  }

maxPoolSize

连接池最大大小。只有当等待获取连接的请求数量达到maxPoolWaiting时才会创建新的连接,可以类比ThreadPoolExecutormaximumPoolSize

maxPoolWaiting

连接池等待队列大小。当我们通过RedisClient或者RedisApi去操作reids时,如果当前已无空闲的redis连接,那么这个请求就会进入连接池的等待队列中。可以类比ThreadPoolExecutorworkQueue

maxWaitingHandlers

等待执行的handler数的最大值。当我们发送完redis指令后,一般是需要对这个响应作出处理,这个过程会被包装成一个task添加到eventLoopwaitQueue中,这项配置控制的就是这个queuesize

RedisClient

redisClient本质是对上述redisConnection的再封装,提供了针对redisServer不同模式的不同处理,分为3种:单机,哨兵以及集群模式。对应到具体的class:RedisClientRedisSentinelClientRedisSentinelClient
接下来来看一下如何配置这些客户端

STANDALONE

这个是redisClient默认的一种配置模式,现在我们用前面描述的redisOptions实践一下。

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.STANDALONE)
            //指定server地址
            .setConnectionString("redis://ip:port")
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(2);

    Redis client = Redis.createClient(Vertx.vertx(), options);
    //创建redis 指令. get
    Request command = Request.cmd(Command.GET);
    //添加指令参数aa. 最后到达redis的指令就是get aa
    command.arg("aa");
    //发送指令并指定一个匿名handler处理结果
    client.send(command).onComplete(event -> {
        System.out.println("the value of redis key 'aa' is"+event.result().toString());
    });

SENTINEL

这里请原谅我秀了一下我那蹩脚的英语水平...

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.SENTINEL)
            //the master name which are monitored by sentinels
            .setMasterName("myMaster")
            .setRole(RedisRole.MASTER)
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(2);

    //这里只填哨兵的地址就行了
    //程序会自动从哨兵处获取节点信息
    List<String> sentinels = new ArrayList<>();
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    options.setEndpoints(sentinels);
    
    ...

这里可以只填一个哨兵地址,由于client不会主动去连接监控同一个master的哨兵节点,所以一旦这个唯一的哨兵挂了,那么server就会处于不可用的状态。

CLUSTER

    RedisOptions options = new RedisOptions();
    options.setType(RedisClientType.CLUSTER)
            .setUseSlave(RedisSlaves.NEVER)
            .setMaxPoolSize(6)
            .setMaxWaitingHandlers(1024)
            .setPoolRecycleTimeout(15_000)
            .setMaxPoolWaiting(20);
    //添加集群节点,越多越好...
    List<String> clusters = new ArrayList<>();
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    sentinels.add("redis://ip:port");
    options.setEndpoints(clusters);

这里也需要注意最好将cluster中所有的节点信息给添加进来。虽然连接任意一个节点,client都会通过执行cluster slots获取cluster中所有节点的信息,但是长久保持连接的只有我们在这里设置进去的节点。这意味着一旦我们手动设置的节点挂了,那么server对我们来说就不可用了

RedisApi

这个是基于RedisClient的封装,隐藏了Command的复杂细节,提供了一套可以直接调用的api。使用如下:

    Redis client = Redis.createClient(Vertx.vertx(), options);
    RedisAPI api = RedisAPI.api(client);
    api.get("aa").onComplete(event -> {
        System.out.println("the value of redis key 'aa' is" + event.result().toString());
    });

总结

全文描述了vertx-client是什么以及其相对某些redis-client所具备的特点。同时深挖了这个客户端各项配置的作用以及不同模式下的表现。
总的来说,这篇文章可以帮助大家去建立一个基于vertx的redis客户端,并且对这个客户端能够做到基本的知其然且知其所以然

相关文章

网友评论

      本文标题:vert.x源码解析之redis集成

      本文链接:https://www.haomeiwen.com/subject/hxzxyltx.html