实现分布式锁的方式
一.基于数据库实现分布式锁
1、在数据库新建一张锁表(lock)
create table lock (
id
method_name (唯一约束)
.......
)
2、获取锁时向表中插入一条数据,由于有唯一约束,只会有一个线程插入成功,插入成功的线程获得锁,可以继续操作,没有插入成功的线程没有获得锁,不能操作;
3、解锁时删除该条数据;
主要问题:
可用性差,数据库故障会导致业务系统不可用;
数据库性能存在瓶颈,不适合高并发场景;
删除锁失败容易造成死锁;
锁的失效时间难以控制;
第二种:基于Redis实现分布式锁
加锁:
setnx命令加锁,并设置锁的有效时间和持有人标识;
expire命令设置锁的过期时间;
解锁:
检查是否持有锁,然后删除锁;
delete命令删除锁
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
基于Redis实现分布式锁,采用一个开源项目http://redisson.org
第三种:基于zookeeper实现分布式锁
zookeeper是分布式系统的协调服务,提供配置管理、分布式协同、命名的中心化服务、服务注册发现等;
zookeeper实现分布式锁采用其提供的临时有序节点+监听来实现;
Curator客户端给我们提供了现成的分布式互斥锁来实现分布式锁;
1、创建分布式互斥锁
InterProcessMutex lock = new InterProcessMutex(zookeeperCuratorClient.client, "/storeLock");
2、获取分布式互斥锁
if (lock.acquire(10, TimeUnit.SECONDS))
3、释放分布式互斥锁
lock.release();
curator客户端分布式锁
public class lockService {
@Autowired
private ZookeeperDao dao;
InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");
public void loclMethod(){
try {
//获取锁
//一直等待锁
//lock.acquire();
//尝试获取锁,如果在指定时间获取锁,则返回true
if (lock.acquire(1000, TimeUnit.SECONDS)){
int check = dao.check();
if (check > 0){
System.out.println("售出");
dao.des(--check);
}else {
System.out.println("没有库存");
}
}
//Thread.sleep(50);
} catch (Exception e) {
System.out.println(e);
}finally {
try {
//释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
zookeeper原生客户端分布式锁实现
/**
* zookeeper原生客户端实现分布式锁
* 基于有序临时节点
*/
public class ZookeeperLock {
private ZooKeeper zooKeeper;
private static final String ZK_ADDRESS = "127.0.0.1";
//锁根节点
private static String rootName = "/zklock";
//锁节点
private String lockName;
//当前锁节点名称
private String cuurentLockName;
//zookeeper 超时时间
private int sessionTimeout = 100000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public ZookeeperLock(String lockName) {
//锁节点名称 初始化
this.lockName = lockName;
//zookeeper初始化
try{
zooKeeper = new ZooKeeper(ZK_ADDRESS, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){
//zookeeper连接上了
countDownLatch.countDown();
}
}
});
//等待zookeeper连接成功
countDownLatch.await();
//创建一个根节点/lock
if (zooKeeper.exists(rootName,false) == null){
zooKeeper.create(rootName,"rootLock".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
/**
* zookeeper加锁
* 获取分布式锁
*/
public void lock(){
try {
String node = zooKeeper.create(rootName + "/" + lockName,
"lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//拿到根节点下的子节点 临时有序的子节点
List<String> children = zooKeeper.getChildren(rootName, false);
TreeSet<String> childNodes = new TreeSet<>();
//childNodes.addAll(children);
for (String c : children){
childNodes.add(rootName+ "/" +c);
}
//排好顺序的节点,取出最小的
String minNode = childNodes.first();
//获取指定节点的前一个节点
String preNode = childNodes.lower(node);
//判断是不是最小节点
if (node.equals(minNode)) {
//当前进来的这个线程锁创建的节点就是分布式锁节点
cuurentLockName = node;
return;
}
//用于阻塞未获取锁线程
CountDownLatch countDownLatch = new CountDownLatch(1);
//其他线程没拿到锁
if (null != preNode){
//如果前一个节点不为null 则去监听她
Stat exists = zooKeeper.exists(preNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted){
//监听到前一个节点的删除事件,则到计数器减一
countDownLatch.countDown();
}
}
});
if (exists != null){
//等待获取锁
countDownLatch.await();
//当前前一个节点删除,到计数器减一之后,当前线程获取锁
cuurentLockName = node;
}
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* zookeeper解锁
* 主要做的就是删除当前锁节点删除
*/
public void unlock(){
try{
if (StringUtils.isNotBlank(cuurentLockName)){
zooKeeper.delete(cuurentLockName,-1);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
}
zkClient实现分布式锁
/**
* zkClient客户端实现分布式锁
* 基于有序临时节点
*/
public class ZkClientLock {
private ZkClient zooKeeper;
private static final String ZK_ADDRESS = "127.0.0.1";
//锁根节点
private static String rootName = "/zklock";
//锁节点
private String lockName;
//当前锁节点名称
private String cuurentLockName;
//zookeeper 超时时间
private int sessionTimeout = 100000;
/**
* 创建连接
* @param lockName
*/
public ZkClientLock(String lockName) {
this.lockName = lockName;
try{
zooKeeper = new ZkClient(ZK_ADDRESS);
//创建一个根节点/lock
if (!zooKeeper.exists(rootName)){
zooKeeper.create(rootName,"rootLock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
/**
* 加锁
*/
public void lock(){
String node = zooKeeper.create(rootName + "/" + lockName,
"zkclient".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
//获取所有子节点
List<String> subNodes = zooKeeper.getChildren(rootName);
TreeSet<String> sortNodes = new TreeSet<>();
for (String subNode : subNodes){
sortNodes.add(rootName + "/" +subNode);
}
//获取最小节点
String firstNode = sortNodes.first();
//获取前一个节点
String preNode = sortNodes.lower(node);
if (firstNode.equals(node)){
//则当前节点为锁节点
cuurentLockName= node;
return;
}
CountDownLatch countDownLatch = new CountDownLatch(1);
if (StringUtils.isNotBlank(preNode)){
//说明非锁节点,需要等待,并监听前一个节点
boolean exists = zooKeeper.exists(preNode);
if (exists){
//监听前一个节点
zooKeeper.subscribeDataChanges(preNode, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
countDownLatch.countDown();
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
cuurentLockName = node;
}
}
}
/**
* zookeeper解锁
* 主要做的就是删除当前锁节点删除
*/
public void unlock(){
try{
if (StringUtils.isNotBlank(cuurentLockName)){
zooKeeper.delete(cuurentLockName,-1);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
}
网友评论