美文网首页
ZK实现的分布式锁

ZK实现的分布式锁

作者: 悠扬前奏 | 来源:发表于2020-06-06 23:45 被阅读0次

    工作需要。写了一个基于ZK的分布式锁,记录一下:

    原理

    zk能保证集群上的路径同一时刻只有一个客户端来创建。因此,通过在集群上顺序创建和删除临时路径,在实现分布式锁的获取和释放。

    代码

    zk上有一个客户端框架Curator已经对分布式互斥锁进行了封装,几乎是开箱即用:

    • 封装一下框架的初始化
    public class ZKCuratorManager {
        private static InterProcessMutex lock;
        private static CuratorFramework cf;
        private static String zkAddr = "*.*.*.*:2181";
        private static String lockPath = "/distribute-lock";
    
        static {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            cf = CuratorFrameworkFactory.builder()
                    .connectString(zkAddr)
                    .sessionTimeoutMs(2000)
                    .retryPolicy(retryPolicy)
                    .build();
            cf.start();
        }
    
        public static InterProcessMutex getLock() {
            lock = new InterProcessMutex(cf, lockPath);
            return lock;
        }
    }
    
    • 封装一下工具类
    public class ZKCuratorLockUtil {
    
        /**
         * 从配置类中获取分布式锁对象
         */
        private static InterProcessMutex lock = ZKCuratorManager.getLock();
    
        /**
         * 加锁
         *
         * @return
         */
        public static boolean acquire() {
            try {
                lock.acquire();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return true;
        }
    
        /**
         * 锁的释放
         */
        public static void release() {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 测试
      • 所有线程启动后sleep5秒钟
      • 用CyclicBarrier使四个线程同时尝试获取锁
      • 结果应该是四个线程依次获取-释放锁
    public class ZkLockTest {
    
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier = new CyclicBarrier(N);
            for (int i = 0; i < N; i++) {
    
                new WriterTest(barrier).start();
            }
    
            System.out.println("END");
        }
    
        static class WriterTest extends Thread {
            private CyclicBarrier cyclicBarrier;
    
            public WriterTest(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
                try {
                    //以睡眠来模拟写入数据操作
                    Thread.sleep(5000);
                    System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
                //加锁
                ZKCuratorLockUtil.acquire();
                System.out.println("线程" + Thread.currentThread().getName() + "获得分布式锁");
                try {
                    Thread.sleep(2000);
                    ZKCuratorLockUtil.release();
                    System.out.println("线程" + Thread.currentThread().getName() + "释放分布式锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("END");
            }
        }
    }
    
    • 测试结果
    线程Thread-0正在写入数据...
    线程Thread-1正在写入数据...
    END
    线程Thread-2正在写入数据...
    线程Thread-3正在写入数据...
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    线程Thread-3获得分布式锁
    线程Thread-3释放分布式锁
    END
    线程Thread-1获得分布式锁
    线程Thread-1释放分布式锁
    END
    线程Thread-2获得分布式锁
    线程Thread-2释放分布式锁
    END
    线程Thread-0获得分布式锁
    线程Thread-0释放分布式锁
    END
    

    相关文章

      网友评论

          本文标题:ZK实现的分布式锁

          本文链接:https://www.haomeiwen.com/subject/ybcqtktx.html