/**
* @version 2019/7/12
* @description:
* @since 2019/7/12
*/
public class DistributedLock implements Lock,Watcher {
private ZooKeeper zk=null;
//定义根节点
private String ROOT_LOCK="/locks";
//等待前一个锁
private String WAIT_LOCK;
//当前锁
private String CURRENT_LOCK;
private CountDownLatch countDownLatch;
public DistributedLock() {
try {
//这里地址要填写zookeeper上面的地址
zk=new ZooKeeper("localhost:2181",4000,this);
//判断当前根节点是否存在
Stat exists = zk.exists(ROOT_LOCK, false);
if (exists == null) {
zk.create(ROOT_LOCK,"0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
//如果获得锁成功
if(tryLock()){
System.out.println(Thread.currentThread().getName()+"==="+CURRENT_LOCK+"->获得锁成功");
return;
}
try {
waitForLock(WAIT_LOCK);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
//监听上一个锁是否存在
Stat stat = zk.exists(prev,true);
if(stat!=null){
System.out.println(Thread.currentThread().getName()+"->等待锁"+ROOT_LOCK+"/"+prev+"释放");
countDownLatch=new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"->获得锁成功");
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
//创建临时有序节点
zk.create(ROOT_LOCK+"/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",尝试竞争锁");
//尝试获取根节点下面的所有子节点
List<String> childrens = zk.getChildren(ROOT_LOCK, false);
SortedSet<String> sortedSet=new TreeSet<>();
for(String children:childrens){
sortedSet.add(ROOT_LOCK+"/"+children);
}
//获得当前所有子节点的最小节点
String first = sortedSet.first();
SortedSet<String> lessThenMe=((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK);
//如果当前的锁的节点和字节点中最小的节点相同,表示获得锁成功。
if(CURRENT_LOCK.equals(first)){
return true;
}
//如果没有获得比当前的节点更小的最后一个节点,设置成WAIT_LOCK
if(!lessThenMe.isEmpty()){
WAIT_LOCK=lessThenMe.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+"->释放锁"+CURRENT_LOCK);
try {
zk.delete(CURRENT_LOCK,-1);
CURRENT_LOCK=null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
if(this.countDownLatch!=null){
this.countDownLatch.countDown();
}
}
}
测试类:
public class App {
    /**
     * Demo driver: starts ten threads that each build their own {@code DistributedLock} and call
     * {@code lock()} on it. A shared gate is counted down once per started thread, so no thread
     * proceeds until all ten have been launched. No thread calls {@code unlock()}, and the main
     * thread then blocks reading standard input.
     *
     * @param args unused
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        final CountDownLatch startGate = new CountDownLatch(10);
        int launched = 0;
        while (launched < 10) {
            Runnable contender = () -> {
                try {
                    // Hold here until the main thread has started every contender.
                    startGate.await();
                    DistributedLock lock = new DistributedLock();
                    lock.lock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            new Thread(contender).start();
            startGate.countDown();
            launched++;
        }
        // Block the main thread on standard input.
        System.in.read();
    }
}
结果如下:

网友评论