美文网首页
分布式锁

分布式锁

作者: Audience0 | 来源:发表于2019-10-08 21:14 被阅读0次

实现分布式锁的方式
一.基于数据库实现分布式锁
1、在数据库新建一张锁表(lock)
create table lock (
id
method_name (唯一约束)
.......
)
2、获取锁时向表中插入一条数据,由于有唯一约束,只会有一个线程插入成功,插入成功的线程获得锁,可以继续操作,没有插入成功的线程没有获得锁,不能操作;
3、解锁时删除该条数据;
主要问题:
可用性差,数据库故障会导致业务系统不可用;
数据库性能存在瓶颈,不适合高并发场景;
删除锁失败容易造成死锁;
锁的失效时间难以控制;

第二种:基于Redis实现分布式锁
加锁:
setnx命令加锁,并设置锁的有效时间和持有人标识;
expire命令设置锁的过期时间;
解锁:
检查是否持有锁,然后删除锁;
delete命令删除锁
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
基于Redis实现分布式锁,采用一个开源项目http://redisson.org

第三种:基于zookeeper实现分布式锁
zookeeper是分布式系统的协调服务,提供配置管理、分布式协同、命名的中心化服务、服务注册发现等;
zookeeper实现分布式锁采用其提供的临时有序节点+监听来实现;
Curator客户端给我们提供了现成的分布式互斥锁来实现分布式锁;
1、创建分布式互斥锁
InterProcessMutex lock = new InterProcessMutex(zookeeperCuratorClient.client, "/storeLock");

2、获取分布式互斥锁
if (lock.acquire(10, TimeUnit.SECONDS))
3、释放分布式互斥锁
lock.release();


curator客户端分布式锁


public class lockService {
    @Autowired
    private ZookeeperDao dao;
    InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");

