美文网首页
Zookepper系列之(4)——ZooKeeper Curat

Zookepper系列之(4)——ZooKeeper Curat

作者: 康康不遛猫 | 来源:发表于2018-03-26 12:36 被阅读0次

一、ZooKeeper Curator

1、创建会话

public class Create_Session_Sample {
    public static void main(String[] args) throws Exception{
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        /*CuratorFrameworkFactory.newClient("domain1.book.zookeeper:2181",
                5000,
                3000,
                retryPolicy);*/
 //Fluent风格
        CuratorFrameworkFactory.builder()
                             .connectString("domain1.book.zookeeper:2181")
                             .sessionTimeoutMs(5000)
                             .retryPolicy(retryPolicy)
                             .namespace("base")//隔离命名空间/base
                             .build();
        client.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

2、创建节点
使用Curator的同步接口

public class Create_Node_Sample {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();//jianl
        client.create()
              .creatingParentsIfNeeded()//创建子节点时没有父节点则创建
              .withMode(CreateMode.EPHEMERAL)//临时节点
              .forPath(path, "init".getBytes());
    }
}

注意:zk的非叶子节点都必须为持久节点,因此此处c1为临时节点,而父节点为持久节点

使用Curator的异步接口

public class Create_Node_Background_Sample {
    static String path = "/zk-book";
 
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    static CountDownLatch semaphore = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);//自定义的Executor
 
    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());
        // 此处传入了自定义的Executor
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                semaphore.countDown();
            }
        }, tp).forPath(path, "init".getBytes());
        // 此处没有传入自定义的Executor
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
                System.out.println("Thread of processResult: " + Thread.currentThread().getName());
                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());
 
        semaphore.await();//等待异步完成
        tp.shutdown();
    }
}

注:Zookeeper中所有异步通知事件都会通过默认EventThread线程来处理,而EventThread线程串行处理所有事件。 为了避免执行事件时间过长,影响其他事件执行,可以通过自定义ExecutorService的执行事件。
inBackground(BackgroundCallback arg0, Executor arg1)

3、删除节点

//使用Curator删除节点

public class Del_Data_Sample {
 
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        client.delete().deletingChildrenIfNeeded()
                       .withVersion(stat.getVersion()).forPath(path);
    }
}

storingStatIn 把服务器端获取的状态数据存储到stat对象
deletingChildrenIfNeeded 递归删除子节点
withVersion 按版本号删除
client.delete().guaranteed().forPath(path); 强制删除,只有会话有效,就会持续删除,直到删除。

4、获取节点数据

使用Curator获取数据内容

public class Get_Data_Sample {
 
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}

5、更新数据

//使用Curator更新数据内容

public class Set_Data_Sample {
 
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.delete().deletingChildrenIfNeeded().forPath( path );
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            //版本过期,抛出异常KeepErrorCode=BadVersion
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}

6、事件监听

Curator引入NCache来实现对Zookeeper服务端事件的监听,不需要原生那样反复注册Watcher。
Cache分为节点监听NodeCache和子节点监听PathChildrenCache。

#NodeCache 监听节点内容和节点是否存在
public class NodeCache_Sample {
 
    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
     
    public static void main(String[] args) throws Exception {
        client.start();
        client.create()
              .creatingParentsIfNeeded()
              .withMode(CreateMode.EPHEMERAL)
              .forPath(path, "init".getBytes());
        final NodeCache cache = new NodeCache(client,path,false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {//添加NodeCacheListener
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + 
                new String(cache.getCurrentData().getData()));
            }
        });
        client.setData().forPath( path, "u".getBytes() );
        Thread.sleep( 1000 );
        client.delete().deletingChildrenIfNeeded().forPath( path );
        Thread.sleep( Integer.MAX_VALUE );
    }
}

注:new NodeCache(client,path,false) 默认设置false,如果设置true,则NodeCache在第一次启动
时会立刻从Zookeeper上读取对应节点的数据,保存在NodeCache中。
NodeCache可以监听数据内容变化,也可以监听节点是否存在。如果节点不存在,创建节点之后会触发NodeCacheListener。但删除节点则无法触发NodeCacheListener

#PathChildrenCache 监听子节点新增、数据更新、删除
public class PathChildrenCache_Sample_ExecutorService {
 
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .sessionTimeoutMs(5000)
            .build();
    static ExecutorService tp = Executors.newFixedThreadPool(2);
     
    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println( Thread.currentThread().getName() );
        PathChildrenCache cache = new PathChildrenCache(client, path,true,false,tp);
        cache.start(StartMode.NORMAL);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, 
                                   PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED," + event.getData().getPath());
                    System.out.println("CHILD_ADDED," + event.getData().getData());
                    System.out.println( "tname: " + Thread.currentThread().getName() );
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED," + event.getData().getPath());
                    System.out.println("CHILD_ADDED," + event.getData().getData());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED," + event.getData().getPath());
                    System.out.println("CHILD_ADDED," + event.getData().getData());
                    break;
                default:
                    break;
                }
            }
        });
        Thread.sleep( 1000 );
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep( 1000 );
        client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");
        Thread.sleep( 1000 );
        client.delete().forPath(path+"/c1");
        Thread.sleep( 1000 );
        client.delete().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

