美文网首页
zookeeper 详解之API接口篇

zookeeper 详解之API接口篇

作者: mr_酱 | 来源:发表于2018-11-30 14:23 被阅读14次

根据zookeeper API接口开发应用服务端和客户端

项目重点:

  • 利用 ZooKeeper 临时节点的特性:创建临时节点的客户端断开连接(会话结束)后,该临时节点会被自动删除;再结合对父节点注册的 watch,即可动态感知集群中服务器的上线与下线。
服务器动态上下线机制.png

zookeeper 服务端

代码如下:

package cn.itcast.bigdata.zkdist;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedServer {

    private static final String CONNECT_STRING = "mini1:2181,mini2:2181,mini3:2181";
    private static final int SESSION_TIMEOUT = 2000;

    private static final String PARENT_NODE = "/servers";
    private ZooKeeper zk = null;

    // 获取连接
    public void getContent() throws Exception {
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                // TODO Auto-generated method stub
                // 时间通知后的回调函数
                System.out.println(event.getType() + "------" + event.getPath());

                try {
                    zk.getChildren("/", true);
                } catch (KeeperException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

    // 注册服务器信息
    public void registerServer(String hostname) throws Exception {
        String create = zk.create(PARENT_NODE + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname + " is on line " + create);
    }

    // 业务
    public void handleBussiness(String hostname) throws InterruptedException {
        System.out.println(hostname + " is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 客户端链接
        DistributedServer server = new DistributedServer();
        server.getContent();
        // 注册服务器信息
        server.registerServer(args[0]);
        // 启动业务功能
        server.handleBussiness(args[0]);
    }

}

zookeeper客户端代码

代码如下:

package cn.itcast.bigdata.zkdist;

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributedClient {
    private static final String CONNECT_STRING = "mini1:2181,mini2:2181,mini3:2181";
    private static final int SESSION_TIMEOUT = 2000;

    private static final String PARENT_NODE = "/servers";
    private ZooKeeper zk = null;
    // volatile 定义的变量只在堆内存不会拷贝到线程的堆内存中
    private volatile List<String> serverList;

    // 获取连接
    public void getContent() throws Exception {
        zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {

            @Override
            public void process(WatchedEvent event) {
                // TODO Auto-generated method stub
                // 时间通知后的回调函数
                try {
                    // 重新更新服务器列表,并注册监听
                    getServerList();
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

    // 获取子节点信息,监听父节点
    public void getServerList() throws Exception {
        List<String> children = zk.getChildren(PARENT_NODE, true);
        List<String> servers = new ArrayList<String>();
        for (String child : children) {
            byte[] data = zk.getData(PARENT_NODE + "/" + child, false, null);
            servers.add(new String(data));
        }
        
        serverList = servers;
        
        //打印服務器列表
        System.out.println(serverList);
        
    }

    // 业务
    public void handleBussiness() throws InterruptedException {
        System.out.println("client is working ...");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 获取连接
        DistributedClient client = new DistributedClient();
        client.getContent();

        // 获取服务器列表
        client.getServerList();

        // 业务线程
        client.handleBussiness();
    }
}


相关文章

网友评论

      本文标题:zookeeper 详解之API接口篇

      本文链接:https://www.haomeiwen.com/subject/zdvccqtx.html