为什么要用分布式锁
- 数据库乐观锁
- redis分布式锁
- zookeeper分布式锁
使用分布式锁的场景
实现分布式锁的方案
必要条件
1.互斥。
同一时刻,多台服务器的多个请求只允许一个请求加锁成功
2.避免死锁。
加锁的客户端在持有锁期间由于宕机网络延迟等原因没有主动解锁,也能保证锁会释放,不应想其他请求获取锁成功
3.解铃还须系铃人
加锁和解锁的客户端要保持一致。
数据库乐观锁实现或字段唯一性约束
此处代码省略
redis分布式锁实现
/**
* 尝试获取锁
* @param key
* @param value 为了满足解铃还须系铃人,此处传入requestId,标识哪个客户端加的锁。可以用 UUID.randomUUID().toString()生成
* @param expireTime 过期时间 避免锁持有者后续发生崩溃而未解锁 造成死锁
* @param unit 过期时间单位
* @return 是否加锁成功
*/
public static boolean tryLock(String key, String value, Long expireTime, TimeUnit unit) {
return redisService.setIfAbsent(key, value, expireTime, unit);
}
public<K, V> boolean setIfAbsent(final K key,V value,Long expireTime,TimeUnit unit) {
boolean result = false;
try {
ValueOperations<K, V> operations = redisTemplate.opsForValue();
//原子操作
result = operations.setIfAbsent(key,value,expireTime,unit);
} catch (Exception e) {
logger.error("setIfAbsent error: key {}, value {},expireTime {}",key,value,expireTime,e);
}
return result;
}
执行上面的方法,如果当前锁(key)不能存在,那么就进行加锁操作同时对锁设置有效期返回true。 value是加锁客户端的标识。如果当前锁(key)已存在,不做任何操作返回false。
注意:加锁和设置时间要是一条命令,保证原子性。不能两条命令分开做。如果加锁后客户端突然崩溃,导致锁没有设置过期时间,将会发生死锁
/**
* 解锁
* @param key
* @param value 此处传入requestId,请求标识
* @return 是否释放锁成功
*/
public static boolean unLock(String key,String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
return redisService.execute(key,value,script);
}
Lua脚本作用:首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。
zookepeer分布式锁实现
基于临时有序节点的特性实现和wahcher机制实现。 001 002 003
为了避免惊群效应,每个节点只监听比自己大的前一个节点即可。否则会带来巨大的性能开销。
//就不用zkCliend费劲去写了,curator 可以非常方便的使用zk分布式锁
@Override
public void lock() throws DistributedLockException {
try{
interProcessMutex.acquire();
} catch (Exception e){
throw new DistributedLockException("加锁异常: ", e);
}
}
@Override
public boolean tryLock(Long time, TimeUnit timeUnit) throws DistributedLockException {
try {
return interProcessMutex.acquire(time, timeUnit);
} catch (Exception e) {
throw new DistributedLockException("加锁异常: ", e);
}
}
@Override
public void unlock() throws DistributedLockException {
try {
interProcessMutex.release();
} catch (Exception e) {
throw new DistributedLockException("释放锁异常: ", e);
}
}
三者比较:
数据库:
1.这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2.这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3.这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4.这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
5.数据库是宝贵的系统资源,考虑是否会影响 正常业务的使用。
redis:
1.失效时间我设置多长时间为好?如何设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间。这个问题使用数据库实现分布式锁同样存在
2.非阻塞?while重复执行。
3.非可重入?在一个线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者。
4.可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。
性能好,实现起来较为方便。
通过超时时间来控制锁的失效时间并不是十分的靠谱。
zk:
1.锁无法释放?使用Zookeeper可以有效的解决锁无法释放的问题,因为在创建锁的时候,客户端会在ZK中创建一个临时节点,一旦客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点就会自动删除掉。其他客户端就可以再次获得锁。
2.非阻塞锁?使用Zookeeper可以实现阻塞的锁,客户端可以通过在ZK中创建顺序节点,并且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是,那么自己就获取到锁,便可以执行业务逻辑了。
3.不可重入?使用Zookeeper也可以有效的解决不可重入的问题,客户端在创建节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就可以了。如果和自己的信息一样,那么自己直接获取到锁,如果不一样就再创建一个临时的顺序节点,参与排队。
4.单点问题?使用Zookeeper可以有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就可以对外提供服务。
Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
5.使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
6.其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)
总结
上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。
从理解的难易程度角度(从低到高)
数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高)
Zookeeper >= 缓存 > 数据库
从性能角度(从高到低)
缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)
Zookeeper > 缓存 > 数据库
坑:
如图所示,curator 加锁成功后leases下回创建临时节点,

在释放锁后,此目录并不会删掉。

