美文网首页
分布式锁-zk实现

分布式锁-zk实现

作者: sun_7191 | 来源:发表于2020-05-28 22:37 被阅读0次

一 方案

二 依赖

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.12</version>

</dependency>

三 代码

public interface Lock {

boolean lock() throws Exception;

boolean unlock();

}

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.stream.Collectors;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

public class ZkLock implements Lock {

private String locked_path = null;

private String locked_short_path = null;

private String prior_path = "";

private String ZK_PATH = "/sunTest1";

private String chd_PATH = "v1";

private String LOCK_PREFIX = ZK_PATH + "/" + chd_PATH;

private ZKclient client;

private String taskName;

public ZkLock(String taskName) {

client = ZKclient.getInstance();

client.zkConnect();

this.taskName = taskName;

}

public boolean lock() {

try {

boolean locked = false;

locked = tryLock();

if (locked) {

return true;

}

while (!locked) {

await();

if (checkLocked()) {

locked = true;

}

}

return true;

} catch (Exception e) {

e.printStackTrace();

unlock();

}

return false;

}

public boolean unlock() {

client.delNode(locked_path);

return false;

}

private boolean tryLock() throws Exception {

// 创建临时Znode

locked_path = client.createEphemeralSeqNode(LOCK_PREFIX);

if (null == locked_path) {

throw new Exception("zk error");

}

locked_short_path = getShorPath(locked_path);

// 获取等待的子节点列表 判断自己是否第一个

if (checkLocked()) {

return true;

}

List<String> waiters = getWaiters();

// 判断自己排第几个

int index = binarySearch(waiters, locked_short_path);

if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了

throw new Exception("节点没有找到: " + locked_short_path);

}

// 如果自己没有获得锁 则要监听前一个节点

prior_path = ZK_PATH + "/" + waiters.get(index - 1);

return false;

}

private boolean checkLocked() {

// 获取等待的子节点列表

List<String> waiters = getWaiters();

// 如果是第一个,代表自己已经获得了锁

if (locked_short_path.equals(waiters.get(0))) {

// log.info("成功的获取分布式锁,节点为{}", locked_short_path);

return true;

}

return false;

}

private void await() throws Exception {

if (null == prior_path) {

throw new Exception("prior_path error");

}

final CountDownLatch latch = new CountDownLatch(1);

// 订阅比自己次小顺序节点的删除事件

Watcher w = new Watcher() {

@Override

public void process(WatchedEvent watchedEvent) {

System.out.println(taskName + ":" + "监听到的变化 watchedEvent = " + watchedEvent);

latch.countDown();

}

};

client.watchNode(prior_path, w);

latch.await(10, TimeUnit.SECONDS);

}

public String getShorPath(String locked_path) {

return locked_path.substring(ZK_PATH.length() + 1);

}

// 去除了父级目录的路径

public List<String> getWaiters() {

List<String> originalList = client.getAllChilds(ZK_PATH);

// 节点按照编号 升序排列

List<String> waitersSorted = sortWaiters(originalList);

for (String str : waitersSorted) {

System.out.println(str);

}

return waitersSorted;

}

public int binarySearch(List<String> waiters, String locked_short_path) {

return waiters.indexOf(locked_short_path);

}

public List<String> sortWaiters(List<String> waiters) {

return waiters.stream().sorted((v1, v2) -> {

int n1 = Integer.parseInt(v1.substring(chd_PATH.length()));

int n2 = Integer.parseInt(v2.substring(chd_PATH.length()));

return n1 - n2;

}).collect(Collectors.toList());

}

}

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

public class ZKclient {

public static ZKclient instance;

public ZooKeeper zookeeper;

public static ZKclient getInstance() {

instance = new ZKclient();

return instance;

}

public ZooKeeper zkConnect()  {

String path = "127.0.0.1:2181";

try {

zookeeper = new ZooKeeper(path, 30 * 1000, null);

} catch (IOException e) {

e.printStackTrace();

}

return zookeeper;

}

public String createEphemeralSeqNode(String path) throws KeeperException, InterruptedException {

String res = zookeeper.create(path, "x".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

return res;

}

public List<String> getAllChilds(String parentPath) {

List<String> list = new ArrayList<String>();

try {

list = zookeeper.getChildren(parentPath, null);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

return list;

}

public boolean delNode(String path) {

try {

zookeeper.delete(path, 0);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

return true;

}

public void watchNode(String path, Watcher w) {

try {

zookeeper.getData(path, w, new Stat());

} catch (Exception e) {

e.printStackTrace();

}

}

}

public class ZkLockTest {

public static void main(String[] args) {

MyTask task1 = new MyTask("任务1");

task1.start();

MyTask task2 = new MyTask("任务2");

task2.start();

MyTask task3 = new MyTask("任务3");

task3.start();

MyTask task4 = new MyTask("任务4");

task4.start();

}

}

class MyTask extends Thread {

public String name;

public MyTask(String name) {

this.name = name;

}

@Override

public void run() {

doTask(name);

}

public void doTask(String name) {

ZkLock lock = new ZkLock(name);

try {

lock.lock();

System.out.println(name + "获取到锁,开始执行任务.");

Thread.sleep(5000);

System.out.println(name + "执行任务完成.");

} catch (Exception e) {

e.printStackTrace();

} finally {

lock.unlock();

}

}

}

四 运行结果

相关文章

网友评论

      本文标题:分布式锁-zk实现

      本文链接:https://www.haomeiwen.com/subject/yunzahtx.html