美文网首页
Zookeeper应用实践

Zookeeper应用实践

作者: 奋斗的蛐蛐 | 来源:发表于2021-02-15 22:16 被阅读0次

Zookeeper应用实践

ZooKeeper是⼀个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使用它来进行分布式 数据的发布与订阅。另⼀方⾯面,通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher 事件通知机制,可以⾮常方便地构建⼀系列分布式应用中都会涉及的核心功能,如数据发布/订阅、命名 服务、集群管理、Master选举、分布式锁和分布式队列等。那接下来就针对这些典型的分布式应⽤场景来做下介绍

Zookeeper 的两大特性

  1. 客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,Zookeeper服务器就会向订阅的客户端发送变更通知。
  2. 对在Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除

利用其两大特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册一个 Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建⼀个临时节 点:/clusterServers/[Hostname],这样,监控系统就能够实时监测机器的变动情况。

服务器动态上下线监听

分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,⽽任意一台客户端都要能实时感知到主节点服务器的上下线。


服务器动态上下线监听.png
具体实现

服务端:

package com.zookeeper.monitor;

import org.I0Itec.zkclient.ZkClient;

/**
 * @description:
 * @author: 
 * @date: 2020-07-26 19:48
 **/
public class ServerMain {

    private ZkClient zkClient = null;

    //初始化,判断是否有Servers目录
    public ServerMain() {
        this.zkClient = new ZkClient("linux121:2181,linux122:2181,linux123:2181");
        if (!zkClient.exists("/servers")) {
            zkClient.createPersistent("/servers");
        }
    }


    //创建一个临时目录,告诉zookeeper,我这个服务上线了
    public void publish(String host, int port) {
        String path = zkClient.createEphemeralSequential("/servers/", host + ":" + port);
        System.err.println("创建成功,路径为:" + path + ", 值为:" + host + ":" + port);
    }


    /**
     * 主方法
     *
     * @param args
     */
    public static void main(String[] args) {
        ServerMain serverMain = new ServerMain();
        //将自己的节点发不到zookeeper
        serverMain.publish(args[0], Integer.parseInt(args[1]));
        //创建时间服务
        new Thread(new TimeThreadServer(Integer.parseInt(args[1]))).start();
    }
}

package com.zookeeper.monitor;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

/**
 * @description:
 * @author: 
 * @date: 2020-07-26 19:58
 **/
public class TimeThreadServer implements Runnable {


    private int port;

    public TimeThreadServer(int port) {
        this.port = port;
    }