因此需要一个定时任务,定时清理目录。
public class LockBackGroundConf {
/** 执行频率, 默认一小时一次, 单位秒 */
private Long frequency = 60*60L;
/** 删除几天前的数据, 默认1天前的数据, 单位秒 */
private Long beforeTime = 24*60*60L;
}
public class LockBackGroundThread extends Thread{
private Logger logger = LoggerFactory.getLogger(getClass());
CuratorFramework client;
protected LockBackGroundThread(CuratorFramework client){
this.client = client;
this.setDaemon(true);
this.setName("ZkMutexDistributedLock---background");
}
@Override
public synchronized void run() {
super.run();
try {
while (true){
//TODO 后期可以通过配置中心 配置
LockBackGroundConf conf = new LockBackGroundConf();
deleteInvalidNode(conf);
// 默认一小时执行一次(配置中心可配)
Thread.currentThread().wait(conf.getFrequency()*1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void deleteInvalidNode(LockBackGroundConf conf) throws Exception{
String projectDir = ZkMutexDistributedLockFactory.lockPath + ZkMutexDistributedLockFactory.projectName;
Stat exitDir = client.checkExists().forPath(projectDir.substring(0, projectDir.length()-1));
if(exitDir == null){
logger.error("根目录尚未创建,本次清理结束--" + projectDir);
return;
}
List<String> paths = client.getChildren().forPath(projectDir.substring(0, projectDir.length()-1));
Date date = new Date();
paths.forEach(currPath -> {
try{
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(projectDir + currPath);
// 默认删除一天前无效的数据。 子节点为0,说明当前节点无效
if(stat.getMtime()<(date.getTime() - (conf.getBeforeTime()*1000)) && stat.getNumChildren() == 0){
// 只删除空目录
client.delete().forPath(projectDir + currPath);
logger.info("删除路径: " + projectDir + currPath);
}
}catch (Exception e){
logger.error("删除节点失败: ", e);
}
});
}
初始化zk客户端的时候,启动后台线程清理空目录。
private static synchronized void init() {
if(client==null){
String IPAndPort = PropertiesReader.getProperties("zkConfig").getProperty("lockServers");
String projectName = ProjectUtils.PROJECT_NAME.toLowerCase();
if(StringUtils.isEmpty(IPAndPort) || StringUtils.isEmpty(projectName)){
logger.error("zk锁启动失败缺少配置--IP和端口号/项目名");
throw new RuntimeException("zk锁启动异常--缺少配置--IP和端口号/项目名");
}
ZkMutexDistributedLockFatory.projectName = projectName+"/";
client = CuratorFrameworkFactory.builder().connectString(IPAndPort).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
// 启动后台线程
LockBackGroundThread backGroundThread = new LockBackGroundThread(client);
backGroundThread.start();
}
}
其他
分布式锁的各种实现见仁见智,在适当的场景选择合适的实现即可。在开发中,我们可以讲分布式锁的实现封装在公共模块,对专注于业务开发的 程序员大兄弟们 屏蔽 底层实现的差异,让他们用最简单的方式,就可以让某一方法实现分布式锁的效果,没错正是自定义注解+AOP的形式。
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CustomerLock {
/**
* lock key
* eg #arg.id
*
* @return
*/
String lockKey();
/** 后缀
* @return
*/
String lockSuffix() default "";
/** 前缀
* @return
*/
String lockPrefix() default "";
/** 分割符
* @return
*/
String separator() default "#";
/** 实现类对应的名称 默认使用redis
* @return
*/
String lockType() default "";
/**
* 是否使用尝试锁。
*/
boolean tryLock() default false;
/**
* 最长等待时间。
* 该字段只有当tryLock()返回true才有效。
*/
int waitTime() default 0;
/**
* 锁超时时间。
* 超时时间过后,锁自动释放。
* 建议:
* 尽量缩简需要加锁的逻辑。
*/
int leaseTime() default 30;
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
@Component
@Aspect
@EnableAspectJAutoProxy
public class DistributedLockAspect {
public static final Logger logger = LoggerFactory.getLogger(DistributedLockAspect.class);
@Pointcut("@annotation(com.gpmall.commons.lock.annotation.CustomerLock)")
public void distributedLockPointcut() {
}
@Around("distributedLockPointcut()")
public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
//组成key
//切点所在的类
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
final String lockKey = getLockKey(method, pjp.getArgs());
return startLock(lockKey, pjp, method);
}
private Object startLock(final String lockKey, ProceedingJoinPoint pjp, Method method) throws Throwable {
CustomerLock annotation = method.getAnnotation(CustomerLock.class);
boolean tryLock = annotation.tryLock();
if (tryLock) {
return tryLock(pjp, annotation, lockKey);
} else {
return lock(pjp, annotation, lockKey);
}
}
private Object lock(ProceedingJoinPoint pjp, CustomerLock annotation, String lockKey) throws Throwable {
int leaseTime = annotation.leaseTime();
TimeUnit timeUnit = annotation.timeUnit();
String type = annotation.lockType();
DistributedLock distributedLock = getByType(type);
try {
distributedLock.lock(lockKey, timeUnit, leaseTime);
return pjp.proceed();
} finally {
distributedLock.unlock(lockKey);
}
}
private Object tryLock(ProceedingJoinPoint pjp, CustomerLock customerLock, String lockKey) throws Throwable {
int leaseTime = customerLock.leaseTime();
int waitTime = customerLock.waitTime();
TimeUnit timeUnit = customerLock.timeUnit();
String type = customerLock.lockType();
DistributedLock distributedLock = getByType(type);
try {
if (waitTime == 0) {
if (distributedLock.tryLock(lockKey)) {
return pjp.proceed();
}
} else {
distributedLock.tryLock(lockKey, timeUnit, waitTime, leaseTime);
return pjp.proceed();
}
} finally {
distributedLock.unlock(lockKey);
}
return null;
}
/**
* 生成分布式锁key
*
* @param method
* @param args
* @return
*/
public String getLockKey(Method method, Object[] args) {
Objects.requireNonNull(method);
CustomerLock annotation = method.getAnnotation(CustomerLock.class);
String lockKey = parseKey(annotation.lockKey(), method, args),
separator = annotation.separator(),
prefix = annotation.lockPrefix(),
suffix = annotation.lockSuffix();
if (StringUtils.isBlank(lockKey)) {
throw new IllegalArgumentException(String.format("lock [%s] is error", lockKey));
}
StringBuilder keyGenerator = new StringBuilder();
if (StringUtils.isNotBlank(prefix)) {
keyGenerator.append(prefix).append(separator);
}
keyGenerator.append(lockKey.trim());
if (StringUtils.isNotBlank(suffix)) {
keyGenerator.append(separator).append(suffix);
}
lockKey = keyGenerator.toString().trim();
// key不允许为空
if (StringUtils.isBlank(lockKey)) {
throw new IllegalArgumentException("Can't get or generate lock accurately!");
}
logger.info("generator lock_key [" + lockKey + "]");
return lockKey;
}
/**
* 获取缓存的key
* key 定义在注解上,支持SPEL表达式
*/
private String parseKey(String key, Method method, Object[] args) {
//获取被拦截方法参数名列表(使用Spring支持类库)
LocalVariableTableParameterNameDiscoverer u =
new LocalVariableTableParameterNameDiscoverer();
String[] paraNameArr = u.getParameterNames(method);
//使用SPEL进行key的解析
ExpressionParser parser = new SpelExpressionParser();
//SPEL上下文
StandardEvaluationContext context = new StandardEvaluationContext();
//把方法参数放入SPEL上下文中
for (int i = 0; i < paraNameArr.length; i++) {
context.setVariable(paraNameArr[i], args[i]);
}
return parser.parseExpression(key).getValue(context, String.class);
}
//通过 dubbo-SPI 的设计 选择分布式锁 实现
private DistributedLock getByType(String type) {
return (DistributedLock) ExtensionLoader.getExtensionLoader(DistributedLock.class).getExtension(type);
}
通过 dubbo-SPI 的设计 选择分布式锁 实现,默认redis 了解dubbo-SPI
@LockSpi("redis")
public interface DistributedLock {
void lock(String key) throws DistributedLockException;
boolean tryLock(String key) throws DistributedLockException;
void lock(String lockKey, TimeUnit unit, int timeout) throws DistributedLockException;
/**
* 尝试获取锁
*
* @param lockKey
* @param unit 时间单位
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) throws DistributedLockException;
/**
* 释放锁
* @param lockKey
* @throws DistributedLockException
*/
void unlock(String lockKey) throws DistributedLockException;
}
使用
@Override
@CustomerLock(lockKey = "#request.tradeNo",lockType = "zookeeper", tryLock = true)
public PaymentResponse execPay(PaymentRequest request) {
PaymentResponse paymentResponse=new PaymentResponse();
try {
……
}
}
开发过程中 通过spi的方式 实现分布式锁策略,当业务小伙伴 不满足于现有的策略 想要拓展分布式锁时,只需要实现DistributedLock 接口 然后在 META-INF/lock 下建立com.gpmall.commons.lock.DistributedLock文件,配置一下即可,例如:
image.png
工具包开发者都是通过spi搞的, 业务开发者也当然也可以通过spi去拓展, 就像dubbo官网文档上所说,平等对待三方包一样。大家都一样,不搞特殊~~
网友评论