为什么要设计分布式锁
在简单的单机系统中,当存在多个线程同时要修改某个共享变量时,为了数据的操作安全,往往需要通过加锁的方法,在同一时刻同一代码块只能有一个进程执行操作,存在很多加锁的方式,比如在java中有synchronize或Lock子类等。
但是在分布式中,会存在多个主机,即会存在多个jvm, 在jvm之间数据是不能共享的,上面的方法只能在一个jvm中执行有效,在多个jvm中同一变量可能会有不同的值。所以我们要设计一种跨jvm的共享互斥机制来控制共享变量资源的访问,这也是提出分布式锁的初衷。
需要解决的问题
为了将分布式锁实现较好的性能,我们需要解决下面几个重要的问题:
- 一个方法或代码片段在同一时刻只能被一个进程所执行。
- 高可用的获取锁与释放锁功能。
- 避免死锁
- 锁只能被持有该锁的客户端删除或者释放。
- 容错,在服务器宕机时,锁依然能得到释放或者其他服务器可以进行加锁。
下面分别利用redis和zookeeper来实现加锁和解锁机制。
基于Redis的加锁第一版
本版本通过变量sign设置锁的唯一标识,确保只有拥有该锁的客户端才能删除它,其他客户端不能删除。
利用阻塞锁的思想, 通过while(System.currentTimeMillis() < endTime)
和Thread.sleep()
相结合,在设置的规定时间内进行多次尝试。
但是setnx
操作和expire
分割开了,不具有原子性,可能会出现问题。
比如说,在执行到jedis.expire
时,可能系统发生了崩溃,导致锁没有设置过期时间,导致发生死锁。
public String addLockVersion1(String key, int blockTime, int expireTime) {
if (blockTime <=0 || expireTime <= 0)
return null;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String sign = UUID.randomUUID().toString();
String token = null;
//设置阻塞尝试时间
long endTime = System.currentTimeMillis() + blockTime;
while (System.currentTimeMillis() < endTime) {
if (jedis.setnx(key, sign) == 1) {
// 添加成功,设置锁的过期时间,防止死锁
jedis.expire(key, expireTime);
// 在释放锁时用于验证
token = sign;
return token;
}
//加锁失败,休眠一段时间,再进行尝试。
try {
Thread.sleep(DEFAULT_SLEEP_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return null;
}
基于Redis的加锁第二版
通过设置key对应的value值为锁的过期时间,当遇到系统崩溃,致使利用expire
设置锁过期时间失败时,通过获取value值,来判断当前锁是否过期,如果该锁已经过期了,则进行重新获取。
但是它也存在一些问题。当锁过期时,如果多个进程同时执行jedis.getSet
方法,虽然只有一个进程可以获得该锁,但是这个进程的锁的过期时间可能被其他进程的锁所覆盖。
该锁没有设置唯一标识,也会被其他客户端锁释放,不满足只能被锁的拥有者锁释放的条件。
public boolean addLockVersion2(String key, int blockTime, int expireTime) {
if (blockTime <=0 || expireTime <= 0)
return false;
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
long endTime = System.currentTimeMillis() + blockTime;
while (System.currentTimeMillis() < endTime) {
long redisExpierTime = System.currentTimeMillis() + expireTime;
if (jedis.setnx(key, redisExpierTime + "") == 1) {
jedis.expire(key, expireTime);
return true;
} else {
String oldRedisExpierTime = jedis.get(key);
// 当锁设置成功,但是没有通过expire成功设置过期时间,但是根据存的值判断出它实际上已经过期了
if (oldRedisExpierTime != null && Long.parseLong(oldRedisExpierTime) < System.currentTimeMillis()) {
String lastRedisExpierTime = jedis.getSet(key, System.currentTimeMillis() + blockTime + "");
//获取到该锁,没有被其他线程所修改
if (lastRedisExpierTime.equals(oldRedisExpierTime)) {
jedis.expire(key, expireTime);
return true;
}
}
}
//加锁失败,休眠一段时间,再进行尝试。
try {
Thread.sleep(DEFAULT_SLEEP_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
基于Redis的加锁第三版
具体通过set
方法来实现setnx
和expire
的相加功能,实现了原子操作。
如果key不存在时,就进行加锁操作,并对锁设置一个有效期,同时uniqueId表示加锁的客户端;如果key存在,不做任何操作。
public boolean addLockVersion3(String key, String uniqueId, int blockTime, int expireTime) {
Jedis jedis = null;
try {
long endTime = System.currentTimeMillis() + blockTime;
while (System.currentTimeMillis() < endTime) {
jedis = jedisPool.getResource();
String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_STATE.equals(result))
return true;
try {
Thread.sleep(DEFAULT_SLEEP_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return false;
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return false;
}
基于Redis的加锁第四版
为了使对同一个对象添加多次锁,并且不发生阻塞,即实现类似可重入锁,我们借鉴了ReetrantLock
的思想,添加了变量states
来控制。
public boolean addLockVersion4(String key, String uniqueId, int expireTime) {
int state = states.get();
if (state > 1) {
states.set(state+1);
return true;
}
return doLock(key, uniqueId, expireTime);
}
private boolean doLock(String key, String uniqueId, int expireTime) {
Jedis jedis = null;
if (expireTime <= 0)
return false;
try {
jedis = jedisPool.getResource();
String result = jedis.set(key, uniqueId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_STATE.equals(result))
states.set(states.get() + 1);
return true;
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return false;
}
基于Redis的加锁第五版
从上面可知,利用setnx
和expire
实现加锁机制时因为不是原子操作,会产生一些问题,我们可用lua脚本来实现。
public boolean addLockVersion5(String key, String uniqueId, int expireTime) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
String luaScript = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(uniqueId);
values.add(String.valueOf(expireTime));
Object result = jedis.eval(luaScript, keys, values);
if ((Long)result == 1L)
return true;
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
}
return false;
}
基于Redis的释放锁第一版
在解锁时首先判断加速与解锁是否是同一个客户端,然后利用del
方法进行删除。
但是会出现一些问题。
当方法执行到判断内部时,即将要执行del
方法时,该锁已经过期了,并被其他的客户端所请求应有,此时执行del
会造成锁的误删。
public boolean releaseLockVersion1(String key, String uniqueId) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
//加锁与解锁是否是同一个客户端
String lockId = jedis.get(key);
if (lockId != null && lockId.equals(uniqueId)) {
jedis.del(key);
return true;
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return false;
}
基于Redis的释放锁第二版
从上面的分析来看,我们要确保删除的原子性,利用lua脚本可以保证一点。
在脚本语言里,KEYS[1]和ARGV[1]分别表示传入的key名和唯一标识符。
public boolean releaseLockVersion2(String key, String uniqueId) {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Jedis jedis = null;
Object result = null;
try{
jedis = jedisPool.getResource();
result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId));
if ((Long)result == 1)
return true;
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return false;
}
基于Redis的释放锁第三版
在利用可重入锁思想时,只有当states=1
时才能被释放,大于0时,只能进行减1操作。
public boolean releaseLockVersion3(String key, String uniqueId) {
int state = states.get();
if (state > 1) {
states.set(states.get() - 1);
return false;
}
return this.doRelease(key, uniqueId);
}
private boolean doRelease(String key, String uniqueId) {
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Jedis jedis = null;
Object result = null;
try{
jedis = jedisPool.getResource();
result = jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(uniqueId));
if ((Long)result == 1)
return true;
} catch (JedisException e) {
e.printStackTrace();
} finally {
states.set(0);
if (jedis != null)
jedis.close();
}
return false;
}
利用Zookeeper实现分布式锁
Zookeeper提供一个多层次的节点命名空间,每个节点都用一个以斜杠(/)分割的路径表示,
而且每个节点都有父节点(根节点除外),非常类似于文件系统。
基本思想流程
- 在某父节点下添加创建一个节点,
- 获取该父节点下的所有子节点,并进行排序,获得有个有序序列
- 如果当前添加的节点是序列中序号最小的节点,表示获取锁成功
- 如果不是最小的节点,则对在有序列表中的它的前一个节点进行监听,当被监听的节点被删除后,会通知该节点获取锁。
- 解锁的时候删除当前节点。
实现代码
public class zklock {
private ZkClient zkClient;
private String name;
private String currentLockPath;
private CountDownLatch countDownLatch;
private static final String PATENT_LOCK_PATH = "distribute_lock";
private static final int MAX_RETEY_TIMES = 3;
private static final int DEFAULT_WAIT_TIME = 3;
public zklock(ZkClient zkClient, String name) {
this.zkClient = zkClient;
this.name = name;
}
public void addLock() {
if (!zkClient.exists(PATENT_LOCK_PATH)) {
zkClient.createPersistent(PATENT_LOCK_PATH);
}
int count = 0;
boolean iscompleted = false;
while (!iscompleted) {
iscompleted = true;
try {
//创建当前目录下的临时有序节点
currentLockPath = zkClient.createEphemeralSequential(PATENT_LOCK_PATH + "/", System.currentTimeMillis());
} catch (Exception e) {
if (count++ < MAX_RETEY_TIMES) {
iscompleted = false;
} else
throw e;
}
}
}
public void releaseLock() {
zkClient.delete(currentLockPath);
}
//检查是否是最小的节点
private boolean checkMinNode(String localPath) {
List<String> children = zkClient.getChildren(PATENT_LOCK_PATH);
Collections.sort(children);
int index = children.indexOf(localPath.substring(PATENT_LOCK_PATH.length()+1));
if (index == 0) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
return true;
} else {
String waitPath = PATENT_LOCK_PATH + "/" + children.get(index-1);
waitForLock(waitPath, false);
return false;
}
}
//监听有序序列中的前一个节点
private void waitForLock(String waitPath, boolean useTime) {
countDownLatch = new CountDownLatch(1);
zkClient.subscribeDataChanges(waitPath, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
checkMinNode(currentLockPath);
}
});
if (!zkClient.exists(waitPath)) {
return;
}
try {
if (useTime == true)
countDownLatch.await(DEFAULT_WAIT_TIME, TimeUnit.SECONDS);
else
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch = null;
}
}
基于Redis和Zookeeper的分布式锁的优劣
- Redis是nosql数据库,主要特点是缓存;
- Zookeeper是分布式协调工具,主要用于分布式解决方案。
加锁机制 - Redis: 通过
set
方法创建key, 因为Redis的key是唯一的,谁先创建成功,谁能够先获得锁。 - Zookeeper: 会在Zookeeper上创建一个临时节点,因为Zookeeper节点命名路径保证唯一,只要谁先创建成功,谁能够获取到锁。
释放锁 - Redis: 为了确保锁的一致性问题,在删除的redis的key时,需要判断是否是之前拥有该锁的客户端;通过设置有效期解决死锁。
- Zookeeper: 直接关闭临时节点session会话连接,因为临时节点生命周期与session会话绑定在一块,如果session会话连接关闭的话,该临时节点也会被删除。
就性能而言,redis是Nosql数据库,性能优于zookeeper;就健壮性而言,zookeeper明显优于redis。
网友评论