    @Override
    public void run() {

        try {
            //要监听的端口号
            ServerSocket serverSocket = new ServerSocket(port);
            Socket accept = null;
            while (true) {
                accept = serverSocket.accept();
                OutputStream outputStream = accept.getOutputStream();
                outputStream.write(new Date().toLocaleString().getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

客户端:

package com.zookeeper.monitor;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author:
 * @date: 2020-07-26 20:04
 **/
public class ClientMain {


    private ZkClient zkClient;

    //获取路径
    private List<String> dataList = new ArrayList<>();

    public ClientMain() {
        this.zkClient = new ZkClient("linux121:2181,linux122:2181,linux123:2181");
    }


    public void subscription() {
        //获取初始节点
        List<String> list = zkClient.getChildren("/servers");
        for (int i = 0; i < list.size(); i++) {
            Object o = zkClient.readData("/servers/" + list.get(i));
            dataList.add(String.valueOf(o));
        }
        //监控/servers目录下面的节点
        zkClient.subscribeChildChanges("/servers", (String path, List<String> children) -> {
            List<String> dataList = new ArrayList<>();
            for (int i = 0; i < children.size(); i++) {
                Object o = zkClient.readData("/servers/" + children.get(i));
                dataList.add(String.valueOf(o));
            }
            this.dataList = dataList;
            System.err.println("接收到服务器的最新变化信息为:" + dataList);
        });
    }

    public void getTime() {
        String ipPort = dataList.get(new Random().nextInt(dataList.size()) - 1);
        String[] ipPortArr = ipPort.split(":");
        Socket socket = null;
        InputStream inputStream = null;
        OutputStream outputStream = null;
        try {
            socket = new Socket(ipPortArr[0], Integer.parseInt(ipPortArr[1]));
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            //请求数据
            outputStream.write("query time ....".getBytes());
            outputStream.flush();

            byte[] bytes = new byte[1024];
            inputStream.read(bytes);
            System.out.println("client端接收到server:+" + ipPort + "+返回结果:" + new String(bytes));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                outputStream.close();
                inputStream.close();
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {
        ClientMain clientMain = new ClientMain();
        clientMain.subscription();
        while (true) {
            clientMain.getTime();
            TimeUnit.SECONDS.sleep(2);
        }
    }
}

分布式锁

zk实现分布式锁

利用Zookeeper可以创建临时带序号节点的特性来实现⼀个分布式锁

实现思路:

  • 锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此⽬录下创建临时的顺 序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  • 每个线程都是先创建临时顺序节点,然后获取当前⽬录下最小的节点(序号),判断最小节点是不是 当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  • 获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁。


    zk实现分布式锁.png

    main方法

package com.zookeeper.lock;


import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author:
 * @date: 2020-07-26 20:41
 **/
public class TestMain {

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            new Thread(new ThreadClient()).start();
        }
    }

    static class ThreadClient implements Runnable {

        private DisClient disClient = new DisClient();

        @Override
        public void run() {
            disClient.getLock();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            disClient.unLock();
        }
    }
}

锁方法:

package com.zookeeper.lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @description: 抢锁
 * 1. 去zk创建临时序列节点,并获取到序号
 * 2. 判断⾃己创建节点序号是否是当前节点最⼩序号,如果是则获取锁 执行相关操作,最后要释放锁
 * 3. 不是最⼩节点,当前线程需要等待,等待你的前一个序号的节点被删除,然后再次判断⾃己是否是最⼩节点。。。
 * @author: 
 * @date: 2020-07-26 20:22
 **/
public class DisClient {


    private ZkClient zkClient;

    String currNode = null;

    String preNode = null;

    private String lockPath = "/lock/";

    private String childrenPath = "/lock";

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public DisClient() {
        this.zkClient = new ZkClient("linux121:2181,linux122:2181,linux123:2181");
    }


    public void getLock() {
        String name = Thread.currentThread().getName();
        if (tryGetLock()) {
            System.err.println("线程:" + name + "获取到了锁");
        } else {
            System.err.println(name + ":获取锁失败,进⼊入等待状态");
            //等待获取锁
            waitLock();
            //再次尝试获取锁
            getLock();
        }
    }

    //判断是否获取到了锁
    private boolean tryGetLock() {
        if (StringUtils.isEmpty(currNode)) {
            //在lock目录下面创建临时顺序文件夹
            currNode = zkClient.createEphemeralSequential(lockPath, 1);
            System.err.println("currNode=>" + currNode);
        }
        List<String> childrenPath = zkClient.getChildren(this.childrenPath);
        System.err.println("childrenPath===>>>" + childrenPath);
        //默认升序,则下标索引为0的,为最小目录
        Collections.sort(childrenPath);
        //如果最小的节点目录就是自己,说明自己获取锁
        if (currNode.endsWith(childrenPath.get(0))) {
            return true;
        }
        //否则查找当前节点的前一个节点
        int i = Collections.binarySearch(childrenPath, currNode.substring(lockPath.length()));
        preNode = lockPath + childrenPath.get(i - 1);
        return false;
    }


    //订阅比自己小的节点,等待获取锁
    private void waitLock() {

        //监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                countDownLatch.countDown();
            }
        };
        System.err.println("当前节点为:" + currNode + "监控的节点为:" + preNode);
        zkClient.subscribeDataChanges(preNode, iZkDataListener);

        if (zkClient.exists(preNode)) {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //解除监听
        zkClient.unsubscribeDataChanges(preNode, iZkDataListener);
    }

    public void unLock() {
        zkClient.delete(currNode);
        zkClient.close();
    }

}

分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使⽤分布式锁可以考虑使用Redis实 现⽽非Zk。

相关文章

网友评论

      本文标题:Zookeeper应用实践

      本文链接:https://www.haomeiwen.com/subject/ofmcsktx.html