注:
new PathChildrenCache(client, path,true,false,tp); 如果入参不传tp则使用默认的线程处理。


PathChildrenCache 构建参数

Cache调用start方法的模式
NORMAL: 初始时为空。
BUILD_INITIAL_CACHE: 在这个方法返回之前调用rebuild()。
POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件

Cuartor无法对二级子节点进行事件的监听,如果对/zk/ci监听,那么当/zk/ci/c2节点创建或者删除时候是无法触发节点变化

7、master选举

原理:对接节点/master_select节点,多台机器同时在节点下创建子节点/master_select/lock,只有一个机器能创建成功,该机器也被选中为Master。一个机器成为Master之后,其他机器进入等待,直到该机器完成业务操作,释放Master权利。

public class Recipes_MasterSelect {
 
    static String master_path = "/curator_recipes_master_path";
     
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main( String[] args ) throws Exception {
        client.start();
        LeaderSelector selector = new LeaderSelector(client, 
                master_path, 
                new LeaderSelectorListenerAdapter() {
                    public void takeLeadership(CuratorFramework client) throws Exception {
                        //一个应用程序完成Master逻辑后,另一个程序的takeLeadership方法才被调用
                        System.out.println("成为Master角色");
                        Thread.sleep( 3000 );
                        System.out.println( "完成Master操作,释放Master权利" );
                    }
                });
        selector.autoRequeue();
        selector.start();
        Thread.sleep( Integer.MAX_VALUE );
    }
}

原生API实现节点选举

public class TestMaster implements Watcher{

    String serverId = Integer.toString(2);
    private ZooKeeper zk;
    private String hostPort;
    private volatile boolean connected = false;
    private volatile boolean expired = false;
    boolean isLeader = false; // returns true if there is a master

    void startZK() throws IOException {
        zk = new ZooKeeper(hostPort, 15000, this);
    }

    void stopZK() throws InterruptedException, IOException {
        zk.close();
    }
    
    boolean checkMaster() throws KeeperException, InterruptedException {
        while (true) {
            try {
                Stat stat = new Stat();
                byte data[] = zk.getData("/master", false, stat);
                isLeader = new String(data).equals(serverId);//检查master节点是否由当前serverId创建
                return true;
            } catch (NoNodeException e) {
                // no master, so try create again
                return false;
            } catch (ConnectionLossException e) {

            }
        }
    }
    
    void runForMaster() throws InterruptedException, KeeperException {
        while (true) {
            try {
                zk.create("/master", serverId.getBytes(), Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
                isLeader = true;
                takeLeadership();//获取Master节点,执行任务
                break;
            } catch (NodeExistsException e) {
                 isLeader = false;//master节点已经存在                
                 break;
            } catch (ConnectionLossException e) {
                
            }       
            if (checkMaster()) break;
        }
    }
    
    void takeLeadership() {
        //getWorkers();
        (new RecoveredAssignments(zk)).recover( new RecoveryCallback() {
            public void recoveryComplete(int rc, List<String> tasks) {
                if(rc == RecoveryCallback.FAILED) {
                    System.out.println("Recovery of assigned tasks failed.");
                } else {
                    System.out.println( "Assigning recovered tasks" );
                    //getTasks();
                }
            }
        });
    }

    @Override
    public void process(WatchedEvent e) {  
        System.out.println("Processing event: " + e.toString());
        if(e.getType() == Event.EventType.None){
            switch (e.getState()) {
            case SyncConnected:
                connected = true;
                break;
            case Disconnected:
                connected = false;
                break;
            case Expired:
                expired = true;
                connected = false;
                System.out.println("Session expiration");
            default:
                break;
            }
        }
    }
}

8、分布式锁

public class Recipes_Lock {
 
    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client,lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for(int i = 0; i < 30; i++){
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();//等待线程都创建结束
                        lock.acquire();
                    } catch ( Exception e ) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的订单号是 : "+orderNo);
                    try {
                        lock.release();
                    } catch ( Exception e ) {
                    }
                }
            }).start();
        }
        down.countDown();
    }
}

相关文章

网友评论

      本文标题:Zookepper系列之(4)——ZooKeeper Curat

      本文链接:https://www.haomeiwen.com/subject/hbodxxtx.html