美文网首页
ZooKeepr之应用实例

ZooKeepr之应用实例

作者: 冰河winner | 来源:发表于2020-06-15 00:38 被阅读0次

    1、 Java API

    客户端要连接 Zookeeper 服务器可以通过创建 org.apache.zookeeper. ZooKeeper 的一个实例对象,然后调用这个类提供的接口来和服务器交互。

    ZooKeeper 主要是用来维护和监控一个目录节点树中存储的数据的状态,所有我们能够操作 ZooKeeper 和操作目录节点树大体一样,如创建一个目录节点,给某个目录节点设置数据,获取某个目录节点的所有子目录节点,给某个目录节点设置权限和监控这个目录节点的状态变化。

    下面通过代码实例,来熟悉一下Java API的常用方法。

    public class ZkTest {
       private static final String CONNECT_STRING = "127.0.0.1:2181";
       private static final int SESSION_TIMEOUT = 3000;
    
       public static void main( String[] args ) throws Exception
       {
           /* 定义一个监控所有节点变化的Watcher */
           Watcher allChangeWatcher = new Watcher()
           {
               @Override
    
               public void process( WatchedEvent event )
               {
                   System.out.println( "**watcher receive WatchedEvent** changed path: " + event.getPath()
                               \ + "; changed type: " + event.getType().name() );
               }
           };
    
       /* 初始化一个与ZK连接。三个参数: */
       /* 1、要连接的服务器地址,"IP:port"格式; */
       /* 2、会话超时时间 */
       /* 3、节点变化监视器 */
           ZooKeeper zk = new ZooKeeper( CONNECT_STRING, SESSION_TIMEOUT, allChangeWatcher );
           
       /* 新建节点。四个参数:1、节点路径;2、节点数据;3、节点权限;4、创建模式 */
           zk.create( "/myName", "chenlongfei".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
           System.out.println( "create new node '/myName'" );
    
           /* 判断某路径是否存在。两个参数:1、节点路径;2、是否监控(Watcher即初始化ZooKeeper时传入的Watcher) */
           Stat beforSstat = zk.exists( "/myName", true );
           System.out.println( "Stat of '/myName' before change : " + beforSstat.toString() );
    
           /* 修改节点数据。三个参数:1、节点路径;2、新数据;3、版本,如果为-1,则匹配任何版本 */
           Stat afterStat = zk.setData( "/myName", "clf".getBytes(), -1 );
           System.out.println( "Stat of '/myName' after change: " + afterStat.toString() );
    
           /* 获取所有子节点。两个参数:1、节点路径;2、是否监控该节点 */
           List<String> children = zk.getChildren( "/", true );
           System.out.println( "children of path '/': " + children.toString() );
    
           /* 获取节点数据。三个参数:1、节点路径;2、书否监控该节点;3、版本等信息可以通过一个Stat对象来指定 */
           byte[] nameByte = zk.getData( "/myName", true, null );
           String name = new String( nameByte, "UTF-8" );
           System.out.println( "get data from '/myName': " + name );
    
           /* 删除节点。两个参数:1、节点路径;2、 版本,-1可以匹配任何版本,会删除所有数据 */
           zk.delete( "/myName", -1 );
           System.out.println( "delete '/myName'" );
           zk.close();
       }
    }
    

    运行程序,打印结果如下:

    1.png

    更详细的API请参考官方网站。

    Zookeeper 从设计模式角度来看,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。

    下面通过两个ZooKeeper的典型用用场景来体会下ZooKeeper的特性与使用方法。

    2、 分布式锁

    先来回顾一下多线程中的锁控制。

    public class MultiThreadTest {
       /* 以一个静态变量来模拟公共资源 */
       private static int counter = 0;
    
    
       /* 多线程环境下,会出现并发问题 */
       public static void plus()
       {
           /* 计数器加一 */
           counter++;
    
           /* 线程随机休眠数毫秒,模拟现实中的耗时操作 */
           int sleepMillis = (int) (Math.random() * 100);
    
           try {
               Thread.sleep( sleepMillis );
           } catch ( InterruptedException e ) {
               e.printStackTrace();
           }
       }
    
    
       /* 线程实现类 */
       static class CountPlus extends Thread {
           @Override
    
           public void run()
           {
               for ( int i = 0; i < 20; i++ )
               {
                   plus();
               }
    
               System.out.println( Thread.currentThread().getName() + "执行完毕:" + counter );
           }
    
           public CountPlus( String threadName )
           {
               super(threadName);
           }
       }
    
       public static void main( String[] args ) throws Exception
       {
           /* 开启五个线程 */
           CountPlus threadA = new CountPlus( "threadA" );
           threadA.start();
    
           CountPlus threadB = new CountPlus( "threadB" );
           threadB.start();
    
           CountPlus threadC = new CountPlus( "threadC" );
           threadC.start();
    
           CountPlus threadD = new CountPlus( "threadD" );
           threadD.start();
    
           CountPlus threadE = new CountPlus( "threadE" );
           threadE.start();
       }
    }
    

    上例中,开启了五个线程,每个线程通过plus()方法对静态变量counter分别进行20次累加,预期counter最后会变成100。运行程序:

    2.png

    可以发现,五个线程执行完毕之后,counter并没有变成100。plus()方法涉及到对公共资源的改动,但是并没有对它进行同步控制,可能会造成多个线程同时对公共资源发起改动,进而出现并发问题。问题的根源在于,上例中没有保证同一时刻只能有一个线程可以改动公共资源。

    给plus()方法加上synchronized关键字,重新运行程序:

    3.png

    可见,最终达到了预期结果。

    synchronized关键字的作用是对plus()方法加入锁控制,一个线程想要执行该方法,首先需要获得锁(锁是唯一的),执行完毕后,再释放锁。如果得不到锁,该线程会进入等待池中等待,直到抢到锁才能继续执行。这样就保证了同一时刻只能有一个线程可以改动公共资源,避免了并发问题。

    共享锁在同一个进程中很容易实现,可以靠Java本身提供的同步机制解决,但是在跨进程或者在不同 Server 之间就不好实现了,这时候就需要一个中间人来协调多个Server之间的各种问题,比如如何获得锁/释放锁、谁先获得锁、谁后获得锁等。

    借助Zookeeper 可以实现这种分布式锁:需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren()方法获取列表中最小的目录节点,如果最小节点就是自己创建的目录节点,那么它就获得了这个锁,如果不是那么它就调用 exists() 方法并监控前一节点的变化,一直到自己创建的节点成为列表中最小编号的目录节点,从而获得锁。释放锁很简单,只要删除它自己所创建的目录节点就行了。

    流程图如下:

    4.png

    下面我们对刚才的代码进行改造,不用synchronize关键字而是使用ZooKeeper达到锁控制的目的,模拟分布式锁的实现。

    public class ZkDistributedLock {
        /* 以一个静态变量来模拟公共资源 */
        private static int counter = 0;
    
        public static void plus()
        {
            /* 计数器加一 */
            counter++;
    
            /* 线程随机休眠数毫秒,模拟现实中的费时操作 */
            int sleepMillis = (int) (Math.random() * 100);
            try {
                Thread.sleep( sleepMillis );
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
        }
    
        /* 线程实现类 */
        static class CountPlus extends Thread {
            private static final String LOCK_ROOT_PATH = "/Locks";
            private static final String LOCK_NODE_NAME = "Lock_";
    
            /* 每个线程持有一个zk客户端,负责获取锁与释放锁 */
            ZooKeeper zkClient;
    
            @Override
    
            public void run()
            {
                for ( int i = 0; i < 20; i++ )
                {
                    /* 访问计数器之前需要先获取锁 */
                    String path = getLock();
    
                    /* 执行任务 */
                    plus();
    
                    /* 执行完任务后释放锁 */
                    releaseLock( path );
                }
    
                closeZkClient();
                System.out.println( Thread.currentThread().getName() + "执行完毕:" + counter );
            }
    
            /**
             * 获取锁,即创建子节点,当该节点成为序号最小的节点时则获取锁
             */
            private String getLock()
            {
                try {
                    /* 创建EPHEMERAL_SEQUENTIAL类型节点 */
                    String lockPath = zkClient.create( LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME,
    
                                       Thread.currentThread().getName().getBytes(), Ids.OPEN_ACL_UNSAFE,
    
                                       CreateMode.EPHEMERAL_SEQUENTIAL );
    
                    System.out.println( Thread.currentThread().getName() + " create path : " + lockPath );
    
                    /* 尝试获取锁 */
                    tryLock( lockPath );
    
                    return(lockPath);
                } catch ( Exception e ) {
                    e.printStackTrace();
                }
                return(null);
            }
    
            /**
             * 该函数是一个递归函数 如果获得锁,直接返回;否则,阻塞线程,等待上一个节点释放锁的消息,然后重新tryLock
             */
            private boolean tryLock( String lockPath ) throws KeeperException, InterruptedException
            {
                /* 获取LOCK_ROOT_PATH下所有的子节点,并按照节点序号排序 */
                List<String> lockPaths = zkClient.getChildren( LOCK_ROOT_PATH, false );
                Collections.sort( lockPaths );
    
                int index = lockPaths.indexOf( lockPath.substring( LOCK_ROOT_PATH.length() + 1 ) );
                if ( index == 0 )       
          /* lockPath是序号最小的节点,则获取锁 */
                {
                    System.out.println( Thread.currentThread().getName() + " get lock, lockPath: " + lockPath );
    
                    return(true);
                } else {              
            /* lockPath不是序号最小的节点 */
                    /* 创建Watcher,监控lockPath的前一个节点 */
                    Watcher watcher = new Watcher()
                    {
                        @Override
    
                        public void process( WatchedEvent event )
                        {
                            System.out.println( event.getPath() + " has been deleted" );
    
                            synchronized (this) {
                                notifyAll();
                            }
                        }
                    };
    
                    String preLockPath = lockPaths.get( index - 1 );
                    Stat stat = zkClient.exists( LOCK_ROOT_PATH + "/" + preLockPath, watcher );
    
                    if ( stat == null )     
              /* 由于某种原因,前一个节点不存在了(比如连接断开),重新tryLock */
                    {
                        return(tryLock( lockPath ) );
                    } else {                
              /* 阻塞当前进程,直到preLockPath释放锁,重新tryLock */
                        System.out.println( Thread.currentThread().getName() + " wait for " + preLockPath );
    
                        synchronized (watcher) {
                            watcher.wait();
                        }
    
                        return(tryLock( lockPath ) );
                    }
                }
            }
    
            /**
             * 释放锁,即删除lockPath节点
             */
            private void releaseLock( String lockPath )
            {
                try {
                    zkClient.delete( lockPath, -1 );
                } catch ( InterruptedException | KeeperException e ) {
                    e.printStackTrace();
                }
            }
    
            public void setZkClient( ZooKeeper zkClient )
            {
                this.zkClient = zkClient;
            }
    
            public void closeZkClient()
            {
                try {
                    zkClient.close();
                } catch ( InterruptedException e ) {
                    e.printStackTrace();
                }
            }
    
            public CountPlus( String threadName )
            {
                super(threadName);
            }
        }
    
        public static void main( String[] args ) throws Exception
        {
            /* 开启五个线程 */
            CountPlus threadA = new CountPlus( "threadA" );
            setZkClient( threadA );
            threadA.start();
    
            CountPlus threadB = new CountPlus( "threadB" );
            setZkClient( threadB );
            threadB.start();
    
            CountPlus threadC = new CountPlus( "threadC" );
            setZkClient( threadC );
            threadC.start();
    
            CountPlus threadD = new CountPlus( "threadD" );
            setZkClient( threadD );
            threadD.start();
    
            CountPlus threadE = new CountPlus( "threadE" );
            setZkClient( threadE );
            threadE.start();
        }
    
        public static void setZkClient( CountPlus thread ) throws Exception
        {
            ZooKeeper zkClient = new ZooKeeper( "127.0.0.1:2181", 3000, null );
            thread.setZkClient( zkClient );
        }
    }
    

    运行程序之前需要创建“/Locks”作为存放锁信息的根节点。

    一旦某个Server想要获得锁,就会在/Locks”下创建一个EPHEMERAL_SEQUENTIAL类型的名为“Lock_”子节点,ZooKeeper会自动为每个子节点附加一个递增的编号,该编号为int类型,长度为10,左端以0补全。“/Locks”下会维持着这样一系列的节点:

    Lock_0000000001, Lock_0000000002, Lock_0000000003, Lock_0000000004…

    一旦这些创建这些节点的Server断开连接,该节点就会被清除(当然也可以主动清除)。

    由于节点的编号是递增的,创建越晚排名越靠后。遵循先到先得的原则,Server创建完节点之后会检查自己的节点是不是最小的,如果是,那就获得锁,如果不是,排队等待。执行完任务之后,Server清除自己创建的节点,这样后面的节点会依次获得锁。

    程序的运行结果如下:

    5.png

    3、 分布式队列

    很多单机上很平常的事情,放在集群环境中都会发生质的变化。

    以一个常见的生产者-消费者模型举例:有一个容量有限的邮筒,寄信者(即生产者)不断地将信件塞入邮筒,邮递员(即消费者)不断地从邮筒取出信件发往目的地。运行期间需要保证:

    (1)邮筒已达上限时,寄信者停止活动,等带邮筒恢复到非满状态

    (2)邮筒已空时,邮递员停止活动,等带邮筒恢复到非空状态

    该邮筒用有序队列实现,保证FIFO(先进先出)特性。

    在一台机器上,可以用有序队列来实现邮筒,保证FIFO(先进先出)特性,开启两个线程,一个充当寄信者,一个充当邮递员,通过wait()/notify()很容易实现上述功能。

    但是,如果在跨进程或者分布式环境下呢?比如,一台机器运行生产者程序,另一台机器运行消费者程序,代表邮筒的有序队列无法跨机器共享,但是两者需要随时了解邮筒的状态(是否已满、是否已空)以及保证信件的有序(先到达的先发送)。

    这种情况下,可以借助ZooKeeper实现一个分布式队列。新建一个“/mailBox”节点代表邮筒。一旦有信件到达,就在该节点下创建PERSISTENT_SEQUENTIAL类型的子节点,当子节点总数达到上限时,阻塞生产者,然后使用getChildren(String path, Watcher watcher)方法监控子节点的变化,子节点总数减少后再回复生产;而消费者每次选取序号最小的子节点进行处理,然后删除该节点,当子节点总数为0时,阻塞消费者,同样设置监控,子节点总数增加后再回复消费。

    代码如下:

    public class ZkDistributedQueue {
        /* 邮箱上限为10封信 */
        private static final int MAILBOX_MAX_SIZE = 10;
    
        /* 邮箱路径 */
        private static final String MAILBOX_ROOT_PATH = "/mailBox";
    
        /* 信件节点 */
        private static final String LETTER_NODE_NAME = "letter_";
    
        /* 生产者线程,负责接受信件 */
        static class Producer extends Thread {
            ZooKeeper zkClient;
    
            @Override
            public void run()
            {
                while ( true )
                {
                    try {
                        if ( getLetterNum() == MAILBOX_MAX_SIZE ) /* 信箱已满 */
                        {
                            System.out.println( "mailBox has been full" );
                            
                /* 创建Watcher,监控子节点的变化 */
                            Watcher watcher = new Watcher()
                            {
                                @Override
                                public void process( WatchedEvent event )
                                {
                                    /* 生产者已停止,只有消费者在活动,所以只可能出现发送信件的动作,即子节点被删除的变化 */
                                    System.out.println( "mailBox has been not full" );
    
                                    synchronized (this) {
                                        notify(); /* 唤醒生产者 */
                                    }
                                }
                            };
    
                            zkClient.getChildren( MAILBOX_ROOT_PATH, watcher );
    
                            synchronized (watcher) {
                                watcher.wait(); /* 阻塞生产者 */
                            }
                        } else {
                            /* 线程随机休眠数毫秒,模拟现实中的费时操作 */
                            int sleepMillis = (int) (Math.random() * 1000);
                            Thread.sleep( sleepMillis );
    
                            /* 接收信件,创建新的子节点 */
                            String newLetterPath = zkClient.create( MAILBOX_ROOT_PATH + "/" + LETTER_NODE_NAME,
                                                "letter".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL );
    
                            System.out.println( "a new letter has been received: "
                                        + newLetterPath.substring( MAILBOX_ROOT_PATH.length() + 1 )
                                        + ", letter num: " + getLetterNum() );
                        }
                    } catch ( Exception e ) {
                        System.out.println( "producer equit task becouse of exception !" );
                        e.printStackTrace();
                        break;
                    }
                }
            }
    
            private int getLetterNum() throws KeeperException, InterruptedException
            {
                Stat stat = zkClient.exists( MAILBOX_ROOT_PATH, null );
                int letterNum = stat.getNumChildren();
                return(letterNum);
            }
    
            public void setZkClient( ZooKeeper zkClient )
            {
                this.zkClient = zkClient;
            }
        }
    
        /* 消费者线程,负责发送信件 */
        static class Consumer extends Thread {
            ZooKeeper zkClient;
    
            @Override
            public void run()
            {
                while ( true )
                {
                    try {
                        if ( getLetterNum() == 0 ) /* 信箱已空 */
    
                        {
                            System.out.println( "mailBox has been empty" );
                            /* 创建Watcher,监控子节点的变化 */
                            Watcher watcher = new Watcher()
                            {
                                @Override
                                public void process( WatchedEvent event )
                                {
                                    /* 消费者已停止,只有生产者在活动,所以只可能出现收取信件的动作,即子节点被增加的变化 */
                                    System.out.println( "mailBox has been not empty" );
                                    synchronized (this) {
                                        notify(); /* 唤醒消费者 */
                                    }
                                }
                            };
    
                            zkClient.getChildren( MAILBOX_ROOT_PATH, watcher );
    
                            synchronized (watcher) {
                                watcher.wait(); /* 阻塞消费者 */
                            }
                        } else {
                            /* 线程随机休眠数毫秒,模拟现实中的费时操作 */
                            int sleepMillis = (int) (Math.random() * 1000);
                            Thread.sleep( sleepMillis );
    
                            /* 发送信件,删除序号最小的子节点 */
                            String firstLetter = getFirstLetter();
                            zkClient.delete( MAILBOX_ROOT_PATH + "/" + firstLetter, -1 );
                            System.out.println( "a letter has been delivered: " + firstLetter + ", letter num: " + getLetterNum() );
                        }
                    } catch ( Exception e ) {
                        System.out.println( "consumer equit task becouse of exception !" );
                        e.printStackTrace();
                        break;
                    }
                }
            }
    
            private int getLetterNum() throws KeeperException, InterruptedException
            {
                Stat stat = zkClient.exists( MAILBOX_ROOT_PATH, false );
                int letterNum = stat.getNumChildren();
                return(letterNum);
            }
    
            private String getFirstLetter() throws KeeperException, InterruptedException
            {
                List<String> letterPaths = zkClient.getChildren( MAILBOX_ROOT_PATH, false );
                Collections.sort( letterPaths );
                return(letterPaths.get( 0 ) );
            }
    
            public void setZkClient( ZooKeeper zkClient )
            {
                this.zkClient = zkClient;
            }
        }
    
        public static void main( String[] args ) throws IOException
        {
            /* 开启生产者线程 */
            Producer producer = new Producer();
            ZooKeeper zkClientA = new ZooKeeper( "127.0.0.1:2181", 3000, null );
            producer.setZkClient( zkClientA );
            producer.start();
    
            /* 开启消费者线程 */
            Consumer consumer = new Consumer();
            ZooKeeper zkClientB = new ZooKeeper( "127.0.0.1:2181", 3000, null );
            consumer.setZkClient( zkClientB );
            consumer.start();
        }
    }
    

    打印结果如下:

    6.png

    上例中还有一个可以改进的地方,在分布式环境下,像MAILBOX_MAX_SIZE这类常量是被多台机器共用的,而且运行期间可能发生改变,比如邮筒上限需要从10改为20,只能停掉机器,然后改动每台机器上的参数,再重新部署。可是,如果该服务不允许停机,而且部署在数十台机器上,让参数在运行时生效且保持一致,怎么办?

    这就涉及到了ZooKeeper另一个典型的应用场景——配置中心。被多台机器共享的参数可以托管在ZNode上,对该参数关心的机器在Znode上注册Watcher,一旦该参数发生变化,注册者会收到消息,然后做出相应的调整。

    ZooKeeper的作用当然不止于此,更多的应用场景就需要使用者在实际项目中发掘跟探索了,毕竟,纸上得来终觉浅,实践出真知。

    相关文章

      网友评论

          本文标题:ZooKeepr之应用实例

          本文链接:https://www.haomeiwen.com/subject/qevqxktx.html