经常听到分布式锁这个名词,觉得高大上又离我很遥远,于是也没有进行什么深究。直到面试被反复问道,我觉得有必要了解一下,以跟上这个时代的节奏。
常见的分布式锁实现,一个是ZooKeeper,另一个就是Redis了。Redis这么熟悉,当然是拿它先开刀了。在进行锁的实现的时候,有几个基本概念,需要先说明一下。
第一个就是锁的概念,这个锁不像java或者jvm中那种复杂又难懂的锁,你可以把锁理解为一个值,根据这个值的存在与否,来决定是否获取到锁。
在Redis中,有一种称为乐观锁的东西,什么叫乐观锁,意思就是这种策略是乐观的,总乐观的认为别的客户端连接不会修改该值,因此除了本客户端连接可以读取到某种资源(存储在Redis上的键值对信息),其它客户端连接也可以读取到。那万一其余客户端连接修改了资源呢?这个时候Redis Server会给加了乐观锁的客户端连接发送消息,该诉你该数据已经修改过了,这个时候本客户端连接可以选择循环重试或者直接退出。这个听起来是不是像CAS的操作呢? 我的感觉是像极了。 具体涉及的命令就是watch unwatch multi exec,这些命令什么意思,下文详细说明。
另一种锁称之为悲观锁。什么是悲观锁呢?就是客户端连接总是认为数据随时都可能被其余客户端连接修改掉,所以加了悲观锁,其余的客户端连接在此刻是无法读取到资源,自然也无法进行操作。当本客户端连接释放了锁,其余的客户端连接才可以获取锁,进而进行操作。涉及的命令不仅包括watch unwatch multi exec,也包括setnx ttl expire等命令,同样后文细说。
不像java中已经提供了各种锁以及同步容器,并发工具,Redis本身它不提供任何锁的实现,也没有乐观锁、悲观锁,超时锁等东西,这需要我们组合使用Redis的各种原子命令以及不同的命令特性,来自己实现锁,同时在代码层面一致的使用和释放锁。
再来说一下Redis客户端。Redis是用c语言写的,要么使用自带的redis-cli进行连接和交互,要么使用根据通信协议封装好的不同语言的客户端库。Redis的java客户端库很多,常用的有Jedis、Lettuce 。并且Spring SpringBoot默认的就是支持这两种客户端,所以学习了这两个客户端对以后也很有帮助。Lettuce 的功能更加强大,它基于netty,支持Redis的哨兵模式和集群模式。但是由于我知识浅薄,只能拿Jedis来作为演示啦。
由于乐观锁会导致无谓的修改循环重试,导致很少能够修改成功,耗费资源。而悲观锁,虽然在获取锁时不断重试,但对于修改资源,却是一次就成功了,在资源竞争严重的时候,悲观锁策略性能更好,因此这里主要选择悲观锁这种思想来进行代码演示。
首先我们设定一个键为锁(Redis就是Key Value型存储),并使用setnx命令,该命令的特点是,如果键存在,则设置定指定的值,并返回1,如果键存在,则什么都不做,并返回0。对应到代码中来就是,如果执行setnx命令返回了1,则说明获取到了锁,代码可以继续往下执行,如果执行setnx命令返回0,则说明没有获取到锁,当前线程等待或者重试。
获取到锁,执行代码完毕,需要释放锁。怎么释放呢,就是很简单的执行del命令即可,即把锁的key给删除,这样其余的连接就可以获取锁了。
伪代码如下:
while(con.setnx(lockKey,lockValue)==0){
//休眠 重试获取锁
}
// 执行业务代码
con.del(lockKey);
一切看起来很美好,不是吗?但现实总是千变万化的,一种可能是一段代码的执行,需要在不同地方获取不同的锁,导致死锁的发生,又或者是网络故障或者客户进程崩溃,造成锁永远无法释放。这就会导致其余的Redis连接一直无法获取到锁,因为一台机器的代码问题,网络问题,机器故障等原因,导致所有的服务都变得不可用,这是让人无法接受的,这违背了分布式的初衷。
怎么解决这个问题呢?我们可以指定一个锁的过期时间,比如10s后这个锁会过期,并且极限条件下业务执行时间也不会超过10s。(在服务有互相依赖,复杂的服务调用中,调用链越长,超时时间越不好预估,但这个也是在解决问题和性能之间做一个平衡,超时时间设置太长,性能会大大降低,超时时间太短,会造成并发问题,因为一个连接中代码还没有执行完,锁已经被删除,同时另一个连接获取到了锁,执行业务代码)。设置了锁的过期时间,解决了锁不释放的问题,但是同时引入的新的问题,那就是可能会删除其余连接的锁。比如A连接获取到锁key1,执行很长时间,此时锁过期,被删除,另一个连接B获取到锁key1,并执行对应的代码,此时A连接执行结束,于是释放锁,但是此时,其实它的锁已经被释放了,在锁过期的时候,现在它释放的是B链接的锁,那是不对的。假如此刻C连接进来,是能够获取到锁的,那么就意味着B C在同时执行业务代码,违背了锁当初设计的本意,因此绝对不能释放其余连接的锁,而只能释放自己的。
那么如何解决这个问题?其实我们可以在链接获取锁的时候,设置一个只有当前连接知道的唯一值,释放的时候会先取出锁的值,进行比较,只有跟存入的值是一致的时候,才会释放锁,也就是删除键,否则,什么也不做。
分析了这么多,我们可以看看获取锁的工具类代码:
/**
* 在指定的等待时限内获取锁
* @param jedis 连接
* @param lockName 锁名称
* @param timeOutMillionSeconds 获取锁超时时间 -1:一直等待,直到获取到锁
* @return 如果获取到锁,返回一个锁标识符,否则返回null
*/
public static String acquireLock(Jedis jedis,String lockName,long timeOutMillionSeconds){
// 略去各类校验
String identifier = UUID.randomUUID().toString();
long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
while(System.currentTimeMillis()<=timeEnd){
long result = jedis.setnx(lockName,identifier);
if(result==1){// 等于1 说明没有设置过,获取锁成功
return identifier;
}
try {
TimeUnit.MILLISECONDS.sleep(100);// 线程休眠值 根据业务来定
}catch (InterruptedException e){//假设在本机没有多线程编程 不会通知另一方线程中断 根据具体业务来
throw new RuntimeException(e);
}
}
return null;
}
注意代码中的jedis没有close,根据需要,也可以在工具类中close掉。
以及超时锁,为了防止setnx和expire之间,程序奔溃,造成超时时间没有设置上,因此其余的连接在获取不到锁的时候,会先判断锁有没有过期时间,如果没有,给锁加上过期时间:
/**
*在指定的等待时限内获取锁,该锁自身带有超时特性
* @param jedis 连接
* @param lockName 锁名称
* @param timeOutMillionSeconds 获取锁超时时间 -1:一直等待,直到获取到锁
* @param lockTimeOutSeconds 锁的超时时间
* @return 如果获取到锁,返回一个锁标识符,否则返回null
*/
public static String acquireLockTimeOut(Jedis jedis,String lockName,long timeOutMillionSeconds,int lockTimeOutSeconds){
// 略去各类校验
String identifier = UUID.randomUUID().toString();
long timeEnd = timeOutMillionSeconds==-1?Long.MAX_VALUE:System.currentTimeMillis() + timeOutMillionSeconds;
while(System.currentTimeMillis()<=timeEnd){
long result = jedis.setnx(lockName,identifier);
if(result==1){// 等于1 说明没有设置过,获取锁成功
jedis.expire(lockName,lockTimeOutSeconds);
return identifier;
}else{
if(jedis.ttl(lockName)==-1){
jedis.expire(lockName,lockTimeOutSeconds);
}
}
try {
TimeUnit.MILLISECONDS.sleep(100);// 线程休眠值 根据业务来定
}catch (InterruptedException e){//假设在本机没有多线程编程 不会通知另一方线程中断 根据具体业务来
throw new RuntimeException(e);
}
}
return null;
}
释放锁的代码:
/**
* 关于watch multi exec等命令,参考https://redis.io/topics/transactions 才能深刻理解
* @param jedis 连接
* @param lockName 锁名称
* @param identifier 锁标识符
* @return 是否成功释放锁 仅作为参考
*/
public static boolean releaseLock(Jedis jedis,String lockName,String identifier){
// 略去各类校验
boolean releaseNormal = false;// 锁是否正常释放
jedis.watch(lockName);
String identifierInRedis = jedis.get(lockName);
if(identifier.equalsIgnoreCase(identifierInRedis)){// 如果标识符没有改动,则说明可以解锁
Transaction transaction = jedis.multi();
transaction.del(lockName);
transaction.exec();
releaseNormal = true;
}else{
jedis.unwatch();
releaseNormal = false;
}
return releaseNormal;
}
这里重点解释一下释放锁的操作:只有现在取出的跟当时存入的值一致,才会进行删除操作。但为了防止get 和del之间的某个时候,另一个连接修改了锁的值,(为什么会修改?是因为当前连接A在执行完get之后,锁过期了,因此另一个连接B可以获取到锁,现在A执行删除操作,就是删除B连接获取到的锁),因此需要watch 操作,如果现在取出的值和当初存入的不一致,那么直接执行unwatch并返回。为什么要执行unwatch呢?因为为了安全,加入不执行unwatch就返回,在后续的代码中执行multi和exec,那就有很大的问题,当锁被删除或者修改,就会打断当前的事务,但是该事物跟锁是没有任何关系的,所以unwatch是一个需要执行的操作。另一个情况是假如当前连接取出的锁的值,跟存入的一直,就需要执行删除锁的操作。可能有同学就会问了,Redis的所有操作都是原子操作,执行del和包裹在multi exec中执行del不是一样的原子操作吗?为何还要多此一举,让代码变得不好理解。在这一点上,它们确实并无任何区别,但是重点是之前有一个watch命令。假如没有执行watch multi del exec这样的顺序,就会有释放掉其余连接的锁的风险,为什么会这样,上文已经做了分析。在get和del之间发生的事情,当前连接是不知道的,get del的执行不是原子性的。有了watch multi del exec这个顺序,当前连接A get执行之后,锁失效,且被另一个连接B获取到锁,也就是修改了锁,因为watch(key),所以当前连接就知道了有人修改了。当执行exec的时候,就会丢弃掉del命令,因为watch的通知使得事务已经失效了,这保证了其余连接的锁不会被删除。同时,当执行exec的时候,不论事务成功与否,都unwatch了。最终呢,释放锁的代码看起来就是这样了。
写好了工具类,我们应该测试一下,看看是不是真的,我们可以使用线程池,放入200个任务,每个任务都是执行获取Redis的某个键,并加一,再设置回Redis。在执行代码前,进行锁获取,执行完毕,进行锁释放。为了等到所有线程执行完毕,便于获取最终执行结果,使用CountDownLatch进行等待线程池所有任务的执行完毕。另外的一些部分是初始化动作,防止锁已经设置了或者指定的键已经有值了。代码如下:
public class LockTest {
private static final Log log = LogFactory.getLog(LockTest.class);
public static void main(String[] args)throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10);
final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
final CountDownLatch countDownLatch = new CountDownLatch(200);
final String lockName = "lock_a";
Jedis jedisInit = jedisPool.getResource();
jedisInit.del(lockName);
final String testResource = "test_str";
jedisInit.set(testResource,"0");
jedisInit.close();
for(int i=0;i<200;i++){
threadPoolExecutor.execute(new Runnable() {
public void run() {
Jedis jedis = jedisPool.getResource();
String identifiler = RedisSetnxLock.acquireLock(jedis,lockName,-1);
if(identifiler==null) {
log.info("获取锁失败");
countDownLatch.countDown();
jedis.close();
return;
}
try{
String value = jedis.get(testResource);
Thread.sleep(200);// 故意休眠
jedis.set(testResource,(Integer.parseInt(value)+1)+"");
}catch (Exception e){
e.printStackTrace();
}finally {
RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
jedis.close();
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
log.info(jedisPool.getResource().get(testResource));
threadPoolExecutor.shutdown();
}
}
最后的执行结果符合预期。
为了演示连接挂掉或者执行超常任务的情形,可以执行下面的测试:
public class LockTest {
private static final Log log = LogFactory.getLog(LockTest.class);
public static void main(String[] args)throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(1000),new ThreadPoolExecutor.CallerRunsPolicy());
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(10);
final JedisPool jedisPool = new JedisPool(jedisPoolConfig,"192.168.99.100");
final CountDownLatch countDownLatch = new CountDownLatch(200);
final String lockName = "lock_a";
Jedis jedisInit = jedisPool.getResource();
jedisInit.del(lockName);
final String testResource = "test_str";
jedisInit.set(testResource,"0");
jedisInit.close();
for(int i=0;i<200;i++){
threadPoolExecutor.execute(new Runnable() {
public void run() {
Jedis jedis = jedisPool.getResource();
// 锁的超时时间为1s
String identifiler = RedisSetnxLock.acquireLockTimeOur(jedis,lockName,-1,1);
if(identifiler==null) {
log.info("获取锁失败");
countDownLatch.countDown();
jedis.close();
return;
}
try{
String value = jedis.get(testResource);
Thread.sleep(200);
jedis.set(testResource,(Integer.parseInt(value)+1)+"");
}catch (Exception e){
e.printStackTrace();
}finally {
// 每次不释放锁,模拟执行超常任务或者进程挂掉的情形
//RedisSetnxLock.releaseLock(jedis,lockName,identifiler);
jedis.close();
countDownLatch.countDown();
}
}
});
}
countDownLatch.await();
log.info(jedisPool.getResource().get(testResource));
threadPoolExecutor.shutdown();
}
}
执行结果同样正确,只是执行时间变长了。
相关maven pom:
<!--jedis client-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<!--jedis连接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.1</version>
</dependency>
参考书籍:《Redis实战》,一本非常棒的书。
网友评论