美文网首页
Redisson中常用工具Lock/RateLimiter的实现

Redisson中常用工具Lock/RateLimiter的实现

作者: Teddy_b | 来源:发表于2024-01-01 15:07 被阅读0次

获取Long

示例代码

RAtomicLong atomicLong = client.getAtomicLong("longVal");
long val = atomicLong.get();

实现原理

可以看到这里就是执行GET命令,但是会对结果进行转换,转成Long类型

public RFuture<Long> getAsync() {
        return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.GET_LONG, getRawName());
    }

RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());

public class LongReplayConvertor implements Convertor<Long> {

    @Override
    public Long convert(Object obj) {
        if (obj == null) {
            return 0L;
        }
        return Long.valueOf(obj.toString());
    }
}

发送命令的主要逻辑在RedisExecutorexecute()方法中, 这里的mainPromise就是返回结果的CompletableFuture

public void execute() {
        if (mainPromise.isCancelled()) {
            free();
            return;
        }

        ...

        try {
            codec = getCodec(codec);

            CompletableFuture<RedisConnection> connectionFuture = getConnection();

            CompletableFuture<R> attemptPromise = new CompletableFuture<>();
            ...

           ...
            connectionFuture.whenComplete((connection, e) -> {
               ...
                try {
                    sendCommand(attemptPromise, connection);
                }

                scheduleWriteTimeout(attemptPromise);

                writeFuture.addListener((ChannelFutureListener) future -> {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                });
            });

            attemptPromise.whenComplete((r, e) -> {
                releaseConnection(attemptPromise, connectionFuture);

                checkAttemptPromise(attemptPromise, connectionFuture);
            });
        } catch (Exception e) {
            free();
            handleError(connectionFuture, e);
            throw e;
        }
    }
protected void doConnect() {
        try {
            if (config.isSlaveNotUsed()) {
                masterSlaveEntry = new SingleEntry(this, serviceManager.getConnectionWatcher(), config);
            } else {
                masterSlaveEntry = new MasterSlaveEntry(this, serviceManager.getConnectionWatcher(), config);
            }
            ...
    }
  • 获取连接,这里的连接其实就是从初始化的时候创建的空闲连接freeConnections中获取,初始化的时候默认初始化了24个读写连接;这里会先从缓存中获取,缓存中存在的话则直接使用这个连接,同时将最大连接计数器减一;如果缓存中没有了,则会开始创建新的连接,需要注意的是新创建的连接不会加入到freeConnections中,只会加入到allConnections里面,同时将最大连接计数器减一;以此达到控制最大连接数的目的。

再解释下:最大连接数为64,空闲连接数为24;初始化完成后,freeConnections和allConnections中均有24个连接,由于连接都没有使用,最大连接计数器仍为64;使用一个连接后,空闲连接为23,最大连接计数器为63;使用24个连接后,空闲连接为0,最大连接计数器为40;此时再获取连接就会开始创建连接,由于最大连接计数器为40,所以还能再创建40个连接;最大连接满了后,再获取连接的请求就会开始排队,等待归还连接时被唤醒出队;归还连接时,都归还到freeConnections中,同时唤醒排队的请求。

protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
        CompletableFuture<T> result = new CompletableFuture<T>();

        CompletableFuture<Void> f = acquireConnection(entry, command);
        f.thenAccept(r -> {
            T conn = poll(entry, command);
        if (conn != null) {
            connectedSuccessful(entry, promise, conn);
            return;
        }

        createConnection(entry, promise);
        });
        result.whenComplete((r, e) -> {
            if (e != null) {
                f.completeExceptionally(e);
            }
        });
        return result;
    }

public RedisConnection pollConnection(RedisCommand<?> command) {
        RedisConnection c = freeConnections.poll();
        if (c != null) {
            c.incUsage();
        }
        return c;
    }
  • 发送命令,就是通过连接里的通道写数据了,然后就会经过上一篇文章里的编解码流程拿到响应数据
protected void sendCommand(CompletableFuture<R> attemptPromise, RedisConnection connection) {
        ...else {
           ...
            writeFuture = connection.send(new CommandData<>(attemptPromise, codec, command, params));

           ...
        }
    }

public <T, R> ChannelFuture send(CommandData<T, R> data) {
        return channel.writeAndFlush(data);
    }

订阅

示例代码

RTopic topic = client.getTopic("myTopic");

        topic.addListener(String.class, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence channel, String msg) {
                log.info("Receive message:{}", msg);
            }
        });

实现原理

