Semaphore 也是 Redis 分布式锁支持的一种,同步组件。
Semaphore 信号量:允许多个线程同时获取一把锁,任何一个线程释放锁后,其他等待的一个线程就可以尝试获取锁。
举个栗子: 凭证有3个,10个线程去抢
@Test
public void test() {
RSemaphore semaphore = redisson.getSemaphore("semaphore");
// 3个凭证:允许3个线程同时持有
semaphore.trySetPermits(3);
// 运行 10 个线程
for (int i = 0; i < 10; ++i) {
new Thread(() -> {
try {
System.out.println(LocalDateTime.now()
+ " : 线程[" + Thread.currentThread().getName()
+ "] 尝试获取 Semaphore 锁");
semaphore.acquire();
System.out.println(LocalDateTime.now()
+ " : 线程[" + Thread.currentThread().getName()
+ "] 成功获取 Semaphore 锁,开始工作");
Thread.sleep(3000);
semaphore.release();
System.out.println(LocalDateTime.now()
+ " : 线程[" + Thread.currentThread().getName()
+ "] 释放 Semaphore 锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(60000); // 等60秒
}
输出结果:
2022-06-21T16:59:24.835 : 线程[Thread-9] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.836 : 线程[Thread-7] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.835 : 线程[Thread-10] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.835 : 线程[Thread-6] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.836 : 线程[Thread-12] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.836 : 线程[Thread-8] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.836 : 线程[Thread-11] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.836 : 线程[Thread-5] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.835 : 线程[Thread-3] 尝试获取 Semaphore 锁
2022-06-21T16:59:24.835 : 线程[Thread-4] 尝试获取 Semaphore 锁
2022-06-21T16:59:25.349 : 线程[Thread-9] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:25.349 : 线程[Thread-10] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:25.350 : 线程[Thread-7] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:28.402 : 线程[Thread-9] 释放 Semaphore 锁
2022-06-21T16:59:28.418 : 线程[Thread-7] 释放 Semaphore 锁
2022-06-21T16:59:28.419 : 线程[Thread-10] 释放 Semaphore 锁
2022-06-21T16:59:28.499 : 线程[Thread-3] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:28.499 : 线程[Thread-8] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:28.499 : 线程[Thread-5] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:31.556 : 线程[Thread-8] 释放 Semaphore 锁
2022-06-21T16:59:31.565 : 线程[Thread-5] 释放 Semaphore 锁
2022-06-21T16:59:31.565 : 线程[Thread-3] 释放 Semaphore 锁
2022-06-21T16:59:31.613 : 线程[Thread-6] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:31.613 : 线程[Thread-11] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:31.613 : 线程[Thread-12] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:34.671 : 线程[Thread-12] 释放 Semaphore 锁
2022-06-21T16:59:34.673 : 线程[Thread-6] 释放 Semaphore 锁
2022-06-21T16:59:35.138 : 线程[Thread-11] 释放 Semaphore 锁
2022-06-21T16:59:35.142 : 线程[Thread-4] 成功获取 Semaphore 锁,开始工作
2022-06-21T16:59:38.403 : 线程[Thread-4] 释放 Semaphore 锁
源码分析:
@Test
public void test() throws InterruptedException {
RSemaphore semaphore = redisson.getSemaphore("semaphore");
// 3个凭证:允许3个线程同时持有
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();
}
1. 设置凭证源码定位: RedissonSemaphore#trySetPermits
对应参数如下:
- KEYS[1]:锁的名称 "semaphore"
- KEYS[2]:通道名称,redisson_sc:{锁名}
- ARGV[1]:凭证数量,需要输入的
local value = redis.call('get', KEYS[1]); -- 获取锁
if (value == false or value == 0) then -- 锁不存在
redis.call('set', KEYS[1], ARGV[1]); -- 设置 锁 和 凭证数量
redis.call('publish', KEYS[2], ARGV[1]); -- 发布通知订阅
return 1; -- 设置成功
end;
return 0; -- 设置失败
2. 获取锁源码定位:RedissonSemaphore#acquire
@Override
public void acquire(int permits) throws InterruptedException {
// 尝试获取凭证,默认 permits = 1
if (tryAcquire(permits)) {
return;
}
// 订阅
RFuture<RedissonLockEntry> future = subscribe();
commandExecutor.syncSubscription(future);
try {
// 无限循环:尝试获取凭证
while (true) {
if (tryAcquire(permits)) {
return;
}
// 等待一段时间
getEntry().getLatch().acquire(permits);
}
} finally {
// 取消订阅
unsubscribe(future);
}
}
RedissonSemaphore#tryAcquire() 中的 lua 脚本:
local value = redis.call('get', KEYS[1]); -- 获取锁中的凭证数量
-- 锁存在 且 当前凭证数量 >= 需要的凭证数量
if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then
local val = redis.call('decrby', KEYS[1], ARGV[1]); -- 自减凭证数量
return 1; -- 获取锁成功
end;
return 0; -- 获取锁失败
3. 释放锁源码定位:RedissonSemaphore#releaseAsync
local value = redis.call('incrby', KEYS[1], ARGV[1]); -- 凭证数 +1
redis.call('publish', KEYS[2], value); -- 发布通知
作者:卷几你哇
链接:https://juejin.cn/post/7112617659916615711
来源:稀土掘金
网友评论