美文网首页
Zookeeper实战

Zookeeper实战

作者: 索伦x | 来源:发表于2019-03-03 17:03 被阅读0次

Springboot整合curator访问zookeeper

POM

        <!-- zookeeper curator客户端 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

配置文件application.yml

#============== zookeeper ===================
zookeeper:
  server: 192.168.8.105:2181
  sessionTimeoutMs: 6000
  connectionTimeoutMs: 6000
  maxRetries: 3
  baseSleepTimeMs: 1000

测试代码

@Service
public class CuratorTestServiceImpl implements CuratorTestService {

    @Value("${zookeeper.server}")
    private String zookeeperServer;
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Override
    public void testCurator() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)).build();

        try {
            /**
             * 创建会话
             */
            client.start();

            /**
             * 创建节点
             * 注意:
             * 1 除非指明创建节点的类型,默认是持久节点
             * 2 ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
             */
            client.create().forPath("/China");//创建一个初始内容为空的节点
            client.create().forPath("/America", "zhangsan".getBytes());
            client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点

            /**
             * 异步创建节点
             * 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理
             */
            client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
                            + ",type:" + event.getType());
                }
            }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");

            client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
                            + ",type:" + event.getType());
                }
            }).forPath("/async-curator-zookeeper");

            /**
             * 获取节点内容
             */
            byte[] data = client.getData().forPath("/America");
            System.out.println(new String(data));
            byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
            System.out.println(new String(data2));
            /**
             * 更新数据
             */
            Stat stat = client.setData().forPath("/America");
            //client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
            client.setData().forPath("/America", "lisi".getBytes());

            /**
             * 删除节点
             */
            client.delete().forPath("/China");//只能删除叶子节点
            client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
            //client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
            client.delete().forPath("/America");
            //client.delete().guaranteed().forPath("/America");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Curator framework实例

使用CuratorFrameworkFactory产生framework实例。 CuratorFrameworkFactory 既提供了factory方法也提供了builder来创建实例。CuratorFrameworkFactory是线程安全的。你应该在应用中为单一的ZooKeeper集群共享唯一的CuratorFramework实例
工厂方法(newClient())提供了一个简单的方式创建实例。Builder可以使用更多的参数控制生成的实例。一旦生成framework实例, 必须调用start方法启动它。应用结束时应该调用close方法关闭它。
Curator framework原理: 单一长链接

单一长链接

zookeeper分布式锁

POM

        <!-- curator分布式锁 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

配置管理类CuratorConfiguration.java

/**
 *   zk连接配置管理
 *   @author Suoron
 */

@Component
public class CuratorConfiguration {
    @Value("${zookeeper.server}")
    private String zookeeperServer;
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Bean
    public CuratorFramework createZkClient(){
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)).build();
        /**
         * 创建会话
         */

        client.start();

        return client;
    }
}

分布式锁测试Service


/**
 *  火车票抢票模拟
 * @author Suoron
 */

@Service
public class CuratorLockServiceImpl implements CuratorLockService {

    @Resource
    CuratorFramework zkclient;