这里首先也是回去entry,对应的仍然是和上面同一个entry

public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
        
        MasterSlaveEntry entry = getEntry(channelName);
       
        CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
        return f.thenApply(res -> Collections.singletonList(res));
    }

然后获取锁,这里的锁是针对频道名称,通过设置最大连接数量为1,来确保并发订阅同一个频道的顺序,从示例代码里可以看到,订阅就是注册一个监听器,这里的锁就是确保监听器添加的先后顺序

private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
                                                               MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?>... listeners) {
        CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
        AsyncSemaphore lock = getSemaphore(channelName);
        int timeout = config.getSubscriptionTimeout();
        ...
        lock.acquire().thenAccept(r -> {
           ...
            subscribeNoTimeout(codec, channelName, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners);
           ...
        });
        return promise;
    }

这里的name2PubSubConnection对应的是一个Map,会先检查Map中是否已经存在当前频道,如果已经存在了,则直接把监听器添加到当前频道即可

PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
        if (connEntry != null) {
            addListeners(channelName, promise, type, lock, connEntry, listeners);
            return;
        }

如果缓存中没有, 则需要获取一个全局锁,这个全局锁针对的是所有的频道订阅,也是通过设置最大连接数量为1来实现;这里需要控制全局主要是entry2PubSubConnection中保存着entry和发布订阅连接的映射关系,而entry在单节点里是全局唯一的,避免并发put的时候相互覆盖的问题

PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(entry, new PubSubEntry());

            PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
            if (freeEntry == null) {
                freePubSubLock.release();
                connect(codec, channelName, entry, clientEntry, promise, type, lock, attempts, listeners);
                return;
            }

如果是初次订阅,那么会获取发布订阅连接,发布订阅连接也是初始化的时候就创建好了的,这里直接从缓存中取出连接即可;同样的,缓存里无空闲连接可用时,会创建新的连接

CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection(clientEntry);

获取到连接后,会创建PubSubConnectionEntry对象,它保存着连接和监听器,然后加入到name2PubSubConnection缓存中,以便相同频道的订阅直接从缓存中获取;在记录缓存name2entry,保存了频道名称和entry的映射关系;最后看配置里指定的每个连接可以订阅的频道数量,默认每个连接最多只能订阅5个不同频道,如果还能够订阅更多频道,那么会记录缓存entry2PubSubConnection,把这个PubSubConnectionEntry对象入队,这样下次订阅不同频道时,就不需要获取连接了,可以直接往PubSubConnectionEntry对象添加监听器即可

PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager.getServiceManager());

PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);

Collection<MasterSlaveEntry> coll = name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
coll.add(msEntry);

int remainFreeAmount = entry.tryAcquire();
if (remainFreeAmount > 0) {
        PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry());
        psEntry.getEntries().add(entry);
}

添加监听器,到这里,示例代码中的监听器才会被添加到PubSubConnectionEntry对象中,除了添加我们自定义的监听器之外,还会默认添加一个SubscribeListener,这个默认添加的监听器是每个频道对应一个;

private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
            PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
            RedisPubSubListener<?>... listeners) {
        for (RedisPubSubListener<?> listener : listeners) {
            connEntry.addListener(channelName, listener);
        }
        SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
        CompletableFuture<Void> subscribeFuture = list.getSuccessFuture();

        subscribeFuture.whenComplete((res, e) -> {
            ...

            if (!promise.complete(connEntry)) {
                ...
            } else {
                lock.release();
            }
        });
        return subscribeFuture;
    }

这个默认的监听器的作用是用来正常完成我们的订阅CompletableFuture,在上一篇初始化流程里,我们已经知道了订阅消息是通过onMessage发送到监听器的,而这个onMessage方法会调用每个监听器的onStatus方法,默认的监听器就在onStatus方法里对CompletableFuture进行完成

@Override
    public void onStatus(PubSubType type, CharSequence channel) {
        if (name.equals(channel) && this.type.equals(type)) {
            promise.complete(null);
        }
    }

public void onMessage(PubSubStatusMessage message) {
        for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
            redisPubSubListener.onStatus(message.getType(), message.getChannel());
        }
    }

最后是发送SUBSCRIBE命令,然后会经过上一篇初始化流程里的编解码,收到订阅的响应消息后,回调onStatus方法来完成CompletableFuture

public ChannelFuture subscribe(CompletableFuture<Void> promise, Codec codec, ChannelName... channels) {
        for (ChannelName ch : channels) {
            this.channels.put(ch, codec);
        }
        return async(promise, new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channels);
    }

RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());

最后总结下这个流程:

  • 订阅是通过注册监听器来实现的

  • 初次订阅频道时,需要获取发布订阅连接,连接在初始化的时候已经创建好了;并且每个连接默认最多只能订阅5个不同频道,超过5个的,会创建新的连接

  • 每个连接里的订阅对应的PubSubConnectionEntry对象是同一个,只是往这个对象中注册不同的监听器

  • 发送SUBSCRIBE命令后,SUBSCRIBE命令的响应消息第一个元素是subscribe,会使用SUBSCRIBE命令中指定的PubSubStatusDecoder作为解码器,它生成的对象是PubSubStatusMessage类型,因此会回调onStatus方法,来完成我们的订阅CompletableFuture;但是后续收到的订阅消息,第一个元素是message,会使用PubSubMessageDecoder作为解码器,它生成的对象是PubSubMessage类型,因此会回调onMessage方法,也就是我们注册的监听器的onMessage方法

  • 订阅不同频道时,会重新注册监听器,并重新发送SUBSCRIBE命令

CAS操作

示例代码

RAtomicDouble atomicDouble = client.getAtomicDouble("myDouble");
double val = atomicDouble.get();
boolean success = atomicDouble.compareAndSet(100, 200);
Assertions.assertFalse(success);

实现原理

CAS操作都是通过lua脚本来实现的原子操作,因为redis执行lua脚本是原子操作,进而保证CAS操作的原子性

public RFuture<Boolean> compareAndSetAsync(double expect, double update) {
        return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "local value = redis.call('get', KEYS[1]);"
                + "if (value == false and tonumber(ARGV[1]) == 0) or (tonumber(value) == tonumber(ARGV[1])) then "
                     + "redis.call('set', KEYS[1], ARGV[2]); "
                     + "return 1 "
                   + "else "
                     + "return 0 end",
                Collections.<Object>singletonList(getRawName()), BigDecimal.valueOf(expect).toPlainString(), BigDecimal.valueOf(update).toPlainString());
    }

进一步看下lua脚本的执行和普通命令的执行有何区别?

private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
                                        String script, List<Object> keys, boolean noRetry, Object... params) {
        if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
            CompletableFuture<R> mainPromise = new CompletableFuture<>();
            
            Object[] pps = copy(params);

            CompletableFuture<R> promise = new CompletableFuture<>();
            String sha1 = getServiceManager().calcSHA(script);
            RedisCommand cmd;
            if (readOnlyMode && evalShaROSupported.get()) {
                cmd = new RedisCommand(evalCommandType, "EVALSHA_RO");
            } else {
                cmd = new RedisCommand(evalCommandType, "EVALSHA");
            }
            List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
            args.add(sha1);
            args.add(keys.size());
            args.addAll(keys);
            args.addAll(Arrays.asList(params));

            RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
                                                        args.toArray(), promise, false,
                                                        connectionManager, objectBuilder, referenceType, noRetry);
            executor.execute();

            promise.whenComplete((res, e) -> {
                if (e != null) {
                    if (e.getMessage().startsWith("ERR unknown command")) {
                        evalShaROSupported.set(false);
                        RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keys, noRetry, pps);
                        transfer(future.toCompletableFuture(), mainPromise);
                    } else if (e.getMessage().startsWith("NOSCRIPT")) {
                        RFuture<String> loadFuture = loadScript(executor.getRedisClient(), script);
                        loadFuture.whenComplete((r, ex) -> {
                            if (ex != null) {
                                free(pps);
                                mainPromise.completeExceptionally(ex);
                                return;
                            }

                            List<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
                            newargs.add(sha1);
                            newargs.add(keys.size());
                            newargs.addAll(keys);
                            newargs.addAll(Arrays.asList(pps));

                            NodeSource ns = nodeSource;
                            if (ns.getRedisClient() == null) {
                                ns = new NodeSource(nodeSource, executor.getRedisClient());
                            }

                            RFuture<R> future = async(readOnlyMode, ns, codec, cmd, newargs.toArray(), false, noRetry);
                            transfer(future.toCompletableFuture(), mainPromise);
                        });
                    } else {
                        free(pps);
                        mainPromise.completeExceptionally(e);
                    }
                    return;
                }
                free(pps);
                mainPromise.complete(res);
            });
            return new CompletableFutureWrapper<>(mainPromise);
        }
        
        List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
        args.add(script);
        args.add(keys.size());
        args.addAll(keys);
        args.addAll(Arrays.asList(params));
        return async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
    }

