美文网首页
zookeeper入门之发布订阅模式简单实现

zookeeper入门之发布订阅模式简单实现

作者: 0x70e8 | 来源:发表于2019-02-11 22:02 被阅读0次

    zookeeper通过使用watcher可以实现发布订阅的功能,实际上就是基于监听的事件触发。

    示例

    以下是在zk上创建一个Node存储app的配置信息,然后监听配置变化来做出相应的动作。

    模拟配置信息类

    /**
     * Mutable holder for the sample application's settings: a url, a port,
     * a name and a password. Every field has a plain getter and setter.
     */
    public class SampleConf {
        private String url;
        private int port;
        private String name;
        private String password;

        /** @return the current url, or {@code null} if none was set */
        public String getUrl() {
            return this.url;
        }

        /** @param url the url to store */
        public void setUrl(String url) {
            this.url = url;
        }

        /** @return the current port ({@code 0} if none was set) */
        public int getPort() {
            return this.port;
        }

        /** @param port the port to store */
        public void setPort(int port) {
            this.port = port;
        }

        /** @return the current name, or {@code null} if none was set */
        public String getName() {
            return this.name;
        }

        /** @param name the name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the current password, or {@code null} if none was set */
        public String getPassword() {
            return this.password;
        }

        /** @param password the password to store */
        public void setPassword(String password) {
            this.password = password;
        }

        /**
         * Renders the four fields as a JSON-shaped string in the order
         * url, port, name, password. Values are inserted verbatim (no
         * escaping), and an unset string field appears as the quoted text
         * {@code null}.
         */
        @Override
        public String toString() {
            return "{"
                    + "\"url\":\"" + url + '"'
                    + ",\"port\":" + port
                    + ",\"name\":\"" + name + '"'
                    + ",\"password\":\"" + password + '"'
                    + '}';
        }
    }
    

    简单封装的zk工具类

    
    import com.alibaba.fastjson.JSON;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    
    /**
     * Small static helper around one shared {@link ZooKeeper} client connected
     * to {@code localhost:2181}.
     *
     * <p>Every operation first checks whether the node exists. ZooKeeper
     * failures ({@link KeeperException}, {@link InterruptedException}) are
     * logged and reported to the caller as a {@code null} result (or a silent
     * no-op for {@code void} methods); they are not rethrown. Node data is
     * encoded and decoded as UTF-8.
     */
    public class SimpleZKUtils {
        private SimpleZKUtils() {
        }

        private static final Logger LOGGER = LoggerFactory.getLogger(SimpleZKUtils.class);

        private static final String hostPort = "localhost:2181";
        private static final ZooKeeper zk;

        static {
            try {
                zk = new ZooKeeper(hostPort, 3000, event -> System.out.println(JSON.toJSONString(event)));
            } catch (IOException e) {
                // Fail fast: without a client every method below would only
                // fail later with an unexplained NullPointerException.
                LOGGER.error("Unable to create ZooKeeper client for {}", hostPort, e);
                throw new ExceptionInInitializerError(e);
            }
        }

        /**
         * Overwrites the data of an existing node, using the version returned
         * by the preceding existence check.
         *
         * @param path node path
         * @param data new node content, written as UTF-8
         * @return the node's new {@link Stat}, or {@code null} if the node does
         *         not exist or the ZooKeeper call failed
         */
        public static Stat set(String path, String data) {
            try {
                Stat stat = zk.exists(path, false);
                if (null != stat) {
                    return zk.setData(path, data.getBytes(StandardCharsets.UTF_8), stat.getVersion());
                }
            } catch (KeeperException | InterruptedException e) {
                handleFailure("set", path, e);
            }
            return null;
        }

        /**
         * Reads a node's data and leaves a watch for the client's default
         * watcher (the one passed to the {@link ZooKeeper} constructor).
         *
         * @param path node path
         * @return the node content decoded as UTF-8, or {@code null} if the
         *         node does not exist, holds no data, or the call failed
         */
        public static String get(String path) {
            try {
                Stat stat = zk.exists(path, false);
                if (null != stat) {
                    return toText(zk.getData(path, true, stat));
                }
            } catch (KeeperException | InterruptedException e) {
                handleFailure("get", path, e);
            }
            return null;
        }

        /**
         * Reads a node's data and registers {@code watcher} on it.
         *
         * @param path    node path
         * @param watcher watcher to register with the read
         * @return the node content decoded as UTF-8, or {@code null} if the
         *         node does not exist, holds no data, or the call failed
         */
        public static String get(String path, Watcher watcher) {
            try {
                Stat stat = zk.exists(path, false);
                if (null != stat) {
                    return toText(zk.getData(path, watcher, stat));
                }
            } catch (KeeperException | InterruptedException e) {
                handleFailure("get", path, e);
            }
            return null;
        }

        /**
         * Creates a persistent node with an open ACL.
         *
         * @param path node path
         * @param data initial node content, written as UTF-8
         * @throws RuntimeException if the node already exists
         */
        public static void create(String path, String data) {
            try {
                Stat stat = zk.exists(path, false);
                if (null == stat) {
                    zk.create(path, data.getBytes(StandardCharsets.UTF_8),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } else {
                    throw new RuntimeException("node is already existed");
                }
            } catch (KeeperException | InterruptedException e) {
                handleFailure("create", path, e);
            }
        }

        /**
         * Deletes an existing node, using the version returned by the
         * preceding existence check.
         *
         * @param path node path
         * @throws RuntimeException if the node does not exist
         */
        public static void del(String path) {
            try {
                Stat stat = zk.exists(path, false);
                if (null != stat) {
                    zk.delete(path, stat.getVersion());
                } else {
                    // The original message here was copied from create() and
                    // claimed the node already existed.
                    throw new RuntimeException("node does not exist: " + path);
                }
            } catch (KeeperException | InterruptedException e) {
                handleFailure("delete", path, e);
            }
        }

        /** Decodes node bytes as UTF-8; a node without data yields {@code null}. */
        private static String toText(byte[] bytes) {
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }

        /** Logs a failed operation and restores the interrupt flag when the cause was an interrupt. */
        private static void handleFailure(String operation, String path, Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOGGER.error("ZooKeeper {} failed for path {}", operation, path, e);
        }

    }
    

    测试

    1. 创建ZNode
    
        // Parent node; testCreate creates it before the config node below.
        public static final String basePath = "/app1";
        // Node that holds the application configuration read and written by the tests.
        public static final String path = "/app1/conf";
        // Most recent configuration parsed inside the watcher callbacks; starts out null.
        private static SampleConf sampleConf = null;
        /** Creates the parent node and then the config node, each with empty data. */
        @Test
        public void testCreate() {
            // Order matters: the parent path comes first.
            for (String node : new String[] {basePath, path}) {
                SimpleZKUtils.create(node, "");
            }
        }
    
        /** Writes a sample configuration to the config node as JSON, then reads it back and prints it. */
        @Test
        public void configure() {
            SampleConf conf = new SampleConf();
            conf.setUrl("localhost");
            conf.setPort(2181);
            conf.setName("jk");
            conf.setPassword("helloworld");

            String json = JSON.toJSONString(conf);
            SimpleZKUtils.set(path, json);

            String stored = SimpleZKUtils.get(path);
            System.out.println(stored);
        }
    
    
    2. 监听
    @Test
        public void testPubSub() {
            // The callback receives the ConfigWatcher itself and passes it back into
            // get(), so the same watcher is registered again on every reload.
            ConfigWatcher reloader = new ConfigWatcher(self -> {
                System.out.println("watcher execute");
                String latest = SimpleZKUtils.get(path, self);
                sampleConf = JSON.parseObject(latest, SampleConf.class);
                System.out.println(JSON.toJSONString(sampleConf));
            });

            // Initial read; this is what registers the watcher for the first time.
            String initial = SimpleZKUtils.get(path, reloader);
            System.out.println(initial);

            // Park the test thread so the actions triggered by the watcher can be observed.
            LockSupport.park();
        }
        // Watcher implementation
        /**
         * Watcher that reacts only to {@code NodeDataChanged} events and then
         * invokes the supplied callback, passing itself as the argument.
         */
        static class ConfigWatcher implements Watcher {

            private final Consumer<Watcher> callback;

            ConfigWatcher(Consumer<Watcher> myWatch) {
                this.callback = myWatch;
            }

            @Override
            public void process(WatchedEvent event) {
                boolean dataChanged = event.getType().equals(Watcher.Event.EventType.NodeDataChanged);
                if (!dataChanged) {
                    return;
                }
                // Hand this instance to the callback so it can be passed into the zk get method.
                callback.accept(this);
            }
        }
    
    

    因为zk的watcher是一次性的,触发一次之后就会失效,所以每次处理事件时都需要重新注册watcher,才能继续响应后续的事件。此处我套了一个Consumer接口来复用最外层的watcher实例:lambda表达式里的this指向的是外层类的实例,而不是watcher本身,所以没法在lambda里直接把watcher自身传进去。使用匿名内部类可以解决这个问题,因为匿名内部类里的this就是该watcher实例:

        /**
         * Same subscription as testPubSub, but with an anonymous Watcher so that
         * {@code this} can be passed straight back into get() to register the
         * watcher again after each notification.
         */
        @Test
        public void testPubSub2() {
            String conf = SimpleZKUtils.get(path, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // Watchers are also notified of session-state changes
                    // (EventType.None). Reloading on those would re-read the node
                    // while the client may be disconnected and overwrite the cached
                    // config with the failed result, so react to data changes only,
                    // as ConfigWatcher does.
                    if (watchedEvent.getType() != Watcher.Event.EventType.NodeDataChanged) {
                        return;
                    }
                    System.out.println("watcher execute");
                    sampleConf = JSON.parseObject(SimpleZKUtils.get(path, this), SampleConf.class);
                    System.out.println(JSON.toJSONString(sampleConf));
                }
            });
            System.out.println(conf);
            // Park the test thread so the actions triggered by the watcher can be observed.
            LockSupport.park();
        }
    

    相关文章

      网友评论

          本文标题:zookeeper入门之发布订阅模式简单实现

          本文链接:https://www.haomeiwen.com/subject/qfuceqtx.html