    public  void loclMethod(){

        try {
            //获取锁
            //一直等待锁
            //lock.acquire();
            //尝试获取锁,如果在指定时间获取锁,则返回true
            if (lock.acquire(1000, TimeUnit.SECONDS)){
                int check = dao.check();
                if (check > 0){
                    System.out.println("售出");
                    dao.des(--check);
                }else {
                    System.out.println("没有库存");
                }
            }

            //Thread.sleep(50);
        } catch (Exception e) {
            System.out.println(e);
        }finally {
            try {
                //释放锁
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

zookeeper原生客户端分布式锁实现

/**
 * zookeeper原生客户端实现分布式锁
 * 基于有序临时节点
 */
public class ZookeeperLock {

    private ZooKeeper zooKeeper;
    private static final String ZK_ADDRESS = "127.0.0.1";
    //锁根节点
    private static String rootName = "/zklock";
    //锁节点
    private String lockName;
    //当前锁节点名称
    private String cuurentLockName;
    //zookeeper 超时时间
    private int sessionTimeout = 100000;

    private CountDownLatch countDownLatch = new CountDownLatch(1);
    public ZookeeperLock(String lockName) {
        //锁节点名称 初始化
        this.lockName = lockName;
        //zookeeper初始化
        try{
            zooKeeper  = new ZooKeeper(ZK_ADDRESS, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {

                    if (event.getState() == Event.KeeperState.SyncConnected){
                        //zookeeper连接上了
                        countDownLatch.countDown();
                    }
                }
            });
            //等待zookeeper连接成功
            countDownLatch.await();

            //创建一个根节点/lock
            if (zooKeeper.exists(rootName,false) == null){
                zooKeeper.create(rootName,"rootLock".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }



        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    /**
     * zookeeper加锁
     * 获取分布式锁
     */
    public void lock(){
        try {
            String node = zooKeeper.create(rootName + "/" + lockName,
                    "lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            //拿到根节点下的子节点  临时有序的子节点
            List<String> children = zooKeeper.getChildren(rootName, false);
            TreeSet<String> childNodes = new TreeSet<>();
            //childNodes.addAll(children);
            for (String c : children){
                childNodes.add(rootName+ "/" +c);
            }
            //排好顺序的节点,取出最小的
            String minNode = childNodes.first();
            //获取指定节点的前一个节点
            String preNode = childNodes.lower(node);

            //判断是不是最小节点
            if (node.equals(minNode)) {
                //当前进来的这个线程锁创建的节点就是分布式锁节点
                 cuurentLockName = node;
                 return;
            }
            //用于阻塞未获取锁线程
            CountDownLatch countDownLatch = new CountDownLatch(1);


            //其他线程没拿到锁
            if (null != preNode){
                //如果前一个节点不为null 则去监听她
                Stat exists = zooKeeper.exists(preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeDeleted){
                            //监听到前一个节点的删除事件,则到计数器减一
                            countDownLatch.countDown();
                        }
                    }
                });
                if (exists != null){
                    //等待获取锁
                    countDownLatch.await();
                    //当前前一个节点删除,到计数器减一之后,当前线程获取锁
                    cuurentLockName = node;
                }

            }

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * zookeeper解锁
     * 主要做的就是删除当前锁节点删除
     */
    public void unlock(){

        try{
            if (StringUtils.isNotBlank(cuurentLockName)){
                zooKeeper.delete(cuurentLockName,-1);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

}

zkClient实现分布式锁

/**
 * zkClient客户端实现分布式锁
 * 基于有序临时节点
 */
public class ZkClientLock {

    private ZkClient zooKeeper;
    private static final String ZK_ADDRESS = "127.0.0.1";
    //锁根节点
    private static String rootName = "/zklock";
    //锁节点
    private String lockName;
    //当前锁节点名称
    private String cuurentLockName;
    //zookeeper 超时时间
    private int sessionTimeout = 100000;

    /**
     * 创建连接
     * @param lockName
     */
    public ZkClientLock(String lockName) {
        this.lockName = lockName;

        try{
            zooKeeper = new ZkClient(ZK_ADDRESS);

            //创建一个根节点/lock
            if (!zooKeeper.exists(rootName)){
                zooKeeper.create(rootName,"rootLock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    /**
     * 加锁
     */
    public void lock(){

        String node = zooKeeper.create(rootName + "/" + lockName,
                "zkclient".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);

        //获取所有子节点
        List<String> subNodes = zooKeeper.getChildren(rootName);
        TreeSet<String> sortNodes = new TreeSet<>();
        for (String subNode : subNodes){
            sortNodes.add(rootName + "/" +subNode);
        }
        //获取最小节点
        String firstNode = sortNodes.first();
        //获取前一个节点
        String preNode = sortNodes.lower(node);

        if (firstNode.equals(node)){
            //则当前节点为锁节点
            cuurentLockName= node;
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        if (StringUtils.isNotBlank(preNode)){
            //说明非锁节点,需要等待,并监听前一个节点
            boolean exists = zooKeeper.exists(preNode);
            if (exists){
                //监听前一个节点
                zooKeeper.subscribeDataChanges(preNode, new IZkDataListener() {
                    @Override
                    public void handleDataChange(String dataPath, Object data) throws Exception {

                    }

                    @Override
                    public void handleDataDeleted(String dataPath) throws Exception {
                        countDownLatch.countDown();
                    }
                });

                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                cuurentLockName = node;
            }

        }

    }
    /**
     * zookeeper解锁
     * 主要做的就是删除当前锁节点删除
     */
    public void unlock(){

        try{
            if (StringUtils.isNotBlank(cuurentLockName)){
                zooKeeper.delete(cuurentLockName,-1);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }
}

相关文章

  • 分布式锁

    为什么要用分布式锁 数据库乐观锁redis分布式锁zookeeper分布式锁 使用分布式锁的场景 实现分布式锁的方...

  • 什么是分布式锁?几种分布式锁分别是怎么实现的?

    一、什么是分布式锁: 1、什么是分布式锁: 分布式锁,即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资...

  • 4:Redis 分布式锁 (文末有项目连接)

    1:什么是缓存分布式锁 2:分布式锁的关键代码 3:业务代码使用分布式缓存锁 4:业务代码使用分布式缓存锁 5:测...

  • 锁(2)-- 分布式锁

    前言: 锁分3种:java锁、分布式锁、DB锁 分布式锁的几种实现方式 目前几乎很多大型网站及应用都是分布式部署...

  • java锁的概念

    参考文档探究分布式并发锁并发编程-锁的发展和主流分布式锁比较总结从构建分布式秒杀系统聊聊分布式锁探索并发编程(六)...

  • Redis实现分布式锁

    分布式下的分布式锁一般实现有三种: 基于数据库的乐观锁 基于redis的分布式锁 基于zookeeper的分布式锁...

  • 分布式锁

    为什么要用分布式锁? 分布式锁是悲观锁的实现; 如果采用乐观锁的方案就用不着分布式锁了。 能用乐观锁的地方尽量用乐...

  • 3.10:分布式锁

    本文将梳理微服务架构下,分布式锁的常用方案。整体包含以下三部分: 分布式锁的提出 分布式锁主流方案 分布式锁选择 ...

  • Redis实现分布式锁

    1. 分布式锁分类 数据库乐观锁 基于Redis的分布式锁 基于ZooKeeper的分布式锁 2. 组件依赖 po...

  • 大佬浅谈分布式锁

    redis 实现 redis 分布锁一、redis 实现分布式锁(可重入锁)redission 实现分布式锁1、对...

网友评论

      本文标题:分布式锁

      本文链接:https://www.haomeiwen.com/subject/hbakuctx.html