引言
在程序开发过程中不得不考虑的就是并发问题。在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾明思议就是可以满足分布式情况下的并发锁。 下面我们讲解怎么利用zk实现分布式锁。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的架构通过冗余服务实现高可用性。因此,如果第一次无应答,客户端就可以询问另一台ZooKeeper主机。ZooKeeper节点将它们的数据存储于一个分层的命名空间,非常类似于一个文件系统或一个前缀树结构。客户端可以在节点读写,从而以这种方式拥有一个共享的配置服务。更新是全序的。
基于ZooKeeper分布式锁的流程
在zookeeper指定节点(locks)下创建临时顺序节点node_n
获取locks下所有子节点children
对子节点按节点自增序号从小到大排序
判断本节点是不是第一个子节点,若是,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
若监听事件生效,则回到第二步重新进行判断,直到获取到锁
具体实现
下面就具体使用java和zookeeper实现分布式锁,操作zookeeper使用的是apache提供的zookeeper的包。
通过实现Watch接口,实现process(WatchedEventevent)方法来实施监控,使CountDownLatch来完成监控,在等待锁的时候使用CountDownLatch来计数,等到后进行countDown,停止等待,继续运行。
以下整体流程基本与上述描述流程一致,只是在监听的时候使用的是CountDownLatch来监听前一个节点。
代码部分
整个代码结构如图:
1、首先创建一个接口Lock,顾名思义,大家很容易想到JDK下面有一个Lock的接口,但是这里我并不打算直接使用这个接口,只是模拟了该接口里的方法自定义实现,这样可以按需使用,请看里面的代码,
public interface Lock {
public void getLock();
public void unlock();
}
接口里面比较简单,一个是获取锁的接口,一个是释放锁的接口,
2、再来看AbstratcLock这个类,这是个抽象类,里面有一个获取锁的方法和两个待实现的抽象方法,
//定义基本模板
public abstract class AbstratcLock implements Lock{
public void getLock() {
if(tryLock()){
System.out.println("##获取锁的资源===============");
}else{
waitLock();
getLock();
}
}
public abstract boolean tryLock();
public abstract void waitLock();
}
3、zookeeper的基本配置项在ZookeeperAbstractLock这个类里面,在这里,我没有使用单独的配置文件或者类去配置连接zookeeper的配置信息,就在这个抽象类里面全部搞定,
public abstract class ZookeeperAbstractLock extends AbstratcLock{
private static String CONNECT_PATH = "127.0.0.1:2181";
protected ZkClient zkClient = new ZkClient(CONNECT_PATH);
protected static final String PATH = "/lock";
protected static final String PATH2 = "/lock2";
}
4、接下来是zookeeper实现具体的分布式锁的业务逻辑部分,实现的思路前面已经解释过,再次总结一下就是,zookeeper基于内存实现的一种文件树节点,一旦某个线程成功创建了某个节点,其他线程继续创建同名节点就无法成功,但该线程可以注册一个监听器,监听上一个线程对该节点的变换情况,通过这个机制来判定对该节点的锁的持有和释放,从而实现效果,下面通过两种方式来实现,
4.1 直接通过一个节点实现分布式锁,代码如下,
public class ZookeeperDistributeLock extends ZookeeperAbstractLock{
private CountDownLatch countDownLatch = null;
//尝试获得锁
@Override
public boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
return false;
}
}
//监听某个节点,匿名回调函数实现对节点信息变化的监听,
@Override
public void waitLock() {
//一旦zookeeper检测到节点信息的变化,就会触发匿名匿名回调函数,通知订阅的客户端,即zkClient
IZkDataListener iZkDataListener = new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
//唤醒被等待的线程
if(countDownLatch != null){
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
//注册事件监听
zkClient.subscribeDataChanges(PATH, iZkDataListener);
//如果节点存在了,则需要等待一直到接收到了事件通知
if(zkClient.exists(PATH)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
}
//释放锁
public void unlock() {
if(zkClient != null){
zkClient.delete(PATH);
zkClient.close();
System.out.println("释放锁资源");
}
}
}
5、下面是测试类,模拟50个线程并发生成订单的动作,OrderService该类,代码如下,
public class OrderService implements Runnable{
//订单号生成类
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
private Lock lock = new ZookeeperDistributeLock();
//private Lock lock = new ZookeeperDistributeLock2();
public void run() {
getNumber();
}
public void getNumber(){
try {
lock.getLock();
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",产生了订单:" + number);
} catch (Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String[] args) {
System.out.println("##生成了订单####");
for(int i=0;i<50;i++){
new Thread(new OrderService()).start();
}
}
}
将生成订单的类的代码也附上,
public class OrderNumGenerator {
public static int count =0;
private Lock lock = new ReentrantLock();
public String getNumber(){
try {
lock.lock();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return sdf.format(new Date()) + "_" + ++count;
} finally {
lock.unlock();
}
}
}
下面运行上述的OrderService的main函数,看控制台输出结果,
上述50个线程很快就执行完毕了,而且没有出现任何问题,我们知道zookeeper是基于内存型的分布式协调服务器,大家有没有发现,我这里只创建了一个文件节点,就可以实现效果,这主要是得益于内存操作的快速,但是问题来了,如果线程数量足够多,等于是某个线程释放了锁,其他的线程一起去争夺锁,那样,不管内存的执行速度有多快,总会有并发争夺的时候出现,所以下面演示用zookeeper的临时有序节点的方式实现,
6、实现的步骤上面已经说明,这里再说一下,主要步骤是:
建立一个节点,这里为:lock2 。节点类型为持久节点(PERSISTENT)
每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),由于在这样的节点模式下,继续创建同名节点,会直接在该节点下生成一个有序的临时子节点编号,从小到大,依次排序;
在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。
假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。
当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。
代码如下,
public class ZookeeperDistributeLock2 extends ZookeeperAbstractLock{
private CountDownLatch countDownLatch = null;
private String beforePath;//前一个节点
private String currentPath;//当前节点
//初始化主节点,如果不存在则创建
public ZookeeperDistributeLock2(){
if(!this.zkClient.exists(PATH2)){
this.zkClient.createPersistent(PATH2);
}
}
@Override
public boolean tryLock() {
//基于lock2节点,新建一个临时节点
if(currentPath == null || currentPath.length() <= 0){
currentPath = this.zkClient.createEphemeralSequential(PATH2 + "/", beforePath);
}
//获取所有临时节点并进行排序
List children = this.zkClient.getChildren(PATH2);
Collections.sort(children);
if(currentPath.equals(PATH2 + "/" + children.get(0))){
return true;
}else{
//如果当前节点在节点列表中不是排第一的位置,则获取当前节点前面的节点,并赋值
int wz = Collections.binarySearch(children, currentPath.substring(7));
beforePath = PATH2 + "/" + children.get(wz-1);
}
return false;
}
@Override
public void waitLock() {//等待锁
IZkDataListener iZkDataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
//唤醒被等待的线程
if(countDownLatch != null){
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
//注册事件,对前一个节点进行监听
zkClient.subscribeDataChanges(beforePath, iZkDataListener);
//如果节点存在了,则需要等待一直到接收到事件通知
if(zkClient.exists(beforePath)){
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);
}
//释放锁
public void unlock() {
zkClient.delete(currentPath);
zkClient.close();
}
}
运行main函数,
public class OrderService implements Runnable{
//订单号生成类
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
//方式1实现
//private Lock lock = new ZookeeperDistributeLock();
//方式2实现
private Lock lock = new ZookeeperDistributeLock2();
public void run() {
getNumber();
}
public void getNumber(){
try {
lock.getLock();
String number = orderNumGenerator.getNumber();
System.out.println(Thread.currentThread().getName() + ",产生了订单:" + number);
} catch (Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public static void main(String[] args) {
System.out.println("##生成了订单####");
for(int i=0;i<50;i++){
new Thread(new OrderService()).start();
}
}
}
看控制台打印的结果,
同样得到了结果,但是方式2的实现上更可靠,这是基于zookeeper自身的可靠性机制实现的;
以上就是使用zookeeper实现分布式锁的过程,当然实际工作中,还可以使用redis,mysql或者其他开源框架也是可以的,个人觉得使用zookeeper还是比较简单而且具有天然的优势,因为其可靠性已经通过众多的其他各类框架和应用得到了检验,本文到此结束,不足之处,敬请见谅!
---------------------
作者:神秘的葱
原文:https://blog.csdn.net/zhangcongyi420/article/details/84204153
网友评论