这里lua脚本的执行包括两种方式:

  • 方式一:不启用脚本缓存,默认是这种方式,这时候就是见到的按照EVAL命令的格式来执行lua脚本即可,所以这里参数的长度指定为2 + keys.size() + params.length

EVAL script numkeys key [key ...] arg [arg ...]

  • 方式二:启用脚本缓存,这时候会创建一个LRU的Map,用来缓存每个lua脚本的md5值,然后先按照EVALSHA 命令的格式发送md5值到redis服务器,如果redis服务器存在这个md5值对应的脚本,则直接执行这个脚本

EVALSHA sha1 numkeys key [key ...] arg [arg ...]

如果redis服务器不存在这个md5值对应的脚本,则会报错NOSCRIPT,此时在按照SCRIPT LOAD命令的格式把脚本上传到redis服务器,成功后再次执行EVALSHA 命令即可

SCRIPT LOAD script

Lock分布式锁

示例代码

client.getLock("myLock");

实现原理

获取锁

获取锁也是通过lua脚本来完成的

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if ((redis.call('exists', KEYS[1]) == 0) " +
                            "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

这里我们指定了锁名称为myLock,这个会作为KEY,这个KEY的类型会设置为HASH类型

实际的锁名称是通过当前线程ID+UUID构成,作为HASH中的键值对,值为当前线程加锁的次数,实现可重入锁

这段lua脚本的逻辑也很简单:

  • 如果myLock这个key已经存在了,那么说明锁已经被占用了,直接通过PTTL命令返回过期时间,以秒为单位

  • 否则直接设置myLock的键值对为{ssdsx-aadff-assdrtf-xxcrdfd:3, 1},并设置过期时间,如果我们指定了过期时间,则直接设置为这个过期时间,如果我们没有指定过期时间,则默认设置个30s的过期时间,然后通过续租的方式增加过期时间

锁续租

也是通过lua脚本来完成的,逻辑就是只要这个myLock还存在,那就重新续约30s

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                30s, getLockName(threadId));
    }

外层是通过哈希时间轮来完成的,每10s触发一次续租,续租成功就递归续租,续租失败就取消后续的续租

private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = getServiceManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock {} expiration", getRawName(), e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

锁释放

也是通过lua脚本来完成的

protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                              "local val = redis.call('get', KEYS[3]); " +
                                    "if val ~= false then " +
                                        "return tonumber(val);" +
                                    "end; " +

                                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                        "return nil;" +
                                    "end; " +
                                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                    "if (counter > 0) then " +
                                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                        "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                                        "return 0; " +
                                    "else " +
                                        "redis.call('del', KEYS[1]); " +
                                        "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                                        "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                                        "return 1; " +
                                    "end; ",
                                Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                                getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
    }

这段脚本稍微复杂一点

  • 如果myLock中当前线程的键值对查询不存在,说明锁不在当前线程,什么操作都不做;这样可以避免锁被其它线程释放

  • myLock中当前线程的重入锁次数减一,如果仍然大于0,说明当前线程多次占用了这个锁,此时还不能直接释放这个锁,因此只是更新了myLock的过期时间,同时新设置了一个redisson_unlock_latch:{myLock}:some_random_id这个key值为0,过期时间指定为:(命令执行超时时间+重试间隔时间)* 重试次数

  • 如果myLock锁没有重入了,直接删除myLock这个锁,同时PUBLISH消息0到频道redisson_lock__channel:{myLock},同时更新redisson_unlock_latch:{myLock}:some_random_id这个key值为1,过期时间也更新

  • 最后无论是删除了锁还是减少锁占用次数,都会删除redisson_unlock_latch:{myLock}:some_random_id这个key

protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
        String id = getServiceManager().generateId();
        MasterSlaveServersConfig config = getServiceManager().getConfig();
        int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
        timeout = Math.max(timeout, 1);
        RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);
        CompletionStage<Boolean> ff = r.thenApply(v -> {
            CommandAsyncExecutor ce = commandExecutor;
           
            ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));
           
            return v;
        });
        return new CompletableFutureWrapper<>(ff);
    }

这个场景梳理下就是:如果这个锁没被当前线程多次获取(重入),那么直接删除这个锁,发布消息0到频道redisson_lock__channel:{myLock},发布消息是为了让后续获取这个锁的其它线程能够成功获取到;如果当前线程多次获取了锁,那么释放的时候只会更新锁占用的次数。

