一 方案

二 依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
三 代码
/**
 * Minimal lock contract: one operation to acquire and one to release.
 */
public interface Lock {
/**
 * Attempts to acquire the lock.
 *
 * @return a boolean status; presumably {@code true} means the lock is now held
 *         (confirm against the implementing class)
 * @throws Exception if the implementation cannot complete the attempt
 */
boolean lock() throws Exception;
/**
 * Releases the lock.
 *
 * @return a boolean status; its exact meaning is not fixed by this interface
 *         (NOTE(review): confirm against the implementing class)
 */
boolean unlock();
}
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
 * Distributed mutual-exclusion lock built on ZooKeeper ephemeral sequential znodes.
 *
 * <p>Every contender creates a child {@code /sunTest1/v1<sequence>}. The contender whose
 * node has the lowest sequence number owns the lock. Every other contender watches the node
 * directly ahead of it and re-reads the queue when that node changes or the wait times out.
 *
 * <p>This class never creates the parent znode {@code /sunTest1}; it has to exist beforehand.
 * The mutable state is unsynchronized, so use one instance per contending thread.
 */
public class ZkLock implements Lock {
    /** Parent znode holding one child per contender. */
    private static final String ZK_PATH = "/sunTest1";
    /** Name prefix of a contender node; ZooKeeper appends the sequence number to it. */
    private static final String CHILD_PREFIX = "v1";
    /** Full path prefix handed to ZooKeeper when a contender node is created. */
    private static final String LOCK_PREFIX = ZK_PATH + "/" + CHILD_PREFIX;
    /** Upper bound for a single wait on the predecessor before the queue is re-read. */
    private static final long WAIT_SECONDS = 10;

    private final ZKclient client;
    private final String taskName;
    /** Absolute path of this contender's node, or {@code null} while no node is owned. */
    private String lockedPath;
    /** {@link #lockedPath} without the parent directory, as it appears in the child list. */
    private String lockedShortPath;
    /** Absolute path of the node directly ahead of ours; empty until it has been resolved. */
    private String priorPath = "";

    /**
     * Creates a lock handle and opens its ZooKeeper connection.
     *
     * @param taskName label used only in diagnostic output
     */
    public ZkLock(String taskName) {
        this.taskName = taskName;
        client = ZKclient.getInstance();
        client.zkConnect();
    }

    /**
     * Blocks until the lock is acquired or an error occurs.
     *
     * @return {@code true} once the lock is held; {@code false} if acquisition failed, in which
     *         case the contender node (if any) has already been removed
     */
    @Override
    public boolean lock() {
        try {
            if (tryLock()) {
                return true;
            }
            // checkLocked() also refreshes priorPath, so each round watches the node that is
            // currently ahead of us instead of one that may already have been deleted.
            do {
                await();
            } while (!checkLocked());
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            unlock();
            // Restore the flag only after the cleanup call so that the blocking delete is
            // not aborted by the pending interrupt.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
            unlock();
        }
        return false;
    }

    /**
     * Releases the lock by deleting this contender's node.
     *
     * @return {@code true} if the client reported the node as deleted; {@code false} if no node
     *         is owned or the client reported a failure
     */
    @Override
    public boolean unlock() {
        if (lockedPath == null) {
            return false; // nothing was created, or it has been released already
        }
        boolean deleted = client.delNode(lockedPath);
        if (deleted) {
            lockedPath = null;
            lockedShortPath = null;
            priorPath = "";
        }
        return deleted;
    }

    /**
     * Creates the contender node and checks whether it is already at the head of the queue.
     *
     * @return {@code true} if the lock was obtained immediately
     * @throws Exception if the node cannot be created
     */
    private boolean tryLock() throws Exception {
        lockedPath = client.createEphemeralSeqNode(LOCK_PREFIX);
        if (lockedPath == null) {
            throw new IllegalStateException("zk error");
        }
        lockedShortPath = getShorPath(lockedPath);
        return checkLocked();
    }

    /**
     * Reads the queue once and determines this contender's position in it.
     *
     * <p>Position and predecessor come from the same snapshot, so a head node that vanishes
     * between two reads cannot produce a negative predecessor index.
     *
     * @return {@code true} if our node is first; otherwise {@code false}, with
     *         {@link #priorPath} updated to the node directly ahead of ours
     * @throws IllegalStateException if our node is missing from the child list
     */
    private boolean checkLocked() {
        List<String> waiters = getWaiters();
        int index = binarySearch(waiters, lockedShortPath);
        if (index < 0) {
            // Covers an empty list as well, e.g. when the child lookup failed.
            throw new IllegalStateException("节点没有找到: " + lockedShortPath);
        }
        if (index == 0) {
            // log.info("acquired the distributed lock, node {}", lockedShortPath);
            return true;
        }
        priorPath = ZK_PATH + "/" + waiters.get(index - 1);
        return false;
    }

