基本实现思路为基于zookeeper的本身的临时有序节点特性来实现分布式锁,如下图所示
每个客户端都往zookeeper上创建一个临时有序节点,注册完后,取/locks下所有节点中最小的节点,则创建该最小节点的客户端拿到锁。如果当前最小节点是/lock_seq1,那么/lock_seq2边监听/lock_seq1边阻塞等待,当/lock_seq1被删除后,/lock_seq2节点的客户端获得锁,以此类推。
每个节点监听比自己小一个数的临时节点,当监听的节点被删除的(此时认为是释放锁),监听事件被触发
1.代码演示
/**
* @Project: 3.DistributedProject
* @description: 手动实现分布式锁
* @author: sunkang
* @create: 2018-06-24 08:19
* @ModificationHistory who when What
**/
public class DistributedLocks implements Lock,Watcher {
private ZooKeeper zooKeeper = null;
private String ROOT_LOCK ="/locks";
private String CURRENT_LOCK ;
private String WAIT_LOCK;
private CountDownLatch countDownLatch;
//连接才开始操作的
private CountDownLatch sysConectDownLatch;
public DistributedLocks() {
try {
//建立zookeeper的连接
sysConectDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper("192.168.44.129:2181",40000,this);
sysConectDownLatch.await();
//是否存在根节点,如果不存在,则去创建持久化节点
Stat stat = zooKeeper.exists(ROOT_LOCK,false);
if(stat == null){
zooKeeper.create(ROOT_LOCK,"0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
if(tryLock()){
System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
return ;
}
//添加监控
waitForLocks(WAIT_LOCK);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
//尝试获取锁
@Override
public boolean tryLock() {
try {
//创建当前的节点
CURRENT_LOCK = zooKeeper.create(ROOT_LOCK+"/","0".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
//获取当前的所有的子节点
System.out.println(Thread.currentThread().getName()+"->"+ CURRENT_LOCK+",尝试竞争锁");
List<String> childrens = zooKeeper.getChildren(ROOT_LOCK,false);
SortedSet<String> treeSet =new TreeSet<String>();
for(String children : childrens){
treeSet.add(ROOT_LOCK+"/"+children);
}
//获取最小的节点
String minNode = treeSet.first();
if(CURRENT_LOCK.equals(minNode)){
return true;
}
//获取当前节点的上一个节点
SortedSet<String> lessCurrentSets= treeSet.headSet(CURRENT_LOCK);
if(lessCurrentSets != null){
WAIT_LOCK = lessCurrentSets.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
//对上一节点记性监控
private boolean waitForLocks(String pre){
try {
//进行监控
Stat stat= zooKeeper.exists(pre,true);
if(stat!=null){
System.out.println("当前线程"+Thread.currentThread().getName()+"等待"+WAIT_LOCK+"释放");
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println("当前线程"+Thread.currentThread().getName()+"获得锁");
return true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+ "->释放锁"+ CURRENT_LOCK);
try {
zooKeeper.delete(CURRENT_LOCK,-1);
CURRENT_LOCK=null;
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
if( watchedEvent.getState().equals(Event.KeeperState.SyncConnected)){
sysConectDownLatch.countDown();//说明已经建立了连接
}
//当节点的信息发送删除的时候就会触发该监控
if(this.countDownLatch != null){
countDownLatch.countDown();
}
}
}
2.测试演示
- 测试代码,启动是个线程,模仿是个客户端同时访问zookeeper
/**
* @Project: 3.DistributedProject
* @description: 测试
* @author: sunkang
* @create: 2018-06-24 08:44
* @ModificationHistory who when What
**/
public class LocksDemo {
public static void main(String[] args) throws IOException {
final CountDownLatch countDownLatch = new CountDownLatch(10);
for(int i=0;i<10;i++){
new Thread(new Runnable() {
@Override
public void run() {
DistributedLocks lock = null;
try {
countDownLatch.await();
lock = new DistributedLocks();
lock.lock(); //获得锁
//todo 做一些事情
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
},"Thread-"+i).start();
countDownLatch.countDown();
}
}
}
- 观察结果 :可以看到每2秒之后,或有线程获取锁
Thread-1->/locks/0000000087,尝试竞争锁
Thread-7->/locks/0000000088,尝试竞争锁
Thread-4->/locks/0000000089,尝试竞争锁
Thread-8->/locks/0000000091,尝试竞争锁
Thread-3->/locks/0000000090,尝试竞争锁
Thread-2->/locks/0000000092,尝试竞争锁
Thread-9->/locks/0000000095,尝试竞争锁
Thread-5->/locks/0000000094,尝试竞争锁
Thread-6->/locks/0000000096,尝试竞争锁
Thread-0->/locks/0000000093,尝试竞争锁
Thread-1->/locks/0000000087->获得锁成功
当前线程Thread-2等待/locks/0000000091释放
当前线程Thread-8等待/locks/0000000090释放
当前线程Thread-4等待/locks/0000000088释放
当前线程Thread-9等待/locks/0000000094释放
当前线程Thread-6等待/locks/0000000095释放
当前线程Thread-7等待/locks/0000000087释放
当前线程Thread-0等待/locks/0000000092释放
当前线程Thread-3等待/locks/0000000089释放
当前线程Thread-5等待/locks/0000000093释放
Thread-1->释放锁/locks/0000000087
当前线程Thread-7获得锁
Thread-7->释放锁/locks/0000000088
当前线程Thread-4获得锁
Thread-4->释放锁/locks/0000000089
当前线程Thread-3获得锁
Thread-3->释放锁/locks/0000000090
当前线程Thread-8获得锁
Thread-8->释放锁/locks/0000000091
当前线程Thread-2获得锁
Thread-2->释放锁/locks/0000000092
当前线程Thread-0获得锁
Thread-0->释放锁/locks/0000000093
当前线程Thread-5获得锁
Thread-5->释放锁/locks/0000000094
当前线程Thread-9获得锁
Thread-9->释放锁/locks/0000000095
当前线程Thread-6获得锁
Thread-6->释放锁/locks/0000000096
网友评论