1. 分布式应用并发问题
分布式应用进行逻辑处理时经常会遇到并发问题,比如下面这个例子。
在Redis里对"account"这个key进行操作,两个客户端同时要修改account,首先要读取account的值,然后对其进行修改,修改完再存回去,那么就会出现并发问题,因为读取和保存状态这两个操作不是原子的。
image
2. Redis实现分布式锁
2.1 第一个示例
2.1.1 创建工程
首先创建一个空工程RedisStart,pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RedisStart</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>redis-lock</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
</parent>
<profiles>
<profile>
<id>jdk-1.8</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
</profiles>
</project>
2.1.2 创建子工程
创建子工程redis-lock,工程结构如下
image
pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RedisStart</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>redis-lock</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件:
server:
port: 8090
spring:
redis:
host: 127.0.0.1
port: 6379
2.1.3 启动类
package com.redisson;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public Redisson redisson(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
2.1.4 关键类
首先先在Redis中执行set account 50这条指令,加入数据。然后请看下面代码:
package com.redisson;
import org.redisson.Redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class IndexController {
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("/deduct account")
public String deductAccount() throws InterruptedException {
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
return "end";
}
}
上面代码重现了开头提出的问题,如果有多线程知识储备的话,可以很容易想到,加入synchronized关键字即可解决。
synchronized(this){
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
return "end";
}
}
2.1.5 分布式锁简单实现setnx
如果上边的服务部署在单机模式下,并且程序没有其他异常的话,可以解决原子性问题,但是现在的服务都是在集群部署下的,如果有多台服务的话,上面代码无法解决这个问题。
如果看过前面Redis应用之常用数据类型文章的话,可以知道Redis提供了setnx指令实现分布式锁,可以用以下代码实现。
String lockKey = "lockKey";
//获取锁信息
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
//锁还没有被释放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
//业务代码执行完,释放锁
stringRedisTemplate.delete(lockKey);
return "end";
这样即使是集群部署下的服务,也可以实现分布式锁功能了,但是如果在执行释放锁那行代码之前,有异常的话,会导致该锁一直被占用,无法释放。
2.1.6 解决程序异常导致的无法释放锁
加入finally逻辑,可以保证即使程序有异常的话,也可以释放锁。
String lockKey = "lockKey";
try {
//获取锁信息
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock");
//锁还没有被释放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
}finally {
//业务代码执行完,释放锁
stringRedisTemplate.delete(lockKey);
}
return "end";
上面代码解决了,如果程序有异常的话,无法释放锁的问题,但是如果在代码执行的一半的时候,服务挂了,或者被Kill掉的话,就不会执行finally里边的代码,所以依然无法释放锁。
2.1.7 setnx命令的原子操作
设置锁的时候,直接对该锁加入超时时间,可以解决由于上边的问题。
String lockKey = "lockKey";
try {
//设置锁超时时间
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "lock",10, TimeUnit.MILLISECONDS);
//锁还没有被释放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
}finally {
//业务代码执行完,释放锁
stringRedisTemplate.delete(lockKey);
}
return "end";
如果并发不高的情况下,上面代码可以实现一个良好的分布式锁,但是如果在高并发情况下,会导致锁一直失效。举个例子:如果锁的失效时间是1秒,A线程成功获得了该锁,但是由于并发比较高,导致程序在1秒钟没有执行完,就不会执行删除锁的代码,但是这个时候由于超过了1秒,所以A线程获取的锁已经失效了, 其他线程可以获得该锁,假设B线程获取到了这把锁,开始执行后边的逻辑,这个时候A线程执行了finally中的逻辑,把B线程获取到的锁删除了,那么其他线程又可以获取到这把锁,而过一段时间该锁又失效了,而B线程就会删除C线程过去到的锁,以此类推,这个锁就会一直失效。
2.1.8 锁信息与线程信息绑定
为每一个线程设置线程ID,并将该ID作为锁Key信息的value值,在删除锁的时候,进行判断,如果是本线程自己加的锁,可以删除,如果不是,不能删除。
String lockKey = "lockKey";
String clientId = UUID.randomUUID().toString();
try {
//设置锁超时时间
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId,30, TimeUnit.MILLISECONDS);
//锁还没有被释放,直接返回
if (!result){
return "error";
}
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
}finally {
//判断该锁是不是当前线程加的
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){
//业务代码执行完,释放锁
stringRedisTemplate.delete(lockKey);
}
}
return "end";
至此,已经实现了一把比较完善的分布式锁,但是还是会有一些问题,比如业务代码执行时间大于锁的过期时间,如何对该锁进行续期?可以简单的想到,在主线程启动的时候,启动另外一个线程,监控这把锁的状态,启动一个定时任务,定期监控该锁,如果过一段时间,锁还没释放,就把锁的失效时间重新置位启始值。但这事儿说起来容易,实际写的时候,稍不注意,就会引起很多BUG。
2.2 用Redisson实现一个分布式锁
好在,贴心的Redisson已经帮我们实现了功能完善的分布式锁。请看下面代码:
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加锁,实现锁续命
redissonLock.lock();
int account = Integer.parseInt(stringRedisTemplate.opsForValue().get("account"));
if (account > 0){
int realAccount = account - 1;
stringRedisTemplate.opsForValue().set("account",realAccount + "");
System.out.println("扣减成功,剩余库存:" + realAccount + "");
}else {
System.out.println("扣减失败,库存不足");
}
}finally {
//释放锁
redissonLock.unlock();
}
可以看到加锁逻辑十分简洁。
2.2.1 Redisson加锁逻辑
下边用一张简单的图来说明下:
image
2.2.2 源码阅读
由于整个Redisson的代码实在太多,所以这里只看主要逻辑。
2.2.2.1 初始化逻辑
先看下边代码对应的源码
RLock redissonLock = redisson.getLock(lockKey);
会调用RedissonLock的构造方法,初始化锁的名称以及锁的失效时间为30秒
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
初始化锁的失效时间
private long lockWatchdogTimeout = 30 * 1000;
2.2.2.2 加锁逻辑
调用RedissonLock.lockInterruptibly()
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
下边代码大体逻辑就是,先尝试获取锁,如果获取成功就直接返回,如果没有获取成功,就自旋,直到获取成功。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//获取当前线程ID
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 获取锁就返回
if (ttl == null) {
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
//没获取锁自旋获取
while (true) {
//再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
//获取锁就返回
if (ttl == null) {
break;
}
//自旋的时候等待一段时间,下次再次获取
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
获取锁的逻辑
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
继续看tryAcquireAsync(leaseTime, unit, threadId)
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
由于传入的leaseTime为-1,所以请看如下代码
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
Redisson源码大量调用了Lua脚本,用于实现原子性。下面来分析这几个脚本,KEYS[1]为锁的名称、ARGV[1]为锁的失效时间、ARGV[2]为当前线程的ID。
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
上边脚本的逻辑为,首先判断锁是否存在,如果锁不存在,那么就设置该锁,并设置失效时间,返回空;如果锁已存在,就判断该锁是否是当前线程加的锁,如果是当前线程加的锁,就对这个锁的调用次数加1,并刷新锁的失效时间,返回空;如果不是当前线程加的锁,那么该返回该锁的失效时间。
下面来看监听里边相关的代码,这部分代码用了一个延时的任务执行,每隔10秒就查询当前线程获取的锁状态,如果存在就对失效时间进行刷新,并返回1,不存在就直接返回0。
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
2.2.3 存在问题
由于Redis一般是集群部署的,所以会出现由于主节点挂掉的话,从节点会取而代之。这时候如果客户端在主节点上申请成功了一把锁,但是这把锁还没有来得及同步到从节点,主节点突然挂了,然后从节点变为主节点,这个新的节点内部没有这个锁,所以当另一个客户端过来请求加锁时,立即就批准了,这样就导致系统中同样一把锁被两个客户端同时持有,不安全性由此产生。
3. RedLock算法
为了解决由于主节点挂掉导致多个客户端同时持有一把锁的问题,Antirez发明了RedLock算法,它的流程比较复杂,不过已经有了很多大神开发了开源的库,用户可以拿来即用,比如redlock-py以及Redisson。
为了使用 Redlock,需要提供多个Redis实例,这些实例之前相互独立没有主从关系。同很多分布式算法一样,redlock 也使用「大多数机制」。
加锁时,它会向过半节点发送 set(key, value, nx=True, ex=xxx) 指令,只要过半节点 set成功,那就认为加锁成功。释放锁时,需要向所有节点发送 del 指令。不过 Redlock 算法还
需要考虑出错重试、时钟漂移等很多细节问题,同时因为 Redlock 需要向多个节点进行读写,意味着相比单实例 Redis 性能会下降一些。
3.1 RedLock使用场景
如果你很在乎高可用性,希望挂了一台 redis 完全不受影响,那就应该考虑 redlock。不过代价也是有的,需要更多的 redis 实例,性能也下降了,代码上还需要引入额外的library,运维上也需要特殊对待,这些都是需要考虑的成本,使用前请再三斟酌。
参考资料
- 《Redis深度历险》
网友评论