    /**
     * Waits until the predecessor node fires a watch event or {@link #WAIT_SECONDS} elapse.
     *
     * <p>The timeout bounds the wait when the watch could not be registered, for example
     * because the predecessor disappeared in the meantime.
     *
     * @throws Exception if no predecessor has been resolved or the thread is interrupted
     */
    private void await() throws Exception {
        if (priorPath == null || priorPath.isEmpty()) {
            throw new IllegalStateException("prior_path error");
        }
        final CountDownLatch latch = new CountDownLatch(1);
        // Subscribe to changes of the node directly ahead of ours.
        Watcher w = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(taskName + ":" + "监听到的变化 watchedEvent = " + watchedEvent);
                latch.countDown();
            }
        };
        client.watchNode(priorPath, w);
        latch.await(WAIT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Strips the parent directory from an absolute node path.
     *
     * @param locked_path absolute path located directly under the lock directory
     * @return the node name without the parent directory and separator
     */
    public String getShorPath(String locked_path) {
        return locked_path.substring(ZK_PATH.length() + 1);
    }

    /**
     * Fetches the contender nodes without their parent directory, ordered by ascending
     * sequence number, and prints each name to standard output.
     *
     * @return the sorted contender node names
     */
    public List<String> getWaiters() {
        List<String> originalList = client.getAllChilds(ZK_PATH);
        List<String> waitersSorted = sortWaiters(originalList);
        for (String str : waitersSorted) {
            System.out.println(str);
        }
        return waitersSorted;
    }

    /**
     * Finds the position of a node name in the contender list. Despite the name, this is a
     * linear {@link List#indexOf(Object)} lookup.
     *
     * @param waiters           sorted contender node names
     * @param locked_short_path node name to look for
     * @return the zero-based index, or {@code -1} if the name is absent
     */
    public int binarySearch(List<String> waiters, String locked_short_path) {
        return waiters.indexOf(locked_short_path);
    }

    /**
     * Sorts contender node names by their numeric sequence suffix, ascending.
     *
     * <p>Names that do not start with the contender prefix are skipped, so an unrelated child
     * of the lock directory cannot break the parse.
     *
     * @param waiters raw child names of the lock directory
     * @return a new list holding only contender nodes, ordered by sequence number
     */
    public List<String> sortWaiters(List<String> waiters) {
        return waiters.stream()
                .filter(node -> node.startsWith(CHILD_PREFIX))
                // Integer.compare avoids the overflow that plain subtraction can cause.
                .sorted((a, b) -> Integer.compare(sequenceOf(a), sequenceOf(b)))
                .collect(Collectors.toList());
    }

    /** Parses the sequence number that follows the contender prefix in a node name. */
    private static int sequenceOf(String node) {
        return Integer.parseInt(node.substring(CHILD_PREFIX.length()));
    }
}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZKclient {
public static ZKclient instance;
public ZooKeeper zookeeper;
public static ZKclient getInstance() {
instance = new ZKclient();
return instance;
}
public ZooKeeper zkConnect() {
String path = "127.0.0.1:2181";
try {
zookeeper = new ZooKeeper(path, 30 * 1000, null);
} catch (IOException e) {
e.printStackTrace();
}
return zookeeper;
}
public String createEphemeralSeqNode(String path) throws KeeperException, InterruptedException {
String res = zookeeper.create(path, "x".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return res;
}
public List<String> getAllChilds(String parentPath) {
List<String> list = new ArrayList<String>();
try {
list = zookeeper.getChildren(parentPath, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return list;
}
public boolean delNode(String path) {
try {
zookeeper.delete(path, 0);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return true;
}
public void watchNode(String path, Watcher w) {
try {
zookeeper.getData(path, w, new Stat());
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
 * Demo entry point: launches four tasks that contend for the same distributed lock.
 */
public class ZkLockTest {
    public static void main(String[] args) {
        // Started in the listed order, one thread per task.
        String[] taskNames = {"任务1", "任务2", "任务3", "任务4"};
        for (String taskName : taskNames) {
            MyTask task = new MyTask(taskName);
            task.start();
        }
    }
}
/**
 * Worker thread that runs a simulated five-second job while holding the distributed lock.
 */
class MyTask extends Thread {
    /** Task label used in the console output. */
    public String name;

    /**
     * @param name task label used in the console output
     */
    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        doTask(name);
    }

    /**
     * Acquires the lock, runs the simulated job and releases the lock again. The job is
     * skipped entirely when the lock cannot be acquired.
     *
     * @param name task label used in the console output
     */
    public void doTask(String name) {
        ZkLock lock = new ZkLock(name);
        // lock() reports failure through its return value; entering the critical section
        // regardless would defeat the mutual exclusion.
        if (!lock.lock()) {
            System.out.println(name + "获取锁失败,跳过任务.");
            return;
        }
        boolean interrupted = false;
        try {
            System.out.println(name + "获取到锁,开始执行任务.");
            Thread.sleep(5000);
            System.out.println(name + "执行任务完成.");
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            if (interrupted) {
                // Restored after the release so the blocking delete is not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }
}
四 运行结果
网友评论