至于redisson_unlock_latch:{myLock}:some_random_id这个key,看commit信息是为了解决一些比较罕见的异常case,暂不明白

锁排队

当多个线程竞争同一个锁时,只有一个线程能成功获取到锁,其它线程则需要排队

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
    }

这里锁排队是通过订阅频道redisson_lock__channel:{myLock}完成的,当锁被释放的时候,会发布消息0到频道,然后再这里通过监听器回调处理,这里主要就是对Semaphore 变量进行进行释放操作,这样排队获取锁的线程就可以被唤醒

@Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(0)) {
            value.getLatch().release();
        }
    }

限流器

示例代码

RRateLimiter limiter = redisson.getRateLimiter("myLimiter");
// Initialization required only once.
// 5 permits per 2 seconds
limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);

// acquire 3 permits or block until they became available       
limiter.acquire(3);

实现原理

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
        byte[] random = getServiceManager().generateIdArray();

        return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "local rate = redis.call('hget', KEYS[1], 'rate');"
              + "local interval = redis.call('hget', KEYS[1], 'interval');"
              + "local type = redis.call('hget', KEYS[1], 'type');"
              + "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
              
              + "local valueName = KEYS[2];"
              + "local permitsName = KEYS[4];"
              + "if type == '1' then "
                  + "valueName = KEYS[3];"
                  + "permitsName = KEYS[5];"
              + "end;"

              + "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "

              + "local currentValue = redis.call('get', valueName); "
              + "local res;"
              + "if currentValue ~= false then "
                     + "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                     + "local released = 0; "
                     + "for i, v in ipairs(expiredValues) do "
                          + "local random, permits = struct.unpack('Bc0I', v);"
                          + "released = released + permits;"
                     + "end; "

                     + "if released > 0 then "
                          + "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
                          + "if tonumber(currentValue) + released > tonumber(rate) then "
                               + "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
                          + "else "
                               + "currentValue = tonumber(currentValue) + released; "
                          + "end; "
                          + "redis.call('set', valueName, currentValue);"
                     + "end;"

                     + "if tonumber(currentValue) < tonumber(ARGV[1]) then "
                         + "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
                         + "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
                     + "else "
                         + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                         + "redis.call('decrby', valueName, ARGV[1]); "
                         + "res = nil; "
                     + "end; "
              + "else "
                     + "redis.call('set', valueName, rate); "
                     + "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
                     + "redis.call('decrby', valueName, ARGV[1]); "
                     + "res = nil; "
              + "end;"

              + "local ttl = redis.call('pttl', KEYS[1]); "
              + "if ttl > 0 then "
                  + "redis.call('pexpire', valueName, ttl); "
                  + "redis.call('pexpire', permitsName, ttl); "
              + "end; "
              + "return res;",
                Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
                value, System.currentTimeMillis(), random);
    }

限流器完全依靠lua脚本的逻辑控制:

  • 初始化的时候会创建myLimiter这个HASH key,并保存键值对:{rate: 5, interval: 2, type: 0},并确保不存在用于控制剩余次数的{myLimiter}:value这个key不存在(删除之),以及保存历史发放的次数和时间戳{myLimiter}:permits这个ZSET key不存在(删除之)

  • 这里的type是用于控制限流是全局的还是只是当前客户端的,如果指定的是当前客户端限流type=1,那么上面的这两个key再拼接一个uuid串就可以了

  • 首次获取次数的时候,会创建{myLimiter}:value这个key,设置值为rate-本次获取的次数;并且往{myLimiter}:permits这个ZSET中加入SCORE为本次时间戳,元素为随机字节和本次获取次数

  • 后面再次获取次数时,会首先找到{myLimiter}:permits这个ZSET中已经过期的元素,时间戳在当前时间戳-interval之前的就是已经过期了的元素,这部分元素可以回收了,会从ZSET中移除,然后重新计算回收后还剩余的次数;如果本次获取的次数仍低于回收后剩余的次数,那么可以直接分配,即保存ZSET和更新剩余次数即可;如果本次获取的次数超过了回收后剩余的次数,那么就需要等待了,等待的时间为 interval-(当前时间戳-最先过期的时间戳(ZSET中的第一个元素的SCORE)),为了避免出现临界情况,这里还加了一个3,表示比需要等待的时间稍稍多一丢丢就可以了

相关文章

网友评论

      本文标题:Redisson中常用工具Lock/RateLimiter的实现

      本文链接:https://www.haomeiwen.com/subject/xkelndtx.html