美文网首页
JAVA分布式锁介绍

JAVA分布式锁介绍

作者: tuacy | 来源:发表于2019-08-22 19:20 被阅读0次

           上一篇咱们讲到同一个进程,不同的线程之间我们可以通过synchronized、ReentrantLock、ReadWriteLock、Semaphore、CountDownLatch这些来实现锁机制。现在情况不一样了,咱们的程序高大上了,咱们可以部署多个服务端了,上了分布式系统了。在这个时候锁就要上升一个档次了,现在就叫分布式锁了。

           分布式锁就是在分布式系统中(多服务端,多进程)中保证数据的最终唯一性。当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。现在市面上常见的有三种实现分布式锁的方案:基于数据库做分布式锁、基于redis做分布式锁、基于zookeeper做分布式锁。

    一 基于数据库做分布式锁

    1.1 基于主键的唯一性做分布式锁(悲观锁)

           既然是通过数据库来做分布式锁,那咱们的先建一张表先,稍后我们在做解释。

    unionkeylock表(resource_name唯一索引)

    CREATE TABLE `unionkeylock` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
        `node_info` varchar(128) DEFAULT NULL COMMENT '机器信息',
        `count` int(11) NOT NULL DEFAULT '0' COMMENT '锁的次数,统计可重入锁',
        `des` varchar(128) DEFAULT null comment '额外的描述信息',
        `update_time` timestamp NULL DEFAULT NULL COMMENT '更新时间',
        `create_time` timestamp null default null comment '创建时间',
        primary key (`id`),
        unique key (`resource_name`)
    ) engine=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-主键唯一';;
    

           unionkeylock表,我们对resource_name做了唯一性约束,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,在这种情况下我们认为操作成功的那个线程获得了该方法的锁。

           我们干脆一点,直接上代码。

    public class DbDistributedUnionKeyLock extends AbstractDbDistributedLock {
        /**
         * 因为数据库里面限制了最大长度了
         */
        private static final int NODE_INFO_MAX_LENGTH = 120;
    
        /**
         * 计算机唯一标识 为可重入锁做准备(一定要确保每个机器的值不同)
         */
        private static final String COMPUTER_UUID = ComputerIdentifierUtil.getComputerIdentifier();
        /**
         * 线程变量
         */
        private ThreadLocal<String> threadFlag = new ThreadLocal<>();
        /**
         * 操作数据库的dao
         */
        private IUnionKeyLockDao unionKeyLockDao;
    
        public DbDistributedUnionKeyLock(IUnionKeyLockDao unionKeyLockDao) {
            this.unionKeyLockDao = unionKeyLockDao;
        }
    
        /**
         * 加锁
         */
        @Override
        public boolean lock(String key, int retryTimes, long sleepMillis) {
            boolean lockSuccess = false;
            // 机器码+线程uuid -- 唯一标识(保证同一台电脑的同一个线程是一样的)
            if (threadFlag.get() == null || threadFlag.get().isEmpty()) {
                String nodeTemp = COMPUTER_UUID + "#" + String.format("%08x", UUID.randomUUID().hashCode()) + "#" + Thread.currentThread().getId();
                if (nodeTemp.length() > NODE_INFO_MAX_LENGTH) {
                    nodeTemp = nodeTemp.substring(0, NODE_INFO_MAX_LENGTH);
                }
                threadFlag.set(nodeTemp);
            }
            int retry = 0;
            while (!lockSuccess && retry < retryTimes) {
                try {
                    UnionKeyLock lockInfo = unionKeyLockDao.getLockInfoByResourceName(key);
                    if (lockInfo == null) {
                        // 当前资源没有被加锁
                        lockSuccess = unionKeyLockDao.insertLockInfo(key, threadFlag.get());
                    } else {
                        // 当前资源已经被加锁,这个时候需要考虑是否可重入
                        // 可重入锁
                        if (lockInfo.getNodeInfo() != null && lockInfo.getNodeInfo().equals(threadFlag.get())) {
                            // 重入
                            lockSuccess = unionKeyLockDao.reentrantLock(lockInfo.getResourceName(), threadFlag.get(), lockInfo.getCount());
                        } else {
                            // 如果出现这种情况代表数据已经有问题了
                            lockSuccess = false;
                        }
                    }
                    if (!lockSuccess) {
                        // 等待200毫秒
                        Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
                    // 等待指定时间
                    Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
                } finally {
                    retry++;
                }
            }
            return lockSuccess;
        }
    
    
        /**
         * 释放
         */
        @Override
        public void unlock(String key) {
            if (threadFlag.get() == null || threadFlag.get().isEmpty()) {
                return;
            }
            boolean unlockSuccess = false;
            while (!unlockSuccess) {
                try {
                    UnionKeyLock lockInfo = unionKeyLockDao.getLockInfoByResourceName(key);
                    if (lockInfo == null) {
                        return;
                    }
                    if (lockInfo.getNodeInfo() == null) {
                        unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), lockInfo.getNodeInfo());
                        return;
                    }
                    if (!lockInfo.getNodeInfo().equals(threadFlag.get())) {
                        return;
                    }
                    // 可重入锁
                    if (lockInfo.getCount() == 1) {
                        unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), threadFlag.get());
                        unlockSuccess = true;
                    } else {
                        if (lockInfo.getNodeInfo() != null && lockInfo.getNodeInfo().equals(threadFlag.get())) {
                            // 重入
                            unlockSuccess = unionKeyLockDao.reentrantUnLock(lockInfo.getResourceName(), threadFlag.get(), lockInfo.getCount());
                        } else {
                            // 如果出现这种情况,代表这时候数据已经有问题了
                            unionKeyLockDao.deleteLockInfo(lockInfo.getResourceName(), lockInfo.getNodeInfo());
                            unlockSuccess = true;
                        }
                    }
                    if (!unlockSuccess) {
                        // 等待200毫秒
                        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
    //                e.printStackTrace();
                    // 等待200毫秒
                    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
                }
    
            }
    
        }
    
    }
    

    因为悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。如果加锁的时间过长,其他用户长时间无法访问,影响了程序的并发访问性,同时这样对数据库性能开销影响也很大,特别是对长事务而言,这样的开销往往无法承受。所以与悲观锁相对的,我们有了乐观锁。

    1.2 基于版本号字段做分布式锁(乐观锁)

           基于表字段版本号做分布式锁(乐观锁):很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,需要数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则重试。乐观锁适用于读多写少的应用场景,这样可以提高吞吐量。

           通过版本号做分布式锁我们不太好去抽象出lock()、unlock()这样的函数了。得把他们写到业务逻辑里面去。下面我们以一个简单的例子来说明。比如有这么个场景:我们模拟一个存钱的场景,我们可以在各个地方存钱(分布式)。我们创建一个表,字段value就表示我们的余额。

    CREATE TABLE `optimisticlock` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
        `value` int NOT NULL COMMENT '锁定的资源对应的值',
        `version` int NOT NULL COMMENT '版本信息',
        PRIMARY KEY (`id`),
        UNIQUE KEY `uiq_idx_resource` (`resource_name`) 
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-乐观锁';
    

           直接上代码。(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)

    public class SqlOptimisticLock {
    
        /**
         * 因为数据库里面限制了最大长度了
         */
        private static final int LOCK_LOOP_TIME_WAIT = 200;
        /**
         * lock对应的资源名字
         */
        private final String resourceName;
        /**
         * 操作数据库的dao
         */
        private IOptimisticLockDao optimisticLockDao;
    
        /**
         * 用于注入无法自动装配的Bean,处理@Autowired无效的问题
         */
        private void inject() {
            if (null == this.optimisticLockDao) {
                this.optimisticLockDao = ResourceApplicationContext.getApplicationContext().getBean(OptimisticLockDaoImpl.class);
            }
        }
    
        /**
         * 构造函数
         *
         * @param resourceName lock对应资源名字
         */
        public SqlOptimisticLock(String resourceName) {
            this.resourceName = resourceName;
            inject();
        }
    
        /**
         * 我们简单的模拟一个存钱的操作
         *
         * @param money 存入金额
         */
        public void depositMoney(int money) {
            boolean success = false;
            while (!success) {
                try {
                    // 第一步:版本号信息很重要,从表里面取出数据(包括版本号)
                    OptimisticLock dbItemInfo = optimisticLockDao.selectLockResourceInfo(resourceName);
                    // 第二步:todo:做相应的逻辑处理
                    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
                    // 第三步:把数据存到数据库里面去
                    if (dbItemInfo == null) {
                        success = optimisticLockDao.insertLockResourceValue(resourceName, money);
                    } else {
                        // 更新的时候会去做下版本的判断,相同才更新,不相同不更新
                        success = optimisticLockDao.updateLockResourceValue(resourceName, dbItemInfo.getVersion(), dbItemInfo.getValue() + money);
                    }
                    if (!success) {
                        Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
    //                e.printStackTrace();
                    success = false;
                    Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
    
                }
    
            }
        }
    
    }
    

    1.3 基于数据库排他锁做分布式锁(悲观锁)

           借助数据中自带的排他锁来实现分布式的锁。在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。特别不建议用这种方式来实现,对数据库影响太大了。

           正对这种情况,我们也写一个简单的代码,还是先创建表,建表语句如下。

    CREATE TABLE `exclusivelock` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `resource_name` varchar(128) NOT NULL DEFAULT '' COMMENT '锁定的资源',
        PRIMARY KEY (`id`),
        UNIQUE KEY `uiq_idx_resource` (`resource_name`) 
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表-排他锁';
    

           加锁,释放锁代码实现。(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)

    public class SqlExclusiveLock implements IDistributedLock {
    
        /**
         * 因为数据库里面限制了最大长度了
         */
        private static final int LOCK_LOOP_TIME_WAIT = 200;
    
        private JdbcTemplate jdbcTemplate;
        /**
         * lock对应的资源名字
         */
        private final String resourceName;
    
        private Connection sqlConnection;
    
        /**
         * 用于注入无法自动装配的Bean,处理@Autowired无效的问题
         */
        private void inject() {
            if (null == this.jdbcTemplate) {
                this.jdbcTemplate = ResourceApplicationContext.getApplicationContext().getBean(JdbcTemplate.class);
            }
        }
    
        /**
         * 构造函数
         *
         * @param resourceName lock对应资源名字
         */
        public SqlExclusiveLock(String resourceName) {
            this.resourceName = resourceName;
            inject();
        }
    
        @Override
        public void lock() {
            if (jdbcTemplate.getDataSource() == null) {
                throw new IllegalArgumentException("数据库配置失败!");
            }
            boolean success = false;
            while (!success) {
                PreparedStatement preparedStatement = null;
                try {
                    sqlConnection = jdbcTemplate.getDataSource().getConnection();
                    sqlConnection.setAutoCommit(false);//设置手动提交
                    String prepareSql = "select * from exclusivelock where resource_name = ? for update";
                    preparedStatement = sqlConnection.prepareStatement(prepareSql);
                    preparedStatement.setString(1, this.resourceName);
                    success = preparedStatement.executeQuery() != null;
                    if (!success) {
                        Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    success = false;
                    Uninterruptibles.sleepUninterruptibly(LOCK_LOOP_TIME_WAIT, TimeUnit.MILLISECONDS);
                } finally {
                    if (preparedStatement != null) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    
        @Override
        public void unlock() {
            if (sqlConnection != null) {
                try {
                    sqlConnection.commit();
                } catch (Exception e) {
                    // ignore
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public boolean tryLock() {
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) {
            return false;
        }
    }
    

    二 基于Redis做分布式锁

           基于Redis做分布式锁。Redis里面的记录可以设置为当记录不存在的时候才可以插入进入。记录存在则插入不进去。Redis分布式锁就是用这个做的。当获取锁的时候则在Redis里面插入一条这样的记录。释放锁的时候就删除这条记录。如果锁没有释放(记录没有被删除)其他的让你再去获取锁的时候(插入记录)是不会成功的。而且为了防止僵尸我们可以给这条记录设置过期时间。

           讲什么都没带来来的实在点。我们就直接贴代码了(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)

    public class RedisDistributedLockImpl extends AbstractDistributedLock {
    
        private final Logger logger = LoggerFactory.getLogger(RedisDistributedLockImpl.class);
    
        private RedisTemplate<Object, Object> redisTemplate;
    
        private ThreadLocal<String> lockFlag = new ThreadLocal<>();
    
        /**
         * Lua
         */
        private static final String UNLOCK_LUA;
    
        static {
            StringBuilder sb = new StringBuilder();
            sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call(\"del\",KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
            UNLOCK_LUA = sb.toString();
        }
    
        /**
         * 构造函数
         */
        public RedisDistributedLockImpl(RedisTemplate<Object, Object> redisTemplate) {
            super();
            this.redisTemplate = redisTemplate;
        }
    
        /**
         * 加锁
         */
        @Override
        public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
            boolean result = setRedis(key, expire);
            // 如果获取锁失败,按照传入的重试次数进行重试
            while ((!result) && retryTimes-- > 0) {
                try {
                    logger.debug("lock failed, retrying..." + retryTimes);
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    return false;
                }
                result = setRedis(key, expire);
            }
            return result;
        }
    
        private boolean setRedis(String key, long expire) {
            try {
                String result = redisTemplate.execute((RedisCallback<String>) connection -> {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    // value生成
                    String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid);
                    /**
                     * 存储数据到缓存中,并指定过期时间和当Key存在时是否覆盖。
                     *
                     * @param key 键
                     * @param key 键值
                     * @param nxxx
                     *            nxxx的值只能取NX或者XX,如果是NX的时候,则只有当key不存在是才进行set,如果是XX,则只有当key已经存在时才进行set
                     *
                     * @param expx expx的值只能取EX或者PX,代表数据过期时间的单位,EX代表秒,PX代表毫秒。
                     * @param time 过期时间,单位是expx所代表的单位。
                     * @return 成功返回“ok”,失败则返回 null。
                     */
                    return commands.set(key, uuid, "NX", "PX", expire);
                });
                return !StringUtils.isEmpty(result);
            } catch (Exception e) {
                logger.error("set redis occured an exception", e);
            }
            return false;
        }
    
        /**
         * 释放锁
         */
        @Override
        public boolean unlock(String key) {
            // 释放锁的时候,有可能因为持锁之后方法执行时间大于锁的有效期,此时有可能已经被另外一个线程持有锁,所以不能直接删除
            try {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                List<String> args = new ArrayList<>();
                args.add(lockFlag.get());
    
                // 使用lua脚本删除redis中匹配value的key,可以避免由于方法执行时间过长而redis锁自动过期失效的时候误删其他线程的锁
                // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,所以只能拿到原redis的connection来执行脚本
    
                Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                    Object nativeConnection = connection.getNativeConnection();
                    // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                    // 集群模式
                    if (nativeConnection instanceof JedisCluster) {
                        // Redis Eval 命令使用 Lua 解释器执行脚本
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    // 单机模式
                    else if (nativeConnection instanceof Jedis) {
                        // Redis Eval 命令使用 Lua 解释器执行脚本
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return 0L;
                });
    
                return result != null && result > 0;
            } catch (Exception e) {
                logger.error("release lock occured an exception", e);
            }
            return false;
        }
    }
    

    三 基于ZooKeeper做分布式锁

           Zookeeper实现分布式锁的流程,假设锁空间的根节点为/zklock:

    • 客户端连接zookeeper,并在/zklock下创建临时的且有序的子节点。
      第一个客户端对应的子节点为:/zklock/test_lock_0000000000,第二个为:/zklock/test_lock_0000000001。以此类推。
    • 客户端获取/zklock下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁,否则监听/zklock的子节点变更消息,获得子节点变更通知后重复此步骤直至获得锁;
    • 执行业务代码。
    • 完成业务流程后,删除对应的子节点并释放锁。

           这里我不准备用zookeeper的原生api来实现这一套流程来实现分布式锁。因为我找到更简单的方式,这里我准备用Curator(Curator是Netflix公司开源的一套zookeeper客户端框架)来实现分布式锁。因为Curator已经帮我们实现了分布式锁InterProcessMutex的实现。关于Curator的使用大家有兴趣可以参考下ZooKeeper客户端Curator使用

           直接代码实现,(可以在https://github.com/tuacy/java-study里面找到更加完整的代码,注意哦代码在distributedlock模块下面哦)

    public class ZookeeperDistributedLockImpl implements IZookeeperDistributedLock {
    
        /**
         * curator给封住好的一个分布式锁对象和ReentrantLock类似
         */
        private CuratorFramework curatorFramework;
    
        public ZookeeperDistributedLockImpl(ZkClient zkClient) {
            curatorFramework = zkClient.getClient();
        }
    
        @Override
        public boolean lock(String key) {
            try {
                InterProcessMutex  interProcessMutex = new InterProcessMutex(curatorFramework, key);
                interProcessMutex.acquire();
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    
        @Override
        public boolean lock(String key, long time, TimeUnit unit) {
            try {
                InterProcessMutex  interProcessMutex = new InterProcessMutex(curatorFramework, key);
                interProcessMutex.acquire(time, unit);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    
        @Override
        public void unlock(String key) {
            try {
                InterProcessMutex  interProcessMutex = new InterProcessMutex(curatorFramework, key);
                interProcessMutex.release();
            } catch (Exception e) {
                e.printStackTrace();
                // ignore
            }
        }
    
    
    

           分布式锁相关的内容就介绍到这里,每种分布式锁的时候我们都写了对应的实现代码。大家可以在https://github.com/tuacy/java-study里面找到更加完整的代码,在distributedlock模块下面。而且每种锁的实现都用想用的AOP。

    相关文章

      网友评论

          本文标题:JAVA分布式锁介绍

          本文链接:https://www.haomeiwen.com/subject/tbposctx.html