美文网首页
Zookeeper伪集群部署与分布式读写锁实现

Zookeeper伪集群部署与分布式读写锁实现

作者: MccreeFei | 来源:发表于2018-10-31 17:16 被阅读0次

    初识Zookeeper

    Zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调通知、Master选举、分布式锁和分布式队列等功能。本篇主要是用java多线程模拟实现基于Zookeeper的分布式读写锁。

    Zookeeper伪集群部署实践

    Zookeeper搭建分布式集群至少需要三台服务器,手头确实没有那么多资源,好在Zookeeper允许在一台机器上完成一个伪集群的搭建。所谓伪集群其实就是所有的机器都在同一台机器上,但还是以集群的特性来对外提供服务的。这种模式和集群十分相似,只不过在同一台机器上的不同Zookeeper实例是以不同的端口号来互相通信的。从官网下载到最新的Zookeeper发行版本压缩包:zookeeper-3.4.10.tar.gz,将Zookeeper解压到我的阿里云服务器/usr/local/zookeeper地址下。

    配置

    cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo1.cfg
    cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo2.cfg
    cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo3.cfg
    

    zoo_sample.cfg为模板复制三个伪集群的配置。三个配置文件的详细配置如下:
    zoo1.cfg:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data_1/
    clientPort=2181
    server.1=127.0.0.1:2887:3887
    server.2=127.0.0.1:2888:3888
    server.3=127.0.0.1:2889:3889
    

    zoo2.cfg:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data_2/
    clientPort=2182
    server.1=127.0.0.1:2887:3887
    server.2=127.0.0.1:2888:3888
    server.3=127.0.0.1:2889:3889
    

    zoo3.cfg:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/usr/local/zookeeper/data_3/
    clientPort=2183
    server.1=127.0.0.1:2887:3887
    server.2=127.0.0.1:2888:3888
    server.3=127.0.0.1:2889:3889
    
    • tickTime : 心跳时间,单位是毫秒 ,session的最小超时时间是2*tickTime
    • initLimit:多少个tickTime内,允许其他server连接并初始化数据
    • syncLimit:多少个tickTime内,允许follower同步
    • dataDir:存放zookeeper数据的路径
    • clientPort:监听客户端连接的端口号
    • server.id=ip:port1:port2:其中id表示该服务器的id号,需要注意的是,集群部署必须在dataDir路径下新建一个myid的文件,内容就是当前服务器的id号。ip是当前服务器的ip,port1表示Follower服务器和Leader进行运行时通信和数据同步使用的端口号,port2端口用于Leader选举过程中的投票通信。
      使用下面命令开启三个Zookeeper server服务:

    root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo1.cfg
    root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo2.cfg
    root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo3.cfg

    可以在/usr/local/zookeeper路径下的zookeeper.out文件中看到服务开启的日志信息。

    测试客户端的连接

    在测试前,需要在阿里云服务器管理中心打开相应的Zookeeper客户端连接端口。打开Windows命令行,定位到Zookeeper路径执行下面命令:

    D:\zookeeper-3.4.10>bin\zkCli -server 服务器ip:2181

    发现Windows Zookeeper客户端已经成功连接上阿里云上的Zookeeper集群,创建一个test结点。此时我们连接的是2181端口,也就是伪集群当中zoo1的监听的端口号。如下图所示:

    image

    另外在阿里云服务器中也开启一个客户端的连接,看看我们之前添加的test结点有没有真正的添加到Zookeeper集群中。

    image
    上图可以看到test结点确实已经在集群当中了,而且此时连接的是2183端口,也就是zoo3这个server。由此验证了Zookeeper伪集群搭建成功。

    分布式读写锁实现

    下面就基于上面搭建的Zookeeper伪集群,实现分布式读写锁。我们都知道当一个事务获得读锁之后,在这之后的事务只能获取读锁,写锁获取必须等到所有读锁全部释放。而写锁一旦获取后其他事务的读锁以及写锁都必须等待该写锁释放后获取。那么Zookeeper是怎么实现这一特性的呢?
    首先Zookeeper创建节点时可以创建4中形式的节点,分别是持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL)。

    • 持久节点:该数据节点被创建后,就会一直存在于Zookeeper服务器上,直到有删除操作主动清除这个节点。
    • 持久顺序节点:基本特性和持久节点是一致的,额外的特性表现在顺序性上,在创建节点的过程中,Zookeeper会自动为给定节点名加上一个递增的数字后缀作为新的节点名。
    • 临时节点:临时节点的生命周期和客户端的会话绑定在一起,也就是说,如果客户端会话失效,那么节点就会被自动清理掉。另外,临时节点不能创建子结点。
    • 临时顺序节点:特性和临时节点一致,额外拥有顺序的特性。

    其实分布式读写锁就是基于Zookeeper顺序节点特性来实现的。每个事务想要获得锁时都去同一个节点/lock下,创建命名规范为[hostname]-锁类型-序号的临时顺序节点。如果自身想要获取的是读锁,那么只要查看/lock下节点顺序比自身小的节点中没有写类型的节点便可获得读锁。如果自身想要获取写锁,那么只要看到/lock下自己是顺序最小的节点便可获得写锁。下面是多线程模拟事务的详细实现:

    public class DistributedLockDemo {
        private static CountDownLatch countDownLatch = new CountDownLatch(1);
        private static String rootPath = "/lock";
        private static ZooKeeper zooKeeper;
    
        public static void main(String[] args) {
            try {
                zooKeeper = new ZooKeeper("119.23.216.241:2181", 5000, null); //连接我的Zookeeper集群
                zooKeeper.create(rootPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                WriteThread[] writeThreads = new WriteThread[5];  //创建5个写锁线程
                ReadThread[] readThreads = new ReadThread[10];    //创建10个读锁线程
                for (int i = 0; i < writeThreads.length; i++) {
                    writeThreads[i] = new WriteThread("WriteThread_" + i);
                    writeThreads[i].start();
                }
                for (int i = 0; i < readThreads.length; i++) {
                    readThreads[i] = new ReadThread("ReadThread_" + i);
                    readThreads[i].start();
                }
                TimeUnit.SECONDS.sleep(1);
                countDownLatch.countDown();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        static class WriteThread extends Thread {
            public WriteThread(String name){
                super(name);
            }
            @Override
            public void run() {
                try {
                    countDownLatch.await(); //多个线程同一时间竞争资源
                    //path示例:/lock/WriteThread_0-W-0000000001
                    String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-W-", "".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    System.out.println(Thread.currentThread().getName() + ": try to acquire write lock...");
                    while (!canGetWriteLock(path)) {
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    afterGetLockDo(path, true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class ReadThread extends Thread {
            public ReadThread(String name){
                super(name);
            }
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-R-", "".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    System.out.println(Thread.currentThread().getName() + ": try to acquire read lock...");
                    while (!canGetReadLock(path)){
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                    afterGetLockDo(path, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }
    
        //判断是否可以获取写锁
        private static boolean canGetWriteLock(String path) {
            List<String> children = null; // /lock下所有子节点列表
            try {
                children = zooKeeper.getChildren(rootPath, false);
                //基于节点名当中的序号进行排序
                Collections.sort(children, new Comparator<String>() {
                    public int compare(String o1, String o2) {
                        int index1 = o1.lastIndexOf("-");
                        int index2 = o2.lastIndexOf("-");
                        long a = Long.parseLong(o1.substring(index1 + 1));
                        long b = Long.parseLong(o2.substring(index2 + 1));
                        return (int) (a - b);
                    }
                });
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //只要最小的节点是自身便可获得写锁
            boolean result = path.replace("/lock/", "").equals(children.get(0));
            return result;
        }
    
        //判断是否可以获得读锁
        private static boolean canGetReadLock(String path){
            List<String> children = null;
            try {
                children = zooKeeper.getChildren(rootPath, false);
                Collections.sort(children, new Comparator<String>() {
                    public int compare(String o1, String o2) {
                        int index1 = o1.lastIndexOf("-");
                        int index2 = o2.lastIndexOf("-");
                        long a = Long.parseLong(o1.substring(index1 + 1));
                        long b = Long.parseLong(o2.substring(index2 + 1));
                        return (int) (a - b);
                    }
                });
                //只要序号比自己小的节点中没有写类型节点便可获得读锁
                int index = children.indexOf(path.replace("/lock/", ""));
                for (int i = 0; i < index; i++) {
                    String child = children.get(i);
                    int k = child.indexOf("-");
                    if (child.substring(k+1, k+2).equals("W")) return false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
            return true;
        }
    
        //获得锁后所做的工作
        private static void afterGetLockDo(String path, boolean isWriteLock) {
            if (isWriteLock) {
                System.out.println(Thread.currentThread().getName() + ": get write lock...");
                try {
                    TimeUnit.SECONDS.sleep(2); //模拟写事务所做的工作
                    System.out.println(Thread.currentThread().getName() + ": release write lock...");
                    zooKeeper.delete(path, -1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            } else {
                System.out.println(Thread.currentThread().getName() + ": get read lock...");
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + ": release read lock...");
                    zooKeeper.delete(path, -1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    最后的运行结果如下图所示:

    image

    ** 可以看到所有线程都是符合正常的读写锁逻辑并且获取到相关的锁资源。至此,Zookeeper读写锁就实现啦。 **

    相关文章

      网友评论

          本文标题:Zookeeper伪集群部署与分布式读写锁实现

          本文链接:https://www.haomeiwen.com/subject/qkoatqtx.html