前言
在实际开发过程中,当遇到高并发的场景时,我们通常会使用锁来保证线程安全。例如ReentrantLock,Synchronized。但是,熏弟,2020年了,应用都开始分布式了。普通的锁已经无法满足我们对程序加锁的欲望了。那么,如何解决呢?
首先,我们先要搞懂“锁”的基本原理。ReentrantLock是怎么实现锁的呢?(暂时不考虑AQS队列,只写最基础的方法。)
![](https://img.haomeiwen.com/i10185696/70405f13b0741ad8.png)
看完以后,是不是觉得so easy?关键就在于这个state,在jdk的lock中,可以使用volite关键字来进行状态的维护。那么我们,首先就可以考虑如何在分布式环境中维护这个状态。想想我们日常用到的技术中,哪些地方有这样的唯一性约束条件。聪明的你,有没有一些大胆的想法?mysql的主键索引,正好满足这个要求。
废话不多说,开整。
代码实例
- 首先,作为一名有逼格的程序员,我们得面向接口编程,先上接口。
public interface Lock {
//获取锁
void getLock();
//释放锁
void unLock();
}
2.第二步,用一个抽象类,构建锁的基本流程(模板方法模式)
public abstract class AbstractLock implements Lock {
@Override
public void getLock() {
//竞争锁
if(tryLock()){
System.out.println("获取Lock锁资源");
}else {
//任务阻塞
waitLock();
//重新获取锁
getLock();
}
}
//占有锁
protected abstract boolean tryLock();
//等待锁
protected abstract void waitLock();
}
接下来,就是对锁的实现了。
其实除了利用mysql外,我们还可以利用redis的setnx特性,zookeeper的临时节点和监听机制来实现。那我门就一个一个来吧。
3.锁的实现
(1)Mysql实现分布式锁
public class MysqlLock extends AbstractLock{
@Autowired
private JdbcTemplate jdbcTemplate;
private static final int LOCK_ID=1;
@Override
protected boolean tryLock() {
try {
jdbcTemplate.update("insert into mlock values (?)",LOCK_ID);//定义mlock表,只有一个id主键字段
}catch (Exception e){
return false;
}
return true;
}
@Override
protected void waitLock() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void unLock() {
jdbcTemplate.update("delete from mlock where id=?",LOCK_ID);
}
}
(2)redis实现分布式锁
public class RedisLock extends AbstractLock {
@Autowired
private JedisConnectionFactory factory;
public static final String KEY = "LOCK_KEY";
private ThreadLocal<String> local = new ThreadLocal<>();
@Override
protected boolean tryLock() {
String uuid = UUID.randomUUID().toString();//定义uuid,避免解锁的时候把别人的锁解了
Jedis jedis = (Jedis) factory.getConnection().
String res = jedis.set(KEY, uuid, "NX", "PX", 10000);getNativeConnection();//设置临时节点,并设置过期时间为10s,避免死锁
if("OK".equals(res)){
local.set(uuid);
return true;
}
return false;
}
@Override
protected void waitLock() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 使用lua脚本进行删除操作,保证了get和del两步操作的原子性
* */
@Override
public void unLock() {
//读取lua脚本
String script = FileUtils.readFile("unlock.lua");
//获取redis的原始连接
Jedis jedis = (Jedis) factory.getConnection().getNativeConnection();
//通过原始连接连接redis执行脚本
jedis.eval(script, Arrays.asList(KEY),Arrays.asList(local.get()));
}
}
lua脚本
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
(3)zookeeper实现分布式锁
第一种实现方式
public class ZkLock extends AbstractLock {
private final static String PATH="/lock";
// zk连接地址
private static final String CONNECTSTRING = "127.0.0.1:2181";
// 创建zk连接
protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
private CountDownLatch countDownLatch;
@Override
protected boolean tryLock() {
try {
/**创建临时节点*/
zkClient.createEphemeral(PATH);
} catch (RuntimeException e) {
return false;
}
return true;
}
@Override
protected void waitLock() {
/**监听数据变化*/
IZkDataListener iZkDataListener = new IZkDataListener(){
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
/** 节点删除,唤醒被等待的线程*/
if(countDownLatch!=null){
countDownLatch.countDown();
}
}
};
/**PATH 节点订阅监听器*/
zkClient.subscribeDataChanges(PATH,iZkDataListener);
if(zkClient.exists(PATH)){
countDownLatch=new CountDownLatch(1);
try {
/** 阻塞,一直等到接收到事件通知*/
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(PATH,iZkDataListener);
}
@Override
public void unLock() {
if(zkClient!=null){
zkClient.delete(PATH);
zkClient.close();
System.out.println("释放锁资源");
}
}
}
这种实现方式会存在"惊群效应",当锁被释放的时候,其他的进程会一起去抢这把锁,可能导致有的进程永远无法获得到锁。那么,我们是不是可以像公平锁那样,让进程排队,挨个去用这把锁呢。zookeeper刚好有有序节点功能,我们可以利用。
第二种实现方式
public class ZkLock2 extends AbstractLock {
private static final String PATH2 = "/lock2";
// zk连接地址
private static final String CONNECTSTRING = "127.0.0.1:2181";
// 创建zk连接
protected ZkClient zkClient = new ZkClient(CONNECTSTRING);
private CountDownLatch countDownLatch= null;
private String beforePath;//当前请求的节点前一个节点
private String currentPath;//当前请求的节点
public ZkLock2() {
if (!this.zkClient.exists(PATH2)) {
this.zkClient.createPersistent(PATH2);
}
}
@Override
public boolean tryLock() {
//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if(currentPath == null || currentPath.length()<= 0){
//创建一个临时顺序节点
currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
}
//获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
List<String> childrens = this.zkClient.getChildren(PATH2);
Collections.sort(childrens);
if (currentPath.equals(PATH2 + '/'+childrens.get(0))) {//如果当前节点在所有节点中排名第一则获取锁成功
return true;
} else {//如果当前节点在所有节点中排名中不是排名第一,则获取前面的节点名称,并赋值给beforePath
int wz = Collections.binarySearch(childrens,
currentPath.substring(7));
beforePath = PATH2 + '/'+childrens.get(wz-1);
}
return false;
}
@Override
public void waitLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if(countDownLatch!=null){
countDownLatch.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
//给排在前面的的节点增加数据删除的watcher,本质是启动另外一个线程去监听前置节点
this.zkClient.subscribeDataChanges(beforePath, listener);
if(this.zkClient.exists(beforePath)){
countDownLatch=new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.zkClient.unsubscribeDataChanges(beforePath, listener);
}
@Override
public void unLock() {
//删除当前临时节点
zkClient.delete(currentPath);
zkClient.close();
System.out.println("释放锁");
}
}
总结
以上就是三种分布式锁的实现了,三种锁中,mysql和redis会存在死锁的可能,我建议大家使用zookeeper的实现。
网友评论