美文网首页
4、Zookeeper的客户端实例

4、Zookeeper的客户端实例

作者: yannhuang | 来源:发表于2017-05-19 21:30 被阅读91次

    Zookeeper的常用开源客户端有ZkClient 和 Curator,网上可以找到很多关于这两个客户端的资料。本文将要讲述的不是这两个客户端,而是一个更加轻量级的Zookeeper客户端:de-zookeeper

    完整代码已经提交到github上:

    https://github.com/huangyanxiong/de-framework/tree/master/de-zookeeper

    下面将详细讲述de-zookeeper的代码细节和使用方法。

    1、ZookeeperProperty.java

    该类用于封装 Zookeeper 的连接属性,包括IP和端口、路径、超时时间三个属性,并提供带参构造方法和对应的 get/set 方法:

    package com.dataeye.zookeeper;
    
    public class ZookeeperProperty {
      private String zkConnnectionStr;   // 连接的IP和端口,比如: localhost:2181
      private String zNodePath;   //  zk路径,比如:/data/work
      private int sessionTimeout;  //  超时时间,毫秒,比如  60000
    
      public ZookeeperProperty(String zkConnnectionStr, String zNodePath, int sessionTimeout) {
        this.zkConnnectionStr = zkConnnectionStr;
        this.zNodePath = zNodePath;
        this.sessionTimeout = sessionTimeout;
      }
    
      public String getZkConnnectionStr() {
        return zkConnnectionStr;
      }
    
      public void setZkConnnectionStr(String zkConnnectionStr) {
        this.zkConnnectionStr = zkConnnectionStr;
      }
    
      public String getzNodePath() {
        return zNodePath;
      }
    
      public void setzNodePath(String zNodePath) {
        this.zNodePath = zNodePath;
      }
    
      public int getSessionTimeout() {
        return sessionTimeout;
      }
    
      public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
      }
    }
    
    

    2、ZooKeeperListener.java

    这是描述客户端行为的接口。这里Zookeeper监听三种行为:

    connected:建立连接
    nodeValueChange:节点的值发生改变
    nodeChildChange:子节点发生变化

    package com.dataeye.zookeeper;
    
    import java.util.Map;
    
    public interface ZooKeeperListener {
    
      void nodeChildChange(Map<String, String> newChildrenValue);
    
      void nodeValueChange(String newValue);
    
      void connected();
    }
    

    3、NodeValueCodec.java

    节点的编码和解码类,提供 decode 和 encode 接口

    package com.dataeye.zookeeper;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Set;
    
    import static java.util.Objects.requireNonNull;
    
    public interface NodeValueCodec<T> {
    
      NodeValueCodec DEFAULT = DefaultNodeValueCodec.INSTANCE;
    
      default Set<T> decodeAll(byte[] zNodeValue) {
        requireNonNull(zNodeValue, "zNodeValue");
        return decodeAll(new String(zNodeValue, StandardCharsets.UTF_8));
      }
    
      Set<T> decodeAll(String zNodeValue);
    
      default T decode(byte[] zNodeValue) {
        requireNonNull(zNodeValue, "zNodeValue");
        return decode(new String(zNodeValue, StandardCharsets.UTF_8));
      }
    
      T decode(String zNodeValue);
    
      byte[] encodeAll(Iterable<T> entries);
    
      byte[] encode(T entry);
    }
    

    4、DefaultNodeValueCodec.java

    NodeValueCodec接口的实现类,具体实现了编码和解码的过程。

    这里的编码过程比较简单,只是把Worker类的两个属性: id 和 biz使用冒号分割开来,返回类似于 id:biz 这样的字符串;解码过程则相反。

    package com.dataeye.zookeeper;
    
    import com.dataeye.crawler.thrift.Worker;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;
    
    import static java.util.Objects.requireNonNull;
    
    public final class DefaultNodeValueCodec implements NodeValueCodec<Worker> {
      public static final DefaultNodeValueCodec INSTANCE = new DefaultNodeValueCodec();
      private static final String segmentDelimiter = ",";
      private static final String fieldDelimiter = ":";
      private static final Pattern SEGMENT_DELIMITER = Pattern.compile("\\s*" + segmentDelimiter + "\\s*");
    
      @Override
      public Worker decode(String segment) {
        final String[] tokens = segment.split(fieldDelimiter);
    
        String workerId = tokens[0];
        String biz = tokens[1];
        final Worker worker = new Worker(workerId, biz);
    
        return worker;
      }
    
      @Override
      public Set<Worker> decodeAll(String valueString) {
        Set<Worker> workers = new HashSet<>();
        try {
          for (String segment : SEGMENT_DELIMITER.split(valueString)) {
            workers.add(decode(segment));
          }
        } catch (Exception e) {
          throw new RuntimeException("invalid worker list: " + valueString, e);
        }
        if (workers.isEmpty()) {
          throw new RuntimeException("ZNode does not contain any workers.");
        }
        return workers;
      }
    
      @Override
      public byte[] encodeAll(Iterable<Worker> workers) {
        requireNonNull(workers, "workers");
        StringBuilder nodeValue = new StringBuilder();
        workers.forEach(worker -> nodeValue.append(worker.getId()).append(fieldDelimiter).append(
                worker.getBiz()).append(segmentDelimiter));
        //delete the last unused segment delimiter
        if (nodeValue.length() > 0) {
          nodeValue.deleteCharAt(nodeValue.length() - 1);
        }
        return nodeValue.toString().getBytes(StandardCharsets.UTF_8);
      }
    
      @Override
      public byte[] encode(Worker worker) {
        return (worker.getId() + fieldDelimiter + worker.getBiz()).getBytes(StandardCharsets.UTF_8);
      }
    }
    

    5、zk.thrift

    该客户端使用thrift生成序列化对象:

    namespace java com.dataeye.crawler.thrift
    
    struct Worker {
        //worker的标识
        1: required string id,
        //worker所属的业务
        2: required string biz
    }
    

    6、ZooKeeperConnector.java

    该类负责ZooKeeper的连接,节点操作,节点监听等功能,是该客户端的核心类

    package com.dataeye.zookeeper;
    
    import com.google.common.annotations.VisibleForTesting;
    import org.apache.zookeeper.AsyncCallback.StatCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException.Code;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    
    import static java.util.Objects.requireNonNull;
    import static org.apache.zookeeper.KeeperException.Code.get;
    
    public final class ZooKeeperConnector {
      private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConnector.class);
      private final String zkConnectionStr;
      private final String zNodePath;
      private final int sessionTimeout;
      private final ZooKeeperListener listener;
      private ZooKeeper zooKeeper;
      private BlockingQueue<KeeperState> stateQueue;
      private CountDownLatch latch;
      private boolean activeClose;
      private String prevNodeValue;
      private Map<String, String> prevChildValue;
    
      public ZooKeeperConnector(String zkConnectionStr, String zNodePath, int sessionTimeout,
                                ZooKeeperListener listener
      ) {
        this.zkConnectionStr = requireNonNull(zkConnectionStr, "zkConnectionStr");
        this.zNodePath = requireNonNull(zNodePath, "zNodePath");
        this.sessionTimeout = sessionTimeout;
        this.listener = requireNonNull(listener, "listener");
      }
    
      /**
       * Do connect.
       */
      public void connect() {
        try {
          activeClose = false;
          latch = new CountDownLatch(1);
          zooKeeper = new ZooKeeper(zkConnectionStr, sessionTimeout, new ZkWatcher());
          latch.await();
          notifyConnected();
          notifyChange();
          if (stateQueue != null) {
            //put a fake stat to ensure method postConnected finished completely
            //(so that it won't throw exception under ZooKeeper connection recovery test)
            stateQueue.put(KeeperState.Disconnected);
          }
        } catch (Exception e) {
          throw new ZooKeeperException(
                  "failed to connect to ZooKeeper:  " + zkConnectionStr + " (" + e + ')', e);
        }
      }
    
      /**
       * Utility method to create a node.
       * @param nodePath node name
       * @param value    node value
       */
      public void createChild(String nodePath, byte[] value, CreateMode createMode) {
        // CreateMode.EPHEMERAL 临时节点
        // CreateMode.PERSISTENT 持久化节点
        try {
          //first check the parent node
          if (zooKeeper.exists(zNodePath, false) == null) {
            //parent node not exist, create it
            zooKeeper.create(zNodePath, zNodePath.getBytes(StandardCharsets.UTF_8),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
          }
          if (zooKeeper.exists(zNodePath + '/' + nodePath, true) == null) {
            zooKeeper.create(zNodePath + '/' + nodePath, value, Ids.OPEN_ACL_UNSAFE, createMode);
          } else {
            throw new ZooKeeperException("failed to create ZooKeeper Node:" + zNodePath + '/' + nodePath +
                    ",because the path exist already.");
          }
        } catch (Exception e) {
          throw new ZooKeeperException(
                  "failed to create ZooKeeper Node:  " + zkConnectionStr + " (" + e + ')', e);
        }
      }
    
      /**
       * Closes the underlying Zookeeper connection.
       * @param active if it is closed by user actively ? or passively by program because of underlying
       *               connection expires
       */
      public void close(boolean active) {
        try {
          activeClose = active;
          zooKeeper.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    
      /**
       * Notify listener that ZooKeeper connection has been established.
       */
      private void notifyConnected() {
        listener.connected();
      }
    
      /**
       * Notify listener that a node value or node children has changed.
       */
      private void notifyChange() {
        //forget it if event was triggered by user's actively closing EndpointGroup
        if (activeClose) {
          return;
        }
        List<String> children;
        byte[] newValueBytes;
        try {
          if (zooKeeper.exists(zNodePath, true) == null) {
            return;
          }
          children = zooKeeper.getChildren(zNodePath, true);
          newValueBytes = zooKeeper.getData(zNodePath, false, null);
          if (newValueBytes != null) {
            String newValue = new String(newValueBytes, StandardCharsets.UTF_8);
            if (prevNodeValue == null || !prevNodeValue.equals(newValue)) {
              listener.nodeValueChange(newValue);
              prevNodeValue = newValue;
            }
          }
          //check children status
          if (children != null) {
            Map<String, String> newChildValue = new HashMap<>();
            children.forEach(child -> {
              try {
                newChildValue.put(child,
                        new String(zooKeeper.getData(zNodePath + '/' + child,
                                false, null), StandardCharsets.UTF_8));
              } catch (Exception e) {
                throw new ZooKeeperException(e);
              }
            });
            if (prevChildValue == null || !prevChildValue.equals(newChildValue)) {
              listener.nodeChildChange(newChildValue);
              prevChildValue = newChildValue;
            }
          }
        } catch (Exception ex) {
          throw new ZooKeeperException("Failed to notify ZooKeeper listener", ex);
        }
      }
    
      /**
       * A ZooKeeper watch.
       */
      final class ZkWatcher implements Watcher, StatCallback {
        @Override
        public void process(WatchedEvent event) {
          if (stateQueue != null) {
            enqueueState(event.getState());
          }
          String path = event.getPath();
          if (event.getType() == Event.EventType.None) {
            // Connection state has been changed. Keep retrying until the connection is recovered.
            switch (event.getState()) {
              case Disconnected:
                break;
              case SyncConnected:
                // We are here because of one of the following:
                // - initial connection,
                // - reconnection due to session timeout or
                // - reconnection due to session expiration
                // Once connected, reset the retry delay.
                latch.countDown();
                break;
              case Expired:
                // Session expired usually happens when a client reconnected to the server after
                // long time network partition, exceeding the configured
                // session timeout. We need to reconstruct the ZooKeeper client.
                // First, clean the original handle.
                close(false);
                zooKeeper = null;
                try {
                  if (!activeClose) {
                    connect();
                  }
                } catch (ZooKeeperException e) {
                  logger.warn("Failed to attempt to recover a ZooKeeper connection", e);
                }
                break;
            }
          } else {
            if (path != null && path.startsWith(zNodePath)) {
              // Something has changed on the node, let's find out.
              try {
                zooKeeper.exists(path, true, this, null);
              } catch (Exception e) {
                e.printStackTrace();
              }
            }
          }
        }
    
        @Override
        public void processResult(int responseCodeInt, String path, Object ctx, Stat stat) {
          Code responseCode = get(responseCodeInt);
          switch (responseCode) {
            case OK:
              break;
            case NONODE:
              break;
            case SESSIONEXPIRED:
              // Ignore this and let the zNode Watcher process it first.
            case NOAUTH:
              // It's possible that this happens during runtime. We ignore this and wait for the ACL
              // configuration returns to normal. If it happens when the ZooKeeper client is initially
              // constructed, the constructor will throw an exception.
              return;
            default:
              // Retry on recoverable errors. Fatal errors go to the process() method above.
              try {
                zooKeeper.exists(path, true, this, null);
              } catch (Exception ex) {
                throw new ZooKeeperException("Failed to process ZooKeeper callback event", ex);
              }
              return;
          }
          if (!activeClose) {
            notifyChange();
            //enqueue an end flag to force the main thread to wait until this callback finished  before exit
            if (stateQueue != null) {
              enqueueState(KeeperState.Disconnected);
            }
          }
        }
    
        /**
         * Enqueue the state.
         * @param state ZooKeeper state
         */
        private void enqueueState(KeeperState state) {
          if (stateQueue == null) {
            return;
          }
          try {
            stateQueue.put(state);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    
    

    7、ZKClient.java

    Zookeeper客户端

    package com.dataeye.zookeeper.client;
    
    import com.dataeye.crawler.thrift.Worker;
    import com.dataeye.zookeeper.DefaultNodeValueCodec;
    import com.dataeye.zookeeper.ZooKeeperConnector;
    import com.dataeye.zookeeper.ZooKeeperListener;
    import com.dataeye.zookeeper.ZookeeperProperty;
    
    import java.util.*;
    import java.util.stream.Collectors;
    
    
    public class ZKClient {
    
        public static void main(String[] args) {
            String zkConn = "localhost:2181";
    
            DefaultNodeValueCodec nodeValueCodec = DefaultNodeValueCodec.INSTANCE;
    
            ZookeeperProperty zookeeperProperty = new ZookeeperProperty(zkConn, "/de-spider/works", 60000);
    
            ZooKeeperConnector zooKeeperConnector = new ZooKeeperConnector(
                    zookeeperProperty.getZkConnnectionStr(),
                    zookeeperProperty.getzNodePath(),
                    zookeeperProperty.getSessionTimeout(),
                    new ZooKeeperListener() {
                        @Override
                        public void nodeChildChange(Map<String, String> newChildrenValue) {
                            List<Worker> newWorkers = newChildrenValue.values().stream().map(nodeValueCodec::decode).filter(Objects::nonNull).collect(Collectors.toList());
                            Map<String, Set<Worker>> newBizWorkers = new HashMap<>();
    
                            if (newWorkers != null) {
                                for (Worker worker : newWorkers) {
                                    String biz = worker.biz;
    
                                    if (!newBizWorkers.containsKey(biz)) {
                                        newBizWorkers.put(biz, new HashSet<>());
                                    }
                                    newBizWorkers.get(biz).add(worker);
                                }
                            }
                        }
    
                        @Override
                        public void nodeValueChange(String newValue) {
    
                        }
    
                        @Override
                        public void connected() {
    
                        }
                    });
            zooKeeperConnector.connect();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:4、Zookeeper的客户端实例

          本文链接:https://www.haomeiwen.com/subject/gbnhxxtx.html