获取Long
示例代码
RAtomicLong atomicLong = client.getAtomicLong("longVal");
long val = atomicLong.get();
实现原理
可以看到这里就是执行GET命令,但是会对结果进行转换,转成Long类型
public RFuture<Long> getAsync() {
return commandExecutor.writeAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.GET_LONG, getRawName());
}
RedisStrictCommand<Long> GET_LONG = new RedisStrictCommand<Long>("GET", new LongReplayConvertor());
public class LongReplayConvertor implements Convertor<Long> {
@Override
public Long convert(Object obj) {
if (obj == null) {
return 0L;
}
return Long.valueOf(obj.toString());
}
}
发送命令的主要逻辑在RedisExecutor
的execute()
方法中, 这里的mainPromise就是返回结果的CompletableFuture
public void execute() {
if (mainPromise.isCancelled()) {
free();
return;
}
...
try {
codec = getCodec(codec);
CompletableFuture<RedisConnection> connectionFuture = getConnection();
CompletableFuture<R> attemptPromise = new CompletableFuture<>();
...
...
connectionFuture.whenComplete((connection, e) -> {
...
try {
sendCommand(attemptPromise, connection);
}
scheduleWriteTimeout(attemptPromise);
writeFuture.addListener((ChannelFutureListener) future -> {
checkWriteFuture(writeFuture, attemptPromise, connection);
});
});
attemptPromise.whenComplete((r, e) -> {
releaseConnection(attemptPromise, connectionFuture);
checkAttemptPromise(attemptPromise, connectionFuture);
});
} catch (Exception e) {
free();
handleError(connectionFuture, e);
throw e;
}
}
- 获取entry,这里获取的entry对应的就是Redisson客户端的初始化流程中创建连接时创建的entry
protected void doConnect() {
try {
if (config.isSlaveNotUsed()) {
masterSlaveEntry = new SingleEntry(this, serviceManager.getConnectionWatcher(), config);
} else {
masterSlaveEntry = new MasterSlaveEntry(this, serviceManager.getConnectionWatcher(), config);
}
...
}
- 获取连接,这里的连接其实就是从初始化的时候创建的空闲连接freeConnections中获取,初始化的时候默认初始化了24个读写连接;这里会先从缓存中获取,缓存中存在的话则直接使用这个连接,同时将最大连接计数器减一;如果缓存中没有了,则会开始创建新的连接,需要注意的是新创建的连接不会加入到freeConnections中,只会加入到allConnections里面,同时将最大连接计数器减一;以此达到控制最大连接数的目的。
再解释下:最大连接数为64,空闲连接数为24;初始化完成后,freeConnections和allConnections中均有24个连接,由于连接都没有使用,最大连接计数器仍为64;使用一个连接后,空闲连接为23,最大连接计数器为63;使用24个连接后,空闲连接为0,最大连接计数器为40;此时再获取连接就会开始创建连接,由于最大连接计数器为40,所以还能再创建40个连接;最大连接满了后,再获取连接的请求就会开始排队,等待归还连接时被唤醒出队;归还连接时,都归还到freeConnections中,同时唤醒排队的请求。
protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
CompletableFuture<T> result = new CompletableFuture<T>();
CompletableFuture<Void> f = acquireConnection(entry, command);
f.thenAccept(r -> {
T conn = poll(entry, command);
if (conn != null) {
connectedSuccessful(entry, promise, conn);
return;
}
createConnection(entry, promise);
});
result.whenComplete((r, e) -> {
if (e != null) {
f.completeExceptionally(e);
}
});
return result;
}
public RedisConnection pollConnection(RedisCommand<?> command) {
RedisConnection c = freeConnections.poll();
if (c != null) {
c.incUsage();
}
return c;
}
- 发送命令,就是通过连接里的通道写数据了,然后就会经过上一篇文章里的编解码流程拿到响应数据
protected void sendCommand(CompletableFuture<R> attemptPromise, RedisConnection connection) {
...else {
...
writeFuture = connection.send(new CommandData<>(attemptPromise, codec, command, params));
...
}
}
public <T, R> ChannelFuture send(CommandData<T, R> data) {
return channel.writeAndFlush(data);
}
订阅
示例代码
RTopic topic = client.getTopic("myTopic");
topic.addListener(String.class, new MessageListener<String>() {
@Override
public void onMessage(CharSequence channel, String msg) {
log.info("Receive message:{}", msg);
}
});
实现原理
这里首先也是回去entry,对应的仍然是和上面同一个entry
public CompletableFuture<List<PubSubConnectionEntry>> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
MasterSlaveEntry entry = getEntry(channelName);
CompletableFuture<PubSubConnectionEntry> f = subscribe(PubSubType.SUBSCRIBE, codec, channelName, entry, null, listeners);
return f.thenApply(res -> Collections.singletonList(res));
}
然后获取锁,这里的锁是针对频道名称,通过设置最大连接数量为1,来确保并发订阅同一个频道的顺序,从示例代码里可以看到,订阅就是注册一个监听器,这里的锁就是确保监听器添加的先后顺序
private CompletableFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName,
MasterSlaveEntry entry, ClientConnectionsEntry clientEntry, RedisPubSubListener<?>... listeners) {
CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
AsyncSemaphore lock = getSemaphore(channelName);
int timeout = config.getSubscriptionTimeout();
...
lock.acquire().thenAccept(r -> {
...
subscribeNoTimeout(codec, channelName, entry, clientEntry, promise, type, lock, new AtomicInteger(), listeners);
...
});
return promise;
}
这里的name2PubSubConnection
对应的是一个Map,会先检查Map中是否已经存在当前频道,如果已经存在了,则直接把监听器添加到当前频道即可
PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
if (connEntry != null) {
addListeners(channelName, promise, type, lock, connEntry, listeners);
return;
}
如果缓存中没有, 则需要获取一个全局锁,这个全局锁针对的是所有的频道订阅,也是通过设置最大连接数量为1来实现;这里需要控制全局主要是entry2PubSubConnection
中保存着entry和发布订阅连接的映射关系,而entry在单节点里是全局唯一的,避免并发put的时候相互覆盖的问题
PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(entry, new PubSubEntry());
PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
if (freeEntry == null) {
freePubSubLock.release();
connect(codec, channelName, entry, clientEntry, promise, type, lock, attempts, listeners);
return;
}
如果是初次订阅,那么会获取发布订阅连接,发布订阅连接也是初始化的时候就创建好了的,这里直接从缓存中取出连接即可;同样的,缓存里无空闲连接可用时,会创建新的连接
CompletableFuture<RedisPubSubConnection> connFuture = msEntry.nextPubSubConnection(clientEntry);
获取到连接后,会创建PubSubConnectionEntry
对象,它保存着连接和监听器,然后加入到name2PubSubConnection
缓存中,以便相同频道的订阅直接从缓存中获取;在记录缓存name2entry
,保存了频道名称和entry的映射关系;最后看配置里指定的每个连接可以订阅的频道数量,默认每个连接最多只能订阅5个不同频道,如果还能够订阅更多频道,那么会记录缓存entry2PubSubConnection
,把这个PubSubConnectionEntry
对象入队,这样下次订阅不同频道时,就不需要获取连接了,可以直接往PubSubConnectionEntry
对象添加监听器即可
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, connectionManager.getServiceManager());
PubSubKey key = new PubSubKey(channelName, msEntry);
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
Collection<MasterSlaveEntry> coll = name2entry.computeIfAbsent(channelName, k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
coll.add(msEntry);
int remainFreeAmount = entry.tryAcquire();
if (remainFreeAmount > 0) {
PubSubEntry psEntry = entry2PubSubConnection.computeIfAbsent(msEntry, e -> new PubSubEntry());
psEntry.getEntries().add(entry);
}
添加监听器,到这里,示例代码中的监听器才会被添加到PubSubConnectionEntry
对象中,除了添加我们自定义的监听器之外,还会默认添加一个SubscribeListener
,这个默认添加的监听器是每个频道对应一个;
private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
connEntry.addListener(channelName, listener);
}
SubscribeListener list = connEntry.getSubscribeFuture(channelName, type);
CompletableFuture<Void> subscribeFuture = list.getSuccessFuture();
subscribeFuture.whenComplete((res, e) -> {
...
if (!promise.complete(connEntry)) {
...
} else {
lock.release();
}
});
return subscribeFuture;
}
这个默认的监听器的作用是用来正常完成我们的订阅CompletableFuture,在上一篇初始化流程里,我们已经知道了订阅消息是通过onMessage
发送到监听器的,而这个onMessage
方法会调用每个监听器的onStatus
方法,默认的监听器就在onStatus
方法里对CompletableFuture进行完成
@Override
public void onStatus(PubSubType type, CharSequence channel) {
if (name.equals(channel) && this.type.equals(type)) {
promise.complete(null);
}
}
public void onMessage(PubSubStatusMessage message) {
for (RedisPubSubListener<Object> redisPubSubListener : listeners) {
redisPubSubListener.onStatus(message.getType(), message.getChannel());
}
}
最后是发送SUBSCRIBE命令,然后会经过上一篇初始化流程里的编解码,收到订阅的响应消息后,回调onStatus
方法来完成CompletableFuture
public ChannelFuture subscribe(CompletableFuture<Void> promise, Codec codec, ChannelName... channels) {
for (ChannelName ch : channels) {
this.channels.put(ch, codec);
}
return async(promise, new PubSubMessageDecoder(codec.getValueDecoder()), RedisCommands.SUBSCRIBE, channels);
}
RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());
最后总结下这个流程:
-
订阅是通过注册监听器来实现的
-
初次订阅频道时,需要获取发布订阅连接,连接在初始化的时候已经创建好了;并且每个连接默认最多只能订阅5个不同频道,超过5个的,会创建新的连接
-
每个连接里的订阅对应的
PubSubConnectionEntry
对象是同一个,只是往这个对象中注册不同的监听器 -
发送SUBSCRIBE命令后,SUBSCRIBE命令的响应消息第一个元素是
subscribe
,会使用SUBSCRIBE命令中指定的PubSubStatusDecoder
作为解码器,它生成的对象是PubSubStatusMessage
类型,因此会回调onStatus
方法,来完成我们的订阅CompletableFuture;但是后续收到的订阅消息,第一个元素是message
,会使用PubSubMessageDecoder
作为解码器,它生成的对象是PubSubMessage
类型,因此会回调onMessage
方法,也就是我们注册的监听器的onMessage
方法 -
订阅不同频道时,会重新注册监听器,并重新发送SUBSCRIBE命令
CAS操作
示例代码
RAtomicDouble atomicDouble = client.getAtomicDouble("myDouble");
double val = atomicDouble.get();
boolean success = atomicDouble.compareAndSet(100, 200);
Assertions.assertFalse(success);
实现原理
CAS操作都是通过lua脚本来实现的原子操作,因为redis执行lua脚本是原子操作,进而保证CAS操作的原子性
public RFuture<Boolean> compareAndSetAsync(double expect, double update) {
return commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local value = redis.call('get', KEYS[1]);"
+ "if (value == false and tonumber(ARGV[1]) == 0) or (tonumber(value) == tonumber(ARGV[1])) then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getRawName()), BigDecimal.valueOf(expect).toPlainString(), BigDecimal.valueOf(update).toPlainString());
}
进一步看下lua脚本的执行和普通命令的执行有何区别?
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType,
String script, List<Object> keys, boolean noRetry, Object... params) {
if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
CompletableFuture<R> mainPromise = new CompletableFuture<>();
Object[] pps = copy(params);
CompletableFuture<R> promise = new CompletableFuture<>();
String sha1 = getServiceManager().calcSHA(script);
RedisCommand cmd;
if (readOnlyMode && evalShaROSupported.get()) {
cmd = new RedisCommand(evalCommandType, "EVALSHA_RO");
} else {
cmd = new RedisCommand(evalCommandType, "EVALSHA");
}
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(sha1);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd,
args.toArray(), promise, false,
connectionManager, objectBuilder, referenceType, noRetry);
executor.execute();
promise.whenComplete((res, e) -> {
if (e != null) {
if (e.getMessage().startsWith("ERR unknown command")) {
evalShaROSupported.set(false);
RFuture<R> future = evalAsync(nodeSource, readOnlyMode, codec, evalCommandType, script, keys, noRetry, pps);
transfer(future.toCompletableFuture(), mainPromise);
} else if (e.getMessage().startsWith("NOSCRIPT")) {
RFuture<String> loadFuture = loadScript(executor.getRedisClient(), script);
loadFuture.whenComplete((r, ex) -> {
if (ex != null) {
free(pps);
mainPromise.completeExceptionally(ex);
return;
}
List<Object> newargs = new ArrayList<Object>(2 + keys.size() + params.length);
newargs.add(sha1);
newargs.add(keys.size());
newargs.addAll(keys);
newargs.addAll(Arrays.asList(pps));
NodeSource ns = nodeSource;
if (ns.getRedisClient() == null) {
ns = new NodeSource(nodeSource, executor.getRedisClient());
}
RFuture<R> future = async(readOnlyMode, ns, codec, cmd, newargs.toArray(), false, noRetry);
transfer(future.toCompletableFuture(), mainPromise);
});
} else {
free(pps);
mainPromise.completeExceptionally(e);
}
return;
}
free(pps);
mainPromise.complete(res);
});
return new CompletableFutureWrapper<>(mainPromise);
}
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
return async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), false, noRetry);
}
这里lua脚本的执行包括两种方式:
- 方式一:不启用脚本缓存,默认是这种方式,这时候就是见到的按照EVAL命令的格式来执行lua脚本即可,所以这里参数的长度指定为
2 + keys.size() + params.length
EVAL script numkeys key [key ...] arg [arg ...]
- 方式二:启用脚本缓存,这时候会创建一个LRU的Map,用来缓存每个lua脚本的md5值,然后先按照EVALSHA 命令的格式发送md5值到redis服务器,如果redis服务器存在这个md5值对应的脚本,则直接执行这个脚本
EVALSHA sha1 numkeys key [key ...] arg [arg ...]
如果redis服务器不存在这个md5值对应的脚本,则会报错NOSCRIPT
,此时在按照SCRIPT LOAD
命令的格式把脚本上传到redis服务器,成功后再次执行EVALSHA 命令即可
SCRIPT LOAD script
Lock分布式锁
示例代码
client.getLock("myLock");
实现原理
获取锁
获取锁也是通过lua脚本来完成的
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
这里我们指定了锁名称为myLock
,这个会作为KEY,这个KEY的类型会设置为HASH类型
实际的锁名称是通过当前线程ID+UUID构成,作为HASH中的键值对,值为当前线程加锁的次数,实现可重入锁
这段lua脚本的逻辑也很简单:
-
如果
myLock
这个key已经存在了,那么说明锁已经被占用了,直接通过PTTL命令返回过期时间,以秒为单位 -
否则直接设置
myLock
的键值对为{ssdsx-aadff-assdrtf-xxcrdfd:3, 1}
,并设置过期时间,如果我们指定了过期时间,则直接设置为这个过期时间,如果我们没有指定过期时间,则默认设置个30s的过期时间,然后通过续租的方式增加过期时间
锁续租
也是通过lua脚本来完成的,逻辑就是只要这个myLock
还存在,那就重新续约30s
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
30s, getLockName(threadId));
}
外层是通过哈希时间轮来完成的,每10s触发一次续租,续租成功就递归续租,续租失败就取消后续的续租
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = getServiceManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
锁释放
也是通过lua脚本来完成的
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
这段脚本稍微复杂一点
-
如果
myLock
中当前线程的键值对查询不存在,说明锁不在当前线程,什么操作都不做;这样可以避免锁被其它线程释放 -
对
myLock
中当前线程的重入锁次数减一,如果仍然大于0,说明当前线程多次占用了这个锁,此时还不能直接释放这个锁,因此只是更新了myLock
的过期时间,同时新设置了一个redisson_unlock_latch:{myLock}:some_random_id
这个key值为0
,过期时间指定为:(命令执行超时时间+重试间隔时间)* 重试次数 -
如果
myLock
锁没有重入了,直接删除myLock
这个锁,同时PUBLISH消息0
到频道redisson_lock__channel:{myLock}
,同时更新redisson_unlock_latch:{myLock}:some_random_id
这个key值为1
,过期时间也更新 -
最后无论是删除了锁还是减少锁占用次数,都会删除
redisson_unlock_latch:{myLock}:some_random_id
这个key
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
String id = getServiceManager().generateId();
MasterSlaveServersConfig config = getServiceManager().getConfig();
int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
timeout = Math.max(timeout, 1);
RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);
CompletionStage<Boolean> ff = r.thenApply(v -> {
CommandAsyncExecutor ce = commandExecutor;
ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));
return v;
});
return new CompletableFutureWrapper<>(ff);
}
这个场景梳理下就是:如果这个锁没被当前线程多次获取(重入),那么直接删除这个锁,发布消息0
到频道redisson_lock__channel:{myLock}
,发布消息是为了让后续获取这个锁的其它线程能够成功获取到;如果当前线程多次获取了锁,那么释放的时候只会更新锁占用的次数。
至于redisson_unlock_latch:{myLock}:some_random_id
这个key,看commit信息是为了解决一些比较罕见的异常case,暂不明白
锁排队
当多个线程竞争同一个锁时,只有一个线程能成功获取到锁,其它线程则需要排队
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = commandExecutor.getInterrupted(future);
} else {
entry = commandExecutor.get(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(entry, threadId);
}
}
这里锁排队是通过订阅频道redisson_lock__channel:{myLock}
完成的,当锁被释放的时候,会发布消息0
到频道,然后再这里通过监听器回调处理,这里主要就是对Semaphore
变量进行进行释放操作,这样排队获取锁的线程就可以被唤醒
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(0)) {
value.getLatch().release();
}
}
限流器
示例代码
RRateLimiter limiter = redisson.getRateLimiter("myLimiter");
// Initialization required only once.
// 5 permits per 2 seconds
limiter.trySetRate(RateType.OVERALL, 5, 2, RateIntervalUnit.SECONDS);
// acquire 3 permits or block until they became available
limiter.acquire(3);
实现原理
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
byte[] random = getServiceManager().generateIdArray();
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"local rate = redis.call('hget', KEYS[1], 'rate');"
+ "local interval = redis.call('hget', KEYS[1], 'interval');"
+ "local type = redis.call('hget', KEYS[1], 'type');"
+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"
+ "local valueName = KEYS[2];"
+ "local permitsName = KEYS[4];"
+ "if type == '1' then "
+ "valueName = KEYS[3];"
+ "permitsName = KEYS[5];"
+ "end;"
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
+ "local currentValue = redis.call('get', valueName); "
+ "local res;"
+ "if currentValue ~= false then "
+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "local released = 0; "
+ "for i, v in ipairs(expiredValues) do "
+ "local random, permits = struct.unpack('Bc0I', v);"
+ "released = released + permits;"
+ "end; "
+ "if released > 0 then "
+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "
+ "if tonumber(currentValue) + released > tonumber(rate) then "
+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "
+ "else "
+ "currentValue = tonumber(currentValue) + released; "
+ "end; "
+ "redis.call('set', valueName, currentValue);"
+ "end;"
+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "
+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "
+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"
+ "else "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end; "
+ "else "
+ "redis.call('set', valueName, rate); "
+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "res = nil; "
+ "end;"
+ "local ttl = redis.call('pttl', KEYS[1]); "
+ "if ttl > 0 then "
+ "redis.call('pexpire', valueName, ttl); "
+ "redis.call('pexpire', permitsName, ttl); "
+ "end; "
+ "return res;",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), random);
}
限流器完全依靠lua脚本的逻辑控制:
-
初始化的时候会创建
myLimiter
这个HASH key,并保存键值对:{rate: 5, interval: 2, type: 0}
,并确保不存在用于控制剩余次数的{myLimiter}:value
这个key不存在(删除之),以及保存历史发放的次数和时间戳{myLimiter}:permits
这个ZSET key不存在(删除之) -
这里的type是用于控制限流是全局的还是只是当前客户端的,如果指定的是当前客户端限流
type=1
,那么上面的这两个key再拼接一个uuid串就可以了 -
首次获取次数的时候,会创建
{myLimiter}:value
这个key,设置值为rate-本次获取的次数
;并且往{myLimiter}:permits
这个ZSET中加入SCORE为本次时间戳,元素为随机字节和本次获取次数 -
后面再次获取次数时,会首先找到
{myLimiter}:permits
这个ZSET中已经过期的元素,时间戳在当前时间戳-interval
之前的就是已经过期了的元素,这部分元素可以回收了,会从ZSET中移除,然后重新计算回收后还剩余的次数;如果本次获取的次数仍低于回收后剩余的次数,那么可以直接分配,即保存ZSET和更新剩余次数即可;如果本次获取的次数超过了回收后剩余的次数,那么就需要等待了,等待的时间为interval-(当前时间戳-最先过期的时间戳(ZSET中的第一个元素的SCORE))
,为了避免出现临界情况,这里还加了一个3,表示比需要等待的时间稍稍多一丢丢就可以了
网友评论