美文网首页ZooKeeper程序员java进阶干货
9. 使用ZooKeeper Java API编程

9. 使用ZooKeeper Java API编程

作者: 码匠安徒生 | 来源:发表于2017-11-23 00:31 被阅读1037次

    ZooKeeper是用Java开发的,3.4.6版本的Java API文档可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上找到。

    Tips
    本章的代码在Linux操作系统下进行测试,运行ZooKeeper服务器实例的版本为3.4.6。

    开发应用程序的ZooKeeper Java绑定主要由两个Java包组成:

    • org.apache.zookeeper
    • org.apache.zookeeper.data

    org.apache.zookeeper包由ZooKeeper监视的接口定义和ZooKeeper的各种回调处理程序组成。 它定义了ZooKeeper客户端类库的主要类以及许多ZooKeeper事件类型和状态的静态定义。 org.apache.zookeeper.data包定义了与数据寄存器(也称为znode)相关的特性,例如访问控制列表(ACL),IDs,stats等。

    ZooKeeper Java API中的org.apache.zookeeper.server,org.apache.zookeeper.server.quorum和org.apache.zookeeper.server.upgrade包是服务器实现的一部分。 org.apache.zookeeper.client包用于查询ZooKeeper服务器的状态。

    一 准备开发环境

    Apache ZooKeeper是一个复杂的软件,因此它需要运行许多其他类库。 依赖库作为jar文件在ZooKeeper发行版中附带在lib目录中。 核心ZooKeeper jar文件名字为zookeeper-3.4.6.jar,位于主目录下。

    要开发Java的ZooKeeper应用程序,我们必须设置指向ZooKeeper jar的类路径,以及ZooKeeper所依赖的所有第三方库。在 bin 目录下有一个 zkEnv.sh文件,可以用来设置CLASSPATH。

    我们需要将脚本如下设置,在命令行中执行以下语句:

    $ ZOOBINDIR=${ZK_HOME}/bin
    $ source ${ZOOBINDIR}/zkEnv.sh
    

    shell变量ZK_HOME被设置为安装ZooKeeper的路径,在我的设置中,它是/usr/share/zookeeper。 之后,CLASSPATH变量被正确设置,在我的系统中,如下所示:

    $ echo $CLASSPATH 
    /usr/share/zookeeper-3.4.6/bin/../build/classes :/usr/share/zookeeper-3.4.6/bin/../build/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar :/usr/share/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar :/usr/share/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar :/usr/share/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar :/usr/share/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar :/usr/share/zookeeper-3.4.6/bin/../src/java/lib/*.jar :/usr/share/zookeeper-3.4.6/bin/../conf:
    

    在Windows操作系统中,需要运行zkEnv.cmd脚本。 现在可以使用CLASSPATH变量来编译和运行使用ZooKeeper API编写的Java程序。 可以在Uni/Linux中的主目录的.bashrc文件中找到zkEnv.sh脚本,避免每次启动shell会话时都采用它。

    二 第一个ZooKeeper程序

    为了引入ZooKeeper Java API,让我们从一个非常简单的程序开始,它可以连接到localhost中的ZooKeeper实例,如果连接成功,它将在ZooKeeper名称空间的根路径下打印znode的列表。

    这个程序的代码如下所示:

    /*Our First ZooKeeper Program*/
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    public class HelloZooKeeper {
      public static void main(String[] args) throws IOException {
        String hostPort = "localhost:2181";
        String zpath = "/";
        List <String> zooChildren = new ArrayList<String>();
        ZooKeeper zk = new ZooKeeper(hostPort, 2000, null);
        if (zk != null) {
          try {
            zooChildren = zk.getChildren(zpath, false);
            System.out.println("Znodes of '/': ");
            for (String child: zooChildren) {
              //print the children
              System.out.println(child);
            }
          } catch (KeeperException e) {
            e.printStackTrace();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
       }
    }
    

    在构建和执行前面的代码片段之前,让我们来看看它具体做了什么。代码从导入语句开始。使用这些语句,我们导入了程序各个组件所需的包。如前所述,org.apache.zookeeper包包含客户端与ZooKeeper服务器进行交互所需的所有类和接口。在导入包之后,定义了一个名为HelloZooKeeper的类。由于我们要连接到在同一系统中运行的ZooKeeper实例,在main方法中将主机和端口字符串定义为localhost:2181。代码行zk = new ZooKeeper(hostPort, 2000, null)调用ZooKeeper构造方法,该构造方法尝试连接到ZooKeeper服务器并返回一个引用。对于连接到ZooKeeper服务器实例并维护该连接的客户端程序,需要维护一个实时会话。在此例中,构造方法实例化的zk对象返回的引用表示这个会话。 ZooKeeper API是围绕这个引用构建的,每个方法调用都需要一个引用来执行。

    ZooKeeper类的构造方法使用以下代码创建ZooKeeper实例的引用:

    ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    

    使用的参数含义如下:

    • connectString:以逗号分隔的主机:端口号列表,每个对应一个ZooKeeper服务器。 例如,10.0.0.1:2001,10.0.0.2:2002和10.0.0.3:2003表示三个节点的ZooKeeper ensemble的有效的主机:端口匹配对。
    • sessionTimeout:这是以毫秒为单位的会话超时时间。这是ZooKeeper在宣布session结束之前,没有从客户端获得心跳的时间。
    • watcher:一个watcher对象,如果创建,当状态改变和发生节点事件时会收到通知。这个watcher对象需要通过一个用户定义的类单独创建,通过实现Watcher接口并将实例化的对象传递给ZooKeeper构造方法。客户端应用程序可以收到各种类型的事件的通知,例如连接丢失、会话过期等。

    ZooKeeper Java API定义了另外带有三个参数的构造方法,以指定更高级的操作。代码如下:

    ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
    

    在ZooKeeper类的上面的构造方法中,如果设置为true,boolean canBeReadOnly参数允许创建的客户端在网络分区的情况下进入只读模式。只读模式是客户端无法找到任何多数服务器的场景,但有一个可以到达的分区服务器,以只读模式连接到它,这样就允许对服务器的读取请求,而写入请求则不允许。客户端继续尝试在后台连接到大多数服务器,同时仍然保持只读模式。分区服务器仅仅是ZooKeeper组的一个子集,它是由于集群中的网络分配而形成的。大多数服务器构成了ensemble中的大多数quorum。

    以下构造方法显示了两个附加参数的定义:

    ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
    

    这个构造方法允许ZooKeeper客户端对象创建两个额外的参数:

    • sessionId:在客户端重新连接到ZooKeeper服务器的情况下,可以使用特定的会话ID来引用先前连接的会话
    • sessionPasswd:如果指定的会话需要密码,可以在这里指定

    以下构造方法是前两个调用的组合:

    ZooKeeper(String connectString, int sessionTimeout,Watcher watcher, long sessionId, byte[] sessionPasswd,boolean canBeReadOnly)
    

    此构造方法是前两个调用的组合,允许在启用只读模式的情况下重新连接到指定的会话。

    Note
    ZooKeeper类的详细Java API文档可以在http://zookeeper.apache.org/doc/r3.4.6/api/index.html上查询。

    现在,回到我们的ZooKeeper程序。 在调用构造方法后,如果连接成功,我们将得到ZooKeeper服务器的引用。 我们通过下面的代码将引用传递给getChildren方法:

    zooChildren = zk.getChildren(zpath, false)
    

    ZooKeeper类的getChildren(String path,boolean watch)方法返回给定路径上znode的子级列表。 我们只是迭代这个方法返回的列表,并将字符串打印到控制台。

    将程序命名为HelloZooKeeper.java,并编译我们的程序如下:

    $ javac -cp $CLASSPATH HelloZooKeeper.java
    

    在我们运行的程序之前,需要使用以下命令来启动ZooKeeper服务器实例:

    $ ${ZK_HOME}/bin/zkServer.sh start
    

    运行程序如下:

    $ java -cp $CLASSPATH HelloZooKeeper
    

    执行程序会在控制台上打印日志消息,显示所使用的ZooKeeper版本,Java版本,Java类路径,服务器体系结构等等。 这里显示了这些日志消息的一部分:


    输出结果

    ZooKeeper Java API生成的日志消息对调试非常有用。 它为我们提供了关于客户端连接到ZooKeeper服务器,建立会话等后台得信息。 上面显示的最后三条日志消息告诉我们客户端如何使用程序中指定的参数来启动连接,以及在成功连接后,服务器如何为客户端分配会话ID。

    最后,程序执行最后在控制台中输出以下内容:


    子节点信息

    我们可以使用ZooKeeper shell来验证程序的正确性:

    $ $ZK_HOME/bin/zkCli.sh -server localhost
    
    shell 输出结果

    恭喜! 我们刚刚成功编写了我们的第一个ZooKeeper客户端程序。

    二 实现Watcher接口

    ZooKeeper Watcher监视使客户端能够接收来自ZooKeeper服务器的通知,并在发生时处理这些事件。 ZooKeeper Java API提供了一个名为Watcher的公共接口,客户端事件处理程序类必须实现该接口才能接收有关来自ZooKeeper服务器的事件通知。 以编程方式,使用这种客户端的应用程序通过向客户端注册回调(callback)对象来处理这些事件。

    我们将实现Watcher接口,处理与znode关联的数据更改时由ZooKeeper生成的事件。

    Watcher接口在org.apache.zookeeper包中声明如下:

    public interface Watcher {
      void process(WatchedEvent event);
    }
    

    为了演示znode数据监视器(Watcher),有两个Java类:DataWatcherDataUpdaterDataWatcher将一直运行,并在/MyConfig指定znode路径中侦听来自ZooKeeper服务器的NodeDataChange事件。DataUpdater类将定期更新此znode路径中的数据字段,这将生成事件,并且在接收到这些事件后,DataWatcher类将把更改后的数据打印到控制台上。

    以下是DataWatcher.java类的代码:

    import java.io.IOException;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    
    public class DataWatcher implements Watcher, Runnable {
      private static String hostPort = "localhost:2181";
      private static String zooDataPath = "/MyConfig";
      byte zoo_data[] = null;
      ZooKeeper zk;
    
      public DataWatcher() {
        try {
          zk = new ZooKeeper(hostPort, 2000, this);
          if (zk != null) {
            try {
              //Create the znode if it doesn't exist, with the following code:
              if (zk.exists(zooDataPath, this) == null) {
                zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
              }
            } catch (KeeperException | InterruptedException e) {
              e.printStackTrace();
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      public void printData() throws InterruptedException, KeeperException {
        zoo_data = zk.getData(zooDataPath, this, null);
        String zString = new String(zoo_data);
        // The following code prints the current content of the znode to the console:
        System.out.printf("\nCurrent Data @ ZK Path %s: %s", zooDataPath, zString);
      }
      @Override
      public void process(WatchedEvent event) {
        System.out.printf(
        "\nEvent Received: %s", event.toString());
        //We will process only events of type NodeDataChanged
        if (event.getType() == Event.EventType.NodeDataChanged) {
          try {
            printData();
          } catch (InterruptedException e) {
            e.printStackTrace();
          } catch (KeeperException e) {
            e.printStackTrace();
          }
        }
      }
      public static void main(String[] args)
      throws InterruptedException, KeeperException {
        DataWatcher dataWatcher = new DataWatcher();
        dataWatcher.printData();
        dataWatcher.run();
      }
      public void run() {
        try {
          synchronized (this) {
            while (true) {
              wait();
            }
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
          Thread.currentThread().interrupt();
        }
      }
    }
    

    我们来看一下DataWatcher.java类的代码来理解一个ZooKeeper监视器的实现。 DataWatcher公共类实现Watcher接口以及Runnable接口,打算将监视器作为线程运行。 main方法创建DataWatcher类的一个实例。 在前面的代码中,DataWatcher构造方法尝试连接到在本地主机上运行的ZooKeeper实例。 如果连接成功,我们用下面的代码检查znode路径/MyConfig是否存在:

    if (zk.exists(zooDataPath, this) == null) {
    

    如果znode不存在ZooKeeper命名空间中,那么exists方法调用将返回null,并且尝试使用代码将其创建为持久化znode,如下所示:

    zk.create(zooDataPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    

    接下来是process方法,它在org.apache.ZooKeeper的Watcher接口中声明,并由DataWatcher类使用以下代码实现:

    public void process(WatchedEvent event) {
    

    为了简单起见,在process方法中,打印从ZooKeeper实例接收的事件,并仅对NodeDataChanged类型的事件进行进一步处理,如下所示:

    if (event.getType() == Event.EventType.NodeDataChanged)
    

    当znode路径/MyConfig的数据字段发生任何更新或更改而收到NodeDataChanged类型的事件时,调用printData方法来打印znode的当前内容。 在znode上执行一个getData调用时,我们再次设置一个监视,这是该方法的第二个参数,如下面的代码所示:

    zoo_data = zk.getData(zooDataPath, this, null);
    

    监视事件是发送给设置监视的客户端的一次性触发器,为了不断接收进一步的事件通知,客户端应该重置监视器。

    DataUpdater.java是一个简单的类,它连接到运行本地主机的ZooKeeper实例,并用随机字符串更新znode路径/MyConfig的数据字段。 在这里,我们选择使用通用唯一标识符(UUID)字符串更新znode,因为后续的UUID生成器调用将保证生成唯一的字符串。

    DataUpdater.java类代码如下:

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    public class DataUpdater implements Watcher {
      private static String hostPort = "localhost:2181";
      private static String zooDataPath = "/MyConfig";
      ZooKeeper zk;
    
      public DataUpdater() throws IOException {
        try {
          zk = new ZooKeeper(hostPort, 2000, this);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    
    // updates the znode path /MyConfig every 5 seconds with a new UUID string.
    public void run() throws InterruptedException, KeeperException {
        while (true) {
          String uuid = UUID.randomUUID().toString();
          byte zoo_data[] = uuid.getBytes();
          zk.setData(zooDataPath, zoo_data, -1);
          try {
            Thread.sleep(5000); // Sleep for 5 secs
          } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }
    
    public static void main(String[] args) throws
      IOException, InterruptedException, KeeperException {
        DataUpdater dataUpdater = new DataUpdater();
        dataUpdater.run();
      }
      @Override
      public void process(WatchedEvent event) {
        System.out.printf("\nEvent Received: %s", event.toString());
      }
    }
    

    上面的代码使ZooKeeper服务器触发一个NodeDataChanged事件。 由于DataWatcher为此znode路径设置了监视,因此它会接收数据更改事件的通知。 然后它检索更新的数据,重置监视,并在控制台上打印数据。

    使用以下命令编译DataWatcherDataUpdater类:

    $ javac –cp $CLASSPATH DataWatcher.java
    $ javac –cp $CLASSPATH DataUpdater.java
    

    要执行监视器和更新程序,需要打开两个终端窗口。 我要先运行监视器,因为它创建了/MyConfig的znode(如果还未在ZooKeeper的命名空间中创建的话)。 运行监视器之前,请确保ZooKeeper服务器在本地主机上已经运行。

    在其中一个终端窗口中,通过运行以下命令来执行watcher类:

    $ java –cp $CLASSPATH DataWatcher
    

    输出类似于以下屏幕截图所示的消息:


    DataWatcher输出

    如前面的截图所示,znode路径/MyConfig是由DataWatcher类创建的。 它也打印znode的内容,但没有打印在控制台中,因为我们在创建znode时没有设置任何数据。 当znode被创建时,类中的监视者收到了NodeCreated类型的事件通知,这个通知被打印在控制台中。 DataWatcher类继续运行,并从ZooKeeper服务器侦听/MyConfig节点上的事件。

    让我们在另一个终端窗口中运行DataUpdater类:

    $ java -cp $CLASSPATH DataUpdater
    

    将最初的ZooKeeper特定日志消息记录到控制台后,DataUpdater类运行时没有提示。 它将一个新的UUID字符串设置到ZooKeeper路径/MyConfig的数据字段中。 因此,看到每隔5秒钟,在下面的屏幕截图中显示的输出内容打印在运行DataWatche的终端窗口中:

    DataUpdater输出

    DataWatcher也可以使用ZooKeeper shell进行测试。 继续像以前一样在终端中运行DataWatcher类,并在另一个终端中调用ZooKeeper shell并运行以下屏幕截图中所示的命令:

    Shell 测试

    在DataWatcher正在运行的终端中,将打印以下消息:


    事件捕获

    三 示例——群集监视器

    通过互联网提供的流行服务,如电子邮件,文件服务平台,在线游戏等,都是通过跨越多个数据中心的高度可用的成百上千台服务器来服务的,而这些服务器通常在地理位置上分开。 在这种集群中,设置了一些专用的服务器节点来监视生产网络中承载服务或应用程序的服务器的活跃性。 在云计算环境中,也用于管理云环境的这种监控节点被称为云控制器。 这些控制器节点的一个重要工作是实时检测生产服务器的故障,并相应地通知管理员,并采取必要的措施,例如将故障服务器上的应用程序故障转移到另一个服务器,从而确保容错性和高可用性。

    在本节中,我们将使用ZooKeeper Java客户端API开发一个简约的分布式集群监视器模型。 使用ZooKeeper的ephemeral znode概念来构建这个监视模型相当简单和优雅,如以下步骤所述:

    1. 每个生产服务器运行一个ZooKeeper客户端作为守护进程。 这个过程连接到ZooKeeper服务器,并在/ZooKeeper命名空间的预定义路径(比如/Members)下创建一个带有名称(最好是其网络名称或主机名)的ephemeral znode。
    2. 云控制器节点运行ZooKeeper监视器进程,该进程监视路径/Members并监听NodeChildrenChanged类型的事件。 这个监视器进程作为服务或守护进程运行,并设置或重置路径上的监视,并且实现其逻辑以调用适当的模块来为监视事件采取必要的行动。
    3. 现在,如果生产服务器由于硬件故障或软件崩溃而关闭,ZooKeeper客户端进程就会被终止,导致服务器和ZooKeeper服务之间的会话被终止。 由于ephemeral znode的属性唯一,每当客户端连接关闭时,ZooKeeper服务会自动删除路径/Members中的znode。
    4. 路径中znode的删除引发了NodeChildrenChanged事件,因此云控制器中的观察器进程会收到通知。 通过调用路径/Members中的getChildren方法,可以确定哪个服务器节点已经关闭。
    5. 然后,控制器节点可以采取适当的措施,比如执行恢复逻辑以重启另一台服务器中的故障服务。
    6. 这个逻辑可以构建为实时工作,保证接近于零停机的时间和高度可用的服务。

    为实现这个集群监控模型,我们将开发两个Java类。 ClusterMonitor类将持续运行监视器,以监视ZooKeeper树中的路径/Members。 处理完引发事件后,我们将在控制台中打印znode列表并重置监视。 另一个类ClusterClient将启动到ZooKeeper服务器的连接,在/Members下创建一个ephemeral znode。

    要模拟具有多个节点的集群,我们在同一台计算机上启动多个客户端,并使用客户端进程的进程ID创建ephemeral znode。 通过查看进程标识,ClusterMonitor类可以确定哪个客户进程已经关闭,哪些进程还在。 在实际情况中,客户端进程通常会使用当前正在运行的服务器的主机名创建ephemeral znode。 下面显示了这两个类的源代码。

    ClusterMonitor.java类定义如下:

    import java.io.IOException;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    public class ClusterMonitor implements Runnable {
    private static String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    boolean alive=true;
    public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
        connectionWatcher = new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            if(event.getType()==Watcher.Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.printf("\nEvent Received: %s", event.toString());
              }
          }
        };
    
        childrenWatcher = new Watcher() {
          @Override
          public void process(WatchedEvent event) {
            System.out.printf("\nEvent Received: %s", event.toString());
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                try {
                  //Get current list of child znode, 
                  //reset the watch
                  List<String> children = zk.getChildren( membershipRoot, this);
                  wall("!!!Cluster Membership Change!!!");
                  wall("Members: " + children);
                } catch (KeeperException e) {
                  throw new RuntimeException(e);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  alive = false;
                  throw new RuntimeException(e);
                }
              }
          }
        };
    
        zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
    
        // Ensure the parent znode exists
        if(zk.exists(membershipRoot, false) == null) {
          zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        // Set a watch on the parent znode
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.err.println("Members: " + children);
      }
    
      public synchronized void close() {
        try {
          zk.close();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    
      public void wall (String message) {
        System.out.printf("\nMESSAGE: %s", message);
      }
    
      public void run() {
        try {
          synchronized (this) {
            while (alive) {
              wait();
            }
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
          Thread.currentThread().interrupt();
        } finally {
          this.close();
        }
      }
    
      public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        if (args.length != 1) {
          System.err.println("Usage: ClusterMonitor <Host:Port>");
          System.exit(0);
        }
        String hostPort = args[0];
        new ClusterMonitor(hostPort).run();
      }
    }
    

    ClusterClient.java类定义如下:

    import java.io.IOException;
    import java.lang.management.ManagementFactory;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    public class ClusterClient implements Watcher, Runnable {
    private static String membershipRoot = "/Members";
    ZooKeeper zk;
    public ClusterClient(String hostPort, Long pid) {
      String processId = pid.toString();
      try {
        zk = new ZooKeeper(hostPort, 2000, this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      if (zk != null) {
        try {
          zk.create(membershipRoot + '/' + processId, processId.getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (
          KeeperException | InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    
      public synchronized void close() {
        try {
          zk.close();
        }
        catch (InterruptedException e) {
          e.printStackTrace();
            }
      }
    
      @Override
      public void process(WatchedEvent event) {
        System.out.printf("\nEvent Received: %s", event.toString());
      }
    
      public void run() {
        try {
          synchronized (this) {
            while (true) {
              wait();
              }
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
          Thread.currentThread().interrupt();
        } finally {
          this.close();
        }
      }
    
      public static void main(String[] args) {
        if (args.length != 1) {
          System.err.println("Usage: ClusterClient <Host:Port>");
          System.exit(0);
        }
        String hostPort = args[0];
        //Get the process id
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int index = name.indexOf('@');
        Long processId = Long.parseLong(name.substring(0, index));
        new ClusterClient(hostPort, processId).run();
      }
    }
    

    使用下面命令编译这两个类:

    $ javac -cp $CLASSPATH ClusterMonitor.java
    $ javac -cp $CLASSPATH ClusterClient.java
    

    要执行群集监控模型,打开两个终端。 在其中一个终端中,运行ClusterMonitor类。 在另一个终端中,通过在后台运行ClusterClient类来执行多个实例。

    在第一个终端中,执行ClusterMonitor类:

    $ java -cp $CLASSPATH ClusterMonitorlocalhost:2181
    

    如前面的示例所示,看到来自客户端API的调试日志消息,最后,ClusterMonitor类开始监视事件,输入如下内容:

    ClusterMonitor类输出

    现在,执行ClusterClient类的五个实例来模拟一个集群的五个节点。ClusterClient在ZooKeeper树的/Members路径中使用自己的进程ID创建ephemeral znode:

    $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
    [1] 4028
    $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
    [2] 4045
    $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
    [3] 4057
    $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
    [4] 4072
    $ java -cp $CLASSPATH ClusterClient localhost:2181 2>&1>/dev/null &
    [5] 4084
    

    与此相对应,将观察到ClusterMonitor类检测到这些新的ClusterClient类实例,因为它正在监视ZooKeeper树的/Members路径上的事件。 这模拟了一个真正的集群中的节点加入事件。 可以在ClusterMonitor类的终端中看到输出,这与下面的截图中显示的类似:

    监视事件

    现在,如果杀死一个ClusterClient.java进程,那么它与ZooKeeper服务器一起维护的会话将被终止。因此,客户端创建的ephemeral znode将被删除。删除将触发NodeChildrenChanged事件,该事件将被ClusterMonitor类捕获。该模拟在集群中一个节点离开的场景。

    让我们用ID 4084终止ClusterClient进程:

    $ kill -9 4084
    

    以下屏幕截图显示了ClusterMonitor类的终端中的输出。 它列出了当前可用的进程及其进程ID,这些进程ID模拟了实时服务器:

    可用进程

    上面的简单而优雅的集群监控模型的示例实现展示了ZooKeeper的真正威力。 在没有ZooKeeper的情况下,开发这样一个能够实时监控节点活跃度的模型将是一项真正的艰巨任务。

    相关文章

      网友评论

        本文标题:9. 使用ZooKeeper Java API编程

        本文链接:https://www.haomeiwen.com/subject/pbpmvxtx.html