美文网首页
分布式锁

分布式锁

作者: Audience0 | 来源:发表于2019-10-08 21:14 被阅读0次

    实现分布式锁的方式
    一.基于数据库实现分布式锁
    1、在数据库新建一张锁表(lock)
    create table lock (
    id
    method_name (唯一约束)
    .......
    )
    2、获取锁时向表中插入一条数据,由于有唯一约束,只会有一个线程插入成功,插入成功的线程获得锁,可以继续操作,没有插入成功的线程没有获得锁,不能操作;
    3、解锁时删除该条数据;
    主要问题:
    可用性差,数据库故障会导致业务系统不可用;
    数据库性能存在瓶颈,不适合高并发场景;
    删除锁失败容易造成死锁;
    锁的失效时间难以控制;

    第二种:基于Redis实现分布式锁
    加锁:
    setnx命令加锁,并设置锁的有效时间和持有人标识;
    expire命令设置锁的过期时间;
    解锁:
    检查是否持有锁,然后删除锁;
    delete命令删除锁
    在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
    基于Redis实现分布式锁,采用一个开源项目http://redisson.org

    第三种:基于zookeeper实现分布式锁
    zookeeper是分布式系统的协调服务,提供配置管理、分布式协同、命名的中心化服务、服务注册发现等;
    zookeeper实现分布式锁采用其提供的临时有序节点+监听来实现;
    Curator客户端给我们提供了现成的分布式互斥锁来实现分布式锁;
    1、创建分布式互斥锁
    InterProcessMutex lock = new InterProcessMutex(zookeeperCuratorClient.client, "/storeLock");

    2、获取分布式互斥锁
    if (lock.acquire(10, TimeUnit.SECONDS))
    3、释放分布式互斥锁
    lock.release();


    curator客户端分布式锁

    
    public class lockService {
        @Autowired
        private ZookeeperDao dao;
        InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");
    
        public  void loclMethod(){
    
            try {
                //获取锁
                //一直等待锁
                //lock.acquire();
                //尝试获取锁,如果在指定时间获取锁,则返回true
                if (lock.acquire(1000, TimeUnit.SECONDS)){
                    int check = dao.check();
                    if (check > 0){
                        System.out.println("售出");
                        dao.des(--check);
                    }else {
                        System.out.println("没有库存");
                    }
                }
    
                //Thread.sleep(50);
            } catch (Exception e) {
                System.out.println(e);
            }finally {
                try {
                    //释放锁
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    

    zookeeper原生客户端分布式锁实现

    /**
     * zookeeper原生客户端实现分布式锁
     * 基于有序临时节点
     */
    public class ZookeeperLock {
    
        private ZooKeeper zooKeeper;
        private static final String ZK_ADDRESS = "127.0.0.1";
        //锁根节点
        private static String rootName = "/zklock";
        //锁节点
        private String lockName;
        //当前锁节点名称
        private String cuurentLockName;
        //zookeeper 超时时间
        private int sessionTimeout = 100000;
    
        private CountDownLatch countDownLatch = new CountDownLatch(1);
        public ZookeeperLock(String lockName) {
            //锁节点名称 初始化
            this.lockName = lockName;
            //zookeeper初始化
            try{
                zooKeeper  = new ZooKeeper(ZK_ADDRESS, sessionTimeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
    
                        if (event.getState() == Event.KeeperState.SyncConnected){
                            //zookeeper连接上了
                            countDownLatch.countDown();
                        }
                    }
                });
                //等待zookeeper连接成功
                countDownLatch.await();
    
                //创建一个根节点/lock
                if (zooKeeper.exists(rootName,false) == null){
                    zooKeeper.create(rootName,"rootLock".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
    
    
    
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    
        /**
         * zookeeper加锁
         * 获取分布式锁
         */
        public void lock(){
            try {
                String node = zooKeeper.create(rootName + "/" + lockName,
                        "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                //拿到根节点下的子节点  临时有序的子节点
                List<String> children = zooKeeper.getChildren(rootName, false);
                TreeSet<String> childNodes = new TreeSet<>();
                //childNodes.addAll(children);
                for (String c : children){
                    childNodes.add(rootName+ "/" +c);
                }
                //排好顺序的节点,取出最小的
                String minNode = childNodes.first();
                //获取指定节点的前一个节点
                String preNode = childNodes.lower(node);
    
                //判断是不是最小节点
                if (node.equals(minNode)) {
                    //当前进来的这个线程锁创建的节点就是分布式锁节点
                     cuurentLockName = node;
                     return;
                }
                //用于阻塞未获取锁线程
                CountDownLatch countDownLatch = new CountDownLatch(1);
    
    
                //其他线程没拿到锁
                if (null != preNode){
                    //如果前一个节点不为null 则去监听她
                    Stat exists = zooKeeper.exists(preNode, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if (event.getType() == Event.EventType.NodeDeleted){
                                //监听到前一个节点的删除事件,则到计数器减一
                                countDownLatch.countDown();
                            }
                        }
                    });
                    if (exists != null){
                        //等待获取锁
                        countDownLatch.await();
                        //当前前一个节点删除,到计数器减一之后,当前线程获取锁
                        cuurentLockName = node;
                    }
    
                }
    
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    
        /**
         * zookeeper解锁
         * 主要做的就是删除当前锁节点删除
         */
        public void unlock(){
    
            try{
                if (StringUtils.isNotBlank(cuurentLockName)){
                    zooKeeper.delete(cuurentLockName,-1);
                }
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    
    }
    

    zkClient实现分布式锁

    /**
     * zkClient客户端实现分布式锁
     * 基于有序临时节点
     */
    public class ZkClientLock {
    
        private ZkClient zooKeeper;
        private static final String ZK_ADDRESS = "127.0.0.1";
        //锁根节点
        private static String rootName = "/zklock";
        //锁节点
        private String lockName;
        //当前锁节点名称
        private String cuurentLockName;
        //zookeeper 超时时间
        private int sessionTimeout = 100000;
    
        /**
         * 创建连接
         * @param lockName
         */
        public ZkClientLock(String lockName) {
            this.lockName = lockName;
    
            try{
                zooKeeper = new ZkClient(ZK_ADDRESS);
    
                //创建一个根节点/lock
                if (!zooKeeper.exists(rootName)){
                    zooKeeper.create(rootName,"rootLock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    
        /**
         * 加锁
         */
        public void lock(){
    
            String node = zooKeeper.create(rootName + "/" + lockName,
                    "zkclient".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
    
            //获取所有子节点
            List<String> subNodes = zooKeeper.getChildren(rootName);
            TreeSet<String> sortNodes = new TreeSet<>();
            for (String subNode : subNodes){
                sortNodes.add(rootName + "/" +subNode);
            }
            //获取最小节点
            String firstNode = sortNodes.first();
            //获取前一个节点
            String preNode = sortNodes.lower(node);
    
            if (firstNode.equals(node)){
                //则当前节点为锁节点
                cuurentLockName= node;
                return;
            }
            CountDownLatch countDownLatch = new CountDownLatch(1);
            if (StringUtils.isNotBlank(preNode)){
                //说明非锁节点,需要等待,并监听前一个节点
                boolean exists = zooKeeper.exists(preNode);
                if (exists){
                    //监听前一个节点
                    zooKeeper.subscribeDataChanges(preNode, new IZkDataListener() {
                        @Override
                        public void handleDataChange(String dataPath, Object data) throws Exception {
    
                        }
    
                        @Override
                        public void handleDataDeleted(String dataPath) throws Exception {
                            countDownLatch.countDown();
                        }
                    });
    
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    cuurentLockName = node;
                }
    
            }
    
        }
        /**
         * zookeeper解锁
         * 主要做的就是删除当前锁节点删除
         */
        public void unlock(){
    
            try{
                if (StringUtils.isNotBlank(cuurentLockName)){
                    zooKeeper.delete(cuurentLockName,-1);
                }
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:分布式锁

          本文链接:https://www.haomeiwen.com/subject/hbakuctx.html