美文网首页
Redisson 中的 CommandExecutor

Redisson 中的 CommandExecutor

作者: Raymond_Z | 来源:发表于2016-07-10 17:25 被阅读985次

    上篇Redisson 分布式锁实现分析中提到了RedissonLock中的redis命令都是通过CommandExecutor来发送到redis服务执行的,本篇就来了解一下它的实现方式。

    先来看其源码

    public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
    }
    

    可以看到它同时继承了 同步和异步(sync/async) 两种调用方式。

    Note:

    • 在分布式锁的实现中是用了同步的 CommandExecutor,是因为锁的获取和释放是有强一致性要求的,需要实时知道结果方可进行下一步操作。
    • 上篇分布式锁分析中我提到 Redisson 的同步实现实际上是基于异步实现的,这在下文中也会得到解释。

    在Redisson中,除了提供同步和异步的方式执行命令之外,还通过 Reactive Streams 实现了 Reactive 方式的命令执行器。见下图

    预备知识

    Redisson 大量使用了 Redis 的 EVAL 命令来执行 Lua 脚本,所以先要对 EVAL 有所了解。

    EVAL命令格式和示例

    EVAL script numkeys key [key ...] arg [arg ...]
    
    > eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
    OK
    

    从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

    参数的说明本文不再详述,可查阅 Redis命令参考

    重点是这个:Redis 使用单个 Lua 解释器去运行所有脚本,并且 Redis 也保证脚本会以原子性(atomic)的方式执行,当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。所以 Redisson 中使用了 EVAL 来保证执行命令操作数据时的安全性。

    例子

    这里就使用 Redisson 参考文档中的一个 RAtomicLong 对象的例子吧。

    RedissonClient client = Redisson.create(config);
    RAtomicLong longObject = client.getAtomicLong('myLong');
    // 同步方式
    longObject.compareAndSet(3, 401);
    // 异步方式
    longObject.compareAndSetAsync(3, 401);
    
    RedissonReactiveClient client = Redisson.createReactive(config);
    RAtomicLongReactive longObject = client.getAtomicLong('myLong');
    // reactive方式
    longObject.compareAndSet(3, 401);
    

    根据此例,我们分别来看 compareAndSet/compareAndSetAsync 的实现,其他命令原理都一样。

    异步

    既然同步和Reactive的实现都继承了异步的实现,那我们就先来看看CommandAsyncService吧。

    例子中的 longObject.compareAndSetAsync(3, 401); 实际执行的是 RedissonAtomicLong 实现类的 compareAndSetAsync 方法,如下

    public Future<Boolean> compareAndSetAsync(long expect, long update) {
        return commandExecutor.evalWriteAsync(getName(),
                                              StringCodec.INSTANCE,
                                              RedisCommands.EVAL_BOOLEAN,
                                              "...此处省略...",
                                              Collections.<Object>singletonList(getName()),
                                              expect, update);
    }
    

    此处的 evalWriteAsync 就是在 CommandAsyncService 中实现的,如下

    public <T, R> Future<R> evalWriteAsync(String key,
                                           Codec codec,
                                           RedisCommand<T> evalCommandType,
                                           String script,
                                           List<Object> keys,
                                           Object ... params) {
        NodeSource source = getNodeSource(key);
        return evalAsync(source, false, codec, evalCommandType, script, keys, params);
    }
    
    private <T, R> Future<R> evalAsync(NodeSource nodeSource,
                                       boolean readOnlyMode,
                                       Codec codec,
                                       RedisCommand<T> evalCommandType,
                                       String script,
                                       List<Object> keys,
                                       Object ... params) {
        Promise<R> mainPromise = connectionManager.newPromise();
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
        return mainPromise;
    }
    

    追根溯源,最后来看看 async 方法的实现,

    protected <V, R> void async(final boolean readOnlyMode,
                                final NodeSource source,
                                final Codec codec,
                                final RedisCommand<V> command,
                                final Object[] params,
                                final Promise<R> mainPromise,
                                final int attempt) {
        // ....省略部分代码....
        // AsyncDetails 是一个包装对象,用来将异步调用过程中的对象引用包装起来方便使用
        final AsyncDetails<V, R> details = AsyncDetails.acquire();
        details.init(connectionFuture, attemptPromise,
                readOnlyMode, source, codec, command, params, mainPromise, attempt);
    
        // retryTimerTask 用来实现 Redisson 提供的重试机制
        final TimerTask retryTimerTask = new TimerTask() {
    
            @Override
            public void run(Timeout t) throws Exception {
                // ....省略部分代码....
                int count = details.getAttempt() + 1;
                // ....省略部分代码....
                async(details.isReadOnlyMode(), details.getSource(),
                        details.getCodec(), details.getCommand(),
                        details.getParams(), details.getMainPromise(), count);
                AsyncDetails.release(details);
            }
        };
        // 启用重试机制
        Timeout timeout = connectionManager.newTimeout(retryTimerTask,
                connectionManager.getConfig().getRetryInterval(),
                TimeUnit.MILLISECONDS);
        details.setTimeout(timeout);
    
        // checkConnectionFuture 用于检查客户端是否与服务端集群建立连接,如果连接建立
        // 则可发送命令到服务端执行
        if (connectionFuture.isDone()) {
            checkConnectionFuture(source, details);
        } else {
            connectionFuture.addListener(new FutureListener<RedisConnection>() {
                @Override
                public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                    checkConnectionFuture(source, details);
                }
            });
        }
    
        // ....省略部分代码....
    }
    
    private <R, V> void checkConnectionFuture(final NodeSource source,
            final AsyncDetails<V, R> details) {
        // ....省略部分代码....
        // 获取客户端与服务端集群建立的连接
        final RedisConnection connection = details.getConnectionFuture().getNow();
    
        if (details.getSource().getRedirect() == Redirect.ASK) {
            // 客户端接收到 ASK 转向, 先发送一个 ASKING 命令,然后再发送真正的命令请求
            // ....省略部分代码....
        } else {
            // ....省略部分代码....
            // 客户端发送命令到服务端
            ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                    details.getCodec(), details.getCommand(), details.getParams()));
            details.setWriteFuture(future);
        }
        // ....省略部分代码....
        // 释放本次连接
        releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
                details.getAttemptPromise(), details);
    }
    

    由于代码太长,我只贴出了和执行命令有关的部分代码,我们可以从上面代码中看到

    • Redisson 对每次操作都提供了重试机制,可配置 retryAttempts 来控制重试次数(缺省为3次),可配置 retryInterval 来控制重试间隔(缺省为 1000 ms)。Redisson 中使用了 Netty 的 TimerTaskTimeout 工具来实现其重试机制。
    • Redisson 中也大量使用了 Netty 实现的异步工具 FutureFutureListener,使得异步调用执行完成后能够立刻做出对应的操作。
    • RedissonConnection 是基于 Netty 实现的,发送命令的 send 方法实现是使用 Netty 的 Channel.writeAndFlush 方法。

    以上便是 Redisson 的异步实现。

    同步

    Redisson 里的同步都是基于异步来实现的,为什么这么说,来看看 RedissonAtomicLongcompareAndSet 方法,

    public boolean compareAndSet(long expect, long update) {
        return get(compareAndSetAsync(expect, update));
    }
    

    可见是在之前的异步方法外套了一个 get 方法,而这个 get 方法实际上也是在异步实现类 CommandAsyncService 中实现的,至于同步的实现类 CommandSyncService 有兴趣大家可以去看看,基本上都是在异步实现返回的 Future 外套了一个 get 方法。那么我们就看看 get 的实现,

    public <V> V get(Future<V> future) {
        final CountDownLatch l = new CountDownLatch(1);
        future.addListener(new FutureListener<V>() {
            @Override
            public void operationComplete(Future<V> future) throws Exception {
                l.countDown();
            }
        });
        try {
            // 阻塞当前线程
            l.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (future.isSuccess()) {
            return future.getNow();
        }
        throw convertException(future);
    }
    

    原来是利用了 CountDownLatch 在异步调用结果返回前将当前线程阻塞,然后通过 Netty 的 FutureListener 在异步调用完成后解除阻塞,并返回调用结果。

    Reactive

    从例子中可以看到,Reactive 的客户端和对象实现都是独立的,先来看看 RedissonAtomicLongReactivecompareAndSet 方法,

    public Publisher<Boolean> compareAndSet(long expect, long update) {
        return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if redis.call('get', KEYS[1]) == ARGV[1] then "
                     + "redis.call('set', KEYS[1], ARGV[2]); "
                     + "return 1 "
                   + "else "
                     + "return 0 end",
                Collections.<Object>singletonList(getName()), expect, update);
    }
    

    它调用的是 CommandReactiveService 中实现的 evalWriteReactive 方法,

    public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
            RedisCommand<T> evalCommandType, String script, List<Object> keys,
            Object... params) {
      Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
      return new NettyFuturePublisher<R>(f);
    }
    

    我们看到这里还是基于异步调用实现的,只是将异步调用返回的 Future 封装在了一个 NettyFuturePublisher 对象中返回,这个对象继承了 Reactive Streams 中的 Stream,所以我的解读也只能到此为止了,Reactive Streams 的相关知识目前我还不具备。

    总结

    • Redisson 提供了 同步、异步 和 Reactive 三种命令执行方式。
    • 同步 和 Reactive 的实现是基于 异步 的实现的。
    • Redisson 使用 Netty 连接 Redis 服务,并依赖 Netty 异步工具类来实现异步通信、重试机制、阻塞等特性。
    • Redisson 使用 Reactive Streams 来实现 Reactive 特性。

    本文同时发布于我的微信订阅号


    无罔无罔

    相关文章

      网友评论

          本文标题:Redisson 中的 CommandExecutor

          本文链接:https://www.haomeiwen.com/subject/zzudjttx.html