    @Override
    public void testDistribute() {
        //创建分布式锁
        final InterProcessMutex lock = new InterProcessMutex(zkclient, "/locktest");

        //模拟多线程,可以从多个应用(tomcat)发起请求
        for(int i = 0; i < 10; i++){
            new Thread(new Runnable() {
                private int id;

                public Runnable setArgs(int x){
                    id = x;
                    return this;
                }
                @Override
                public void run() {
                    try {
                        //加锁
                        lock.acquire();
                        //lock.acquire(5, TimeUnit.SECONDS) == true;
                        //-------------业务处理开始
                        System.out.println(System.currentTimeMillis()+":当前抢票用户:"+id);
                        //TODO 数据库操作
                        //-------------业务处理结束
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            //释放
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.setArgs(i),"t" + i).start();
        }
        try {
            Thread.sleep(100*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

zookeeper 栅栏

应用场景:

马场赛马,每匹马相当于一个应用程序,必须等全部数量的马准备就绪才能开始赛马

赛马封装HorseRace.java

/**
 *  
 *   赛马算法
 *   多个进程同一时间开始
 *   @author Suoron
 *   2018/09/27
 * 
 */

public interface HorseRace {

    /* 创建赛马 */
    public void createRace(String rootPath) throws Exception ;
    
    /**
     *  进入比赛 
     *  @param rootPath 比赛根路径
     *  @param name 马匹名称
     *  @param size 本次赛马数
     *  @return boolean 返回是否准备就绪状态 
     * 
     */
    public boolean enterRace(String rootPath,String name,int size);
    /* 退出比赛 */
    public  boolean exitRace(String rootPath,String name);
    /* 退出比赛(等待所有人一起退出) */
    public  boolean togetherExitRace(String rootPath,String name);  
}

赛马算法实现类HorseRaceImpl.java

public class HorseRaceImpl implements HorseRace {

    @Resource
    CuratorFramework zkClient;

    @Override
    public  boolean togetherExitRace(String rootPath,String name){
        //先删除自己的节点
        ZkClientHelper.deleteNode(zkClient,rootPath+"/"+name,0);
        //此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。
        while(true){
            int size = ZkClientHelper.getChildrenSize(zkClient, rootPath);
            if(size != 0) {
                System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return true;
            }
        }
    }

    @Override
    public boolean enterRace(String rootPath,String name,int size) {

        try{
            this.createRace(rootPath);
        }catch(Exception ex){
            ex.printStackTrace();
            return false;
        }

        String node = ZkClientHelper.createTempNode(zkClient, rootPath+"/"+name, "1");
        while(true) {
            int n = ZkClientHelper.getChildrenSize(zkClient, rootPath);
            if (n != size) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return true;
            }
        }
    }

    @Override
    public void createRace(String rootPath) throws Exception {
        if(!ZkClientHelper.checkExists(zkClient,rootPath)){
            String rs = ZkClientHelper.creatingParentsNode(zkClient, rootPath, "1");
            //System.out.println(rs);           
        }
    }

    @Override
    public boolean exitRace(String rootPath, String name) {
        //先删除自己的节点
        ZkClientHelper.deleteNode(zkClient,rootPath+"/"+name,0);
        return true;
    }

}

工具类ZkClientHelper

public class ZkClientHelper {

    public  static boolean deleteNode(CuratorFramework zkClient,String path,int version) {
        try {
            zkClient.delete().forPath(path);
            //System.out.println("delete path:"+ path + "success");
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

    }
    public static int getChildrenSize(CuratorFramework zkClient,String path) {
        try {
            return zkClient.getChildren().watched().forPath(path).size();
        } catch (KeeperException e) {
            e.printStackTrace();
            return -1;
        } catch (Exception e) {
            e.printStackTrace();
            return  -1;
        }
    }
    public static String createTempNode(CuratorFramework zkClient,String path,String data) {
        try {
            String node = zkClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path,data.getBytes());
            //System.out.println(rs);

            //System.out.println("node "+ path +" with data is created,return node "+node);
            return node;
        } catch (KeeperException e) {
            e.printStackTrace();
            return "ERROR";
        } catch (Exception e) {
            e.printStackTrace();
            return "ERROR";
        }
    }
    public static String creatingParentsNode(CuratorFramework zkClient, String path, String data){
        try {
            String node = zkClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath(path,data.getBytes());
            //System.out.println(rs);
            return node;
        } catch (Exception e) {
            e.printStackTrace();
            return "ERROR";
        }
    }
    public static boolean checkExists(CuratorFramework zkClient,String path){
        try {
            Stat stat = zkClient.checkExists().forPath(path);
            if(stat!=null) {
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class HorseRaceTest {

    @Resource
    HorseRace horseRace;

    class MyThread extends Thread{
        private int horseId;

        private MyThread(int horseId){
            this.horseId = horseId;
        }
        public void run(){

            //先业务处理,后准备赛马
            System.out.println("do something before!!!"+horseId);

            horseRace.enterRace("/hotel/download", "MyHorse"+horseId,  6);
            horseRace.exitRace("/hotel/download", "MyHorse"+horseId);
        }
    }
    class MyThread1 extends Thread{
        private int horseId;

        private MyThread1(int horseId){
            this.horseId = horseId;
        }
        public void run(){
            horseRace.enterRace("/hotel/download", "MyHorse"+horseId,  6);
            horseRace.exitRace("/hotel/download", "MyHorse"+horseId);
            //先赛马,后业务处理
            System.out.println("do something after!!!"+horseId);
        }
    }

    @Test
    public void testRace(){

        MyThread mythread1 = new MyThread(1);  //1号赛马
        MyThread mythread2 = new MyThread(2);  //2号赛马
        MyThread mythread3 = new MyThread(3);  //3号赛马
        MyThread mythread4 = new MyThread(4);  //4号赛马
        MyThread mythread5 = new MyThread(5);  //5号赛马


        MyThread1 mythread6 = new MyThread1(6);  //特殊的6号赛马


        mythread6.start();
        mythread5.start();
        mythread4.start();
        mythread3.start();
        mythread2.start();
        mythread1.start();

        try{
            Thread.sleep(10*1000);
        }catch(Exception ex){
            ex.printStackTrace();
        }

    }
}

相关文章

  • zookeeper系列

    zookeeper系列(一)zookeeper必知 √zookeeper系列(二)实战master选举 √zo...

  • 从零开始学Hadoop大数据分析之ZooKeeper实战

    导言 上一节我学习了关于ZooKeeper基础知识,接下开始ZooKeeper的实战,只有从实战中学习才能进步更快...

  • 【实战】ZooKeeper 实战

    1. 前言 这篇文章简单给演示一下 ZooKeeper 常见命令的使用以及 ZooKeeper Java客户端 C...

  • ZooKeeper实战

    在学习技术的过程中,我发现程序的版本一直在变,每个人的情况也不一样,在参考网上的教程的时候,都或多或少有这样或者那...

  • Zookeeper实战

    Springboot整合curator访问zookeeper POM 配置文件application.yml 测试...

  • Zookeeper实战

    1:# The number of milliseconds of each ticktickTime=2000 ...

  • netty实战

    Netty Zookeeper 亿级流量 高并发 - 实战(修正版) - crazymakercircle...

  • 「从入门到放弃-ZooKeeper」ZooKeeper实战-分布

    前言 上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列中,我们一起写了下如何通过ZooK...

  • Connection reset by peer 秒懂

    疯狂创客圈 经典图书 : 《Netty Zookeeper Redis 高并发实战》[https://www....

  • Zookeeper 扩容实战

    场景描述: zookeeper 版本 3.4.6 现有zk集群是五台, myid分别为 0, 1, 2, 3, 4...

网友评论

      本文标题:Zookeeper实战

      本文链接:https://www.haomeiwen.com/subject/dffmuqtx.html