美文网首页大数据架构师+大数据微服务架构和实践
使用Zookeeper解决微服务架构下分布式事务问题

使用Zookeeper解决微服务架构下分布式事务问题

作者: A尚学堂Nancy老师 | 来源:发表于2018-09-17 21:53 被阅读12次

    准备工作

    单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。

    [if !supportLists]1. [endif]虚拟机

    我们使用virtualbox

    在官网下载最新版并安装

    https://www.virtualbox.org/wiki/Downloads

    [if !supportLists]2. [endif]操作系统

    操作系统使用

    CentOS-6.8-x86_64-minimal版

    [if !supportLists]3. [endif]Zookeeper

    下载Zookeeper 

    http://zookeeper.apache.org

    [if !supportLists]4. [endif]准备SSH连接工具

    Xshell 或Winscp + Putty

    https://winscp.net/eng/download.php

    https://www.putty.org/

    操作系统安装

    [if !supportLists]1. [endif]加载光盘镜像

    选择ISO文件

    [if !supportLists]2. [endif]跳过磁盘测试

    剩下的步骤按照提示下一步就可以了

    Zookeeper集群安装部署

    Zookeeper客户端命令

    Zookeeper Java Api

    使用zookeeper开发分布式锁

    分布式锁算法

    竞争式核心代码

    此方式会在高并发场景下有缺陷

    // 1.检查父节点

    String path = "/locks"; //父节点名称

    Stat stat = zooKeeper.exists(path, false);

      //如果 stat这个对象是空的意味着需要创建节点

    if (stat == null){

    //创建父节点

    // createMode e临时节点 不支持 有子节点

    zooKeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    System.out.println("DistributedLock -> getLock ->父节点创建成功");

    }

    String lock_path = path + "/" + lockItem; //子节点完整路径

    // 2.创建 带有唯一标识的 子节点

    int try_max = 20;  //允许重试次数

    int try_count = 1; //重试计数

    long try_time = 2000; //重试间隔

    try{

    //创建成功

    zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    }catch (Exception e) {

    /**创建失败 说明节点已存在

    *接下来使用while循环尝试继续创建节点

    *直到成功或次数超限

     */

    while (try_count < try_max) {

    try_count = try_count + 1;

    Thread.sleep(try_time);

    try{

    zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    }catch (Exception e1) {

    System.out.println("DistributedLock -> getLock子节点创建中 当前次数  :" +  try_count);

    continue;//重试创建失败 继续

    }

    break; //创建节点成功,退出循环

    }

    }

    System.out.println("DistributedLock -> getLock子节点创建: 成功 使用次数 :" +  try_count);

    队列式

    Curator

    Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。

    Curator包含了几个包:

    curator-framework:对zookeeper的底层api的一些封装

    curator-client:提供一些客户端的操作,例如重试策略等

    curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

    Maven依赖

    使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x

    <dependency>

                <groupId>org.apache.curator</groupId>

                <artifactId>curator-framework</artifactId>

                <version>2.12.0</version>

            </dependency>

            <dependency>

                <groupId>org.apache.curator</groupId>

                <artifactId>curator-recipes</artifactId>

                <version>2.12.0</version>

            </dependency>

    建立连接

    retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    CuratorFramework client =

    CuratorFrameworkFactory.newClient(

                    connectionString,

                            5000,

                            3000,

                            retryPolicy);

    数据节点操作

    @Test

    /**

    *创建节点

     */

    public void t2()throws Exception {

    String string = client.create().forPath("/ct1");

    System.out.println("string:" + string);

    }

    @Test

    /**

    *展示节点

     */

    public void t3()throws Exception {

    List<String> forPath = client.getChildren().forPath("/");

    for(String node : forPath) {

    System.out.println(node);

    }

    }

    @Test

    /**

    *节点类型

     */

    public void t4()throws Exception {

    String string = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/ct1");

    System.out.println("string:" + string);

    t3();

    }

    @Test

    /**

    *节点数据

     */

    public void t5()throws Exception {

    String string = client.create().withMode(CreateMode.PERSISTENT).forPath("/ct2","aaxxa".getBytes());

    System.out.println("string:" + string);

    }

    @Test

    /**

    *获取数据

     */

    public void t6()throws Exception {

    t5();

    byte[] forPath = client.getData().forPath("/ct2");

    System.out.println("string:" + new String(forPath));

    }

    @Test

    /**

    *创建子节点

     */

    public void t7()throws Exception {

    client.create().forPath("/ct666/xxoo","xxoo".getBytes());

    }

    @Test

    /**

    *创建子节点并自动创建父节点

     */

    public void t8()throws Exception {

    client.create().creatingParentsIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());

    }

    @Test

    /**

    *创建子节点并自动创建父节点

     */

    public void t9()throws Exception {

    client.create().creatingParentContainersIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());

    }

    @Test

    /**

    *创建子节点并自动创建父节点

     */

    public void t10()throws Exception {

    List<String> forPath = client.getChildren().forPath("/ct666");

    for(String node : forPath) {

    System.out.println(node);

    }

    }

    @Test

    /**

    *删除节点

     */

    public void t11()throws Exception {

    client.delete().forPath("/ct666/xxoo");

    t10();

    }

    @Test

    /**

    *删除节点,有子节点不能删除

     */

    public void t12()throws Exception {

    client.delete().forPath("/ct666");

    t10();

    }

    @Test

    /**

    *删除节点,有子节点 一起删

     */

    public void t13()throws Exception {

    client.delete().deletingChildrenIfNeeded().forPath("/ct666");

    t10();

    }

    @Test

    /**

    *检查节点是否存在

     */

    public void t14()throws Exception {

    Stat forPath = client.checkExists().forPath("/ct666");

    if(null == forPath) {

    System.out.println("不存在");

    }else {

    client.delete().deletingChildrenIfNeeded().forPath("/ct666");

    t10();

    }

    }

    @Test

    /**

    *事务

    * 1.检查节点是否存在

    * 2.创建节点

    * 3.设置数据

    * 4.提交事务

     */

    public void t15()throws Exception {

    Collection<CuratorTransactionResult> commit = client.inTransaction()

    .check().forPath("/")

    .and()

    .create().forPath("/order/order2")

    .and()

    .setData().forPath("/order/order1","xxx".getBytes())

    .and()

    .commit();

    for (CuratorTransactionResult curatorTransactionResult : commit) {

    System.out.println("curatorTransactionResult:" + curatorTransactionResult.getForPath() + curatorTransactionResult.getResultStat());

    }

    }

    事件监听 Cache

    Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。

    Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。

    Curator提供了三种Watcher(Cache)来监听结点的变化。

    Path Cache

    Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

    @Test

    /**

    PathChildrenCache监听子节点

     */

    public void t16()throws Exception {

    PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true );

    pathChildrenCache.start();

    PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {

    @Override

    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

    System.out.println("事件类型:" + event.getType());

    if(null != event.getData())

                    System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));

    }

    };

    pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);

    Thread.sleep(Long.MAX_VALUE);

    }

    NodeCache

    Node Cache只是监听某一个特定的节点是否存在和数据变化

    @Test

    /**

    NodeCache监听单个节点

     */

    public void t17()throws Exception {

    NodeCache nodeCache = new NodeCache(client, "/order");

    nodeCache.start();

    NodeCacheListener nodeCacheListener = new NodeCacheListener() {

    @Override

    public void nodeChanged() throws Exception {

    ChildData currentData = nodeCache.getCurrentData();

    if(null == currentData ) {

    System.out.println("节点不存在");

    }else {

    System.out.println("currentData:" + new String(currentData.getData()));

    }

    }

    };

    nodeCache.getListenable().addListener(nodeCacheListener);

    Thread.sleep(Long.MAX_VALUE);

    Tree Cache

    @Test

    /**

    TreeCache监听节点下的所有事件

     */

    public void t18()throws Exception {

    TreeCache treeCache = new TreeCache(client, "/order");

    treeCache.start();

    TreeCacheListener treeCacheListener = new TreeCacheListener() {

    @Override

    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

    System.out.println("事件类型:" + event.getType());

    if(null != event.getData())

                    System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));

    }

    };

    treeCache.getListenable().addListener(treeCacheListener);

    Thread.sleep(Long.MAX_VALUE);

    }

    分布式锁

    可重入共享锁—Shared Reentrant Lock

    不可重入共享锁—Shared Lock

    可重入读写锁—Shared Reentrant Read Write Lock

    多共享锁对象—Multi Shared Lock

    分布式计数器

    分布式队列

    分布式屏障—Barrier

    相关文章

      网友评论

        本文标题:使用Zookeeper解决微服务架构下分布式事务问题

        本文链接:https://www.haomeiwen.com/subject/nzzdnftx.html