美文网首页
分布式锁的理论与实现

分布式锁的理论与实现

作者: CJ21 | 来源:发表于2021-04-29 00:03 被阅读0次

前言

zookeeper实现优于redis的原因:

    1. redis集群主节点切换时可能导致锁失效,Zookeeper内部的选举机制保证主节点切换时锁不会失效。
    1. 为了防止主机宕机或网络断开之后的死锁,Redis没有ZK那种天然的实现方式,只能依赖设置超时时间来规避。
    1. zk可以使用临时顺序节点来上锁,避免惊群效应。

Redisson的特性:

  • 互斥性和自解锁:使用hset (UUID+threadId, 1) 保证;
  • 不会发生死锁:设置定时事件;
  • 锁不能自己失效:通过看门狗功能,使用定时器判断锁失效时间,并延长事件;
  • 容错性:使用红锁RedissonRedLock解决。

Zookeeper的特性:

  • 互斥性、自解锁和不发生死锁:最小的临时顺序节点拿到锁;
  • 容错性:zk的选举机制;
  • 锁不能自己失效:只有主动删除和结束会话才会删除节点。

一、基于Redis的分布式锁

1.1 为什么要使用分布式锁

先看两个常见的例子:

例1:某服务记录关键数据X,当前值为100。A请求需要将X增加200;同时,B请求需要将X减100。

在理想的情况下,A先读取到X=100,然后X增加200,最后写入X=300。B请求接着从读取X=300,减少100,最后写入X=200。

然而在真实情况下,如果不做任何处理,则可能会出现:A和B同时读取到X=100;A写入之前B读取到X;B比A先写入等情况。

例2:某服务提供一组任务,A请求随机从任务组中获取一个任务;B请求随机从任务组中获取一个任务。

在理想的情况下,A从任务组中挑选一个任务,任务组删除该任务,B从剩下的的任务中再挑一个,任务组删除该任务。

同样的,在真实情况下,如果不做任何处理,可能会出现A和B挑中了同一个任务的情况。

1.2 需要思考的问题

1.2.1 基础问题

问题1:锁状态判断原子性无法保证

从读取锁的状态,到判断该状态是否为被锁,需要经历两步操作。如果不能保证这两步的原子性,就可能导致不止一个请求获取到了锁,这显然是不行的。因此,我们需要保证锁状态判断的原子性。

问题2:网络断开或主机宕机,锁状态无法清除

假设在主机已经获取到锁的情况下,突然出现了网络断开或者主机宕机,如果不做任何处理该锁将仍然处于被锁定的状态。那么之后所有的请求都无法再成功抢占到这个锁。因此,我们需要在持有锁的主机宕机或者网络断开的时候,及时的释放掉这把锁。

问题3:无法保证释放的是自己上锁的那把锁

在解决了问题2的情况下再设想一下,假设持有锁的主机A在临界区遇到网络抖动导致网络断开,分布式锁及时的释放掉了这把锁。之后,另一个主机B占有了这把锁,但是此时主机A网络恢复,退出临界区时解锁。由于都是同一把锁,所以A就会将B的锁解开。此时如果有第三个主机尝试抢占这把锁,也将会成功获得。因此,我们需要在解锁时,确定自己解的这个锁正是自己锁上的。

1.2.2 进阶条件

如果分布式锁的实现,还能再解决上面的三个问题,那么就可以算是一个相对完整的分布式锁了。然而,在实际的系统环境中,还会对分布式锁有更高级的要求。

  • 可重入:线程中的可重入,指的是外层函数获得锁之后,内层也可以获得锁,ReentrantLock和synchronized都是可重入锁;衍生到分布式环境中,一般仍然指的是线程的可重入,在绝大多数分布式环境中,都要求分布式锁是可重入的。
  • 惊群效应(Herd Effect):在分布式锁中,惊群效应指的是,在有多个请求等待获取锁的时候,一旦占有锁的线程释放之后,如果所有等待的方都同时被唤醒,尝试抢占锁。但是这样的情况会造成比较大的开销,那么在实现分布式锁的时候,应该尽量避免惊群效应的产生。
  • 公平锁和非公平锁:不同的需求,可能需要不同的分布式锁。非公平锁普遍比公平锁开销小。但是业务需求如果必须要锁的竞争者按顺序获得锁,那么就需要实现公平锁。
  • 阻塞锁和自旋锁:针对不同的使用场景,阻塞锁和自旋锁的效率也会有所不同。阻塞锁会有上下文切换,如果并发量比较高且临界区的操作耗时比较短,那么造成的性能开销就比较大了。但是如果临界区操作耗时比较长,一直保持自旋,也会对CPU造成更大的负荷。

可靠的分布式锁的特征

  • 互斥性:只能有一个线程持有锁;
  • 不会发生死锁:即是客户端崩溃没有释放锁,也不会造成其他线程一直拿不到锁
  • 具有容错性:大部分redis节点运行正常,客户端就可以正常加解锁
  • 解铃还须系铃人:加锁和解锁必须是同一个客户端;
  • 锁不能自己失效:正常执行程序中,锁不能因为某些原因失效。

1.3 实现

1.3.1 自定义分布式锁

步骤

  1. 在执行业务代码前添加上redis操作指令,通过setnx向redis中插入一条k-v值同时设置超时时间,key保持一致,value通过UUID获取唯一值。(注:setnx操作如果存在相同key,则插入失败)
  2. 如果插入成功,说明没有分布式锁,则进行业务操作
  3. 最后需要删除分布式锁,直接通过finally进行删除。

伪代码

try{
  Boolean result =  redisTemplate.opsForValue().setIfAbsent(lockKey,uuid,10,TimeUnit.SECONDS);  
  if(!result){
    return "锁被其他线程占用,请稍后再试";
  }
  
  //TODO 业务代码

} finally {
  if(uuid.equals(redisTemplate.opsForValue().get(lockKey))){
    redisTemplate.delete(lockKey);
  }
}

return result;

注意点

  1. 需要设置过期时间,防止锁删除失败导致进程阻塞。
  2. value需要设置为不同值,避免线程一执行时间过长,导致锁过期,然后线程二放入锁但是马上被线程一删除,导致锁永久失效的情况发生。
  3. 开启线程监控进程启动时间,执行到锁失效时间的1/3时,业务未执行完成,需要为锁重置过期时间。防止进程执行中锁失效问题。
  4. 可以使用GETSET(先写新值,返回旧值,原子性操作,可以用于分辨是不是首次操作)操作来实现重入锁。

1.3.2 redisson分布式锁框架实现

加锁流程 解锁流程

注意:使用lua脚本执行命令保证操作的原子性。


加锁脚本 解锁脚本

依赖

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
</dependency>

配置类

@Configuration
public class RedissonConfig {

    @Bean
    public Redisson redisson(){
        //此处设置为单机模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://116.62.148.11:6380").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

}

伪代码

//lockKey是hset时的key,每次getLock都会生成一个uuid+threadId作为field。
RLock redissonLock = redisson.getLock(lockKey);
try{
  //设置redis锁并设置过期时间
  redissonLock.lock(10,TimeUnit.SECONDS);
  if(!result){
    return "锁被其他线程占用,请稍后再试";
  }
  
  //TODO 业务代码

} finally {
  // 同一个redissonLock有相同的field,可以解锁
  redissonLock.unlock();
  }
}

return result;

原理

image.png

Redisson的特性:

  • 互斥性和自解锁:使用hset (key, UUID+threadId, 1) 保证;
  • 不会发生死锁:设置定时事件;
  • 锁不能自己失效:通过看门狗功能,使用定时器判断锁失效时间,并延长事件。

拓展
如何将系统的性能提高十倍。
只需要将加锁的数据拆成十份,然后每份可以有一个线程执行业务逻辑。将并发量提高了十倍。

二、基于Zookeeper的分布式锁(Menagerie框架)

zk节点分类(节点都是唯一的)

  • 临时节点:创建后一直存在,直到主动删除此节点;
  • 临时有序节点:创建后一直存在,直到主动删除此节点;每个父节点会为它的第一级子节点维护一份时序,记录每个子节点创建的先后顺序。
  • 持久节点:临时节点在客户端会话失效后节点自动清除。临时节点下面不能创建子节点。
  • 持久有序节点:临时节点在客户端会话失效后节点自动清除。临时节点下面不能创建子节点。父节点getChildren会获得顺序的节点列表。

zk事件监听watch

  • 节点创建
  • 节点删除
  • 节点数据修改
  • 子节点变更

zk实现分布式锁:

image.png

2.1 实现过程

2.1.1 会产生惊群效应的实现

  1. 使用zk的临时节点,因为临时节点是唯一的。多个线程争抢锁,即去zk上创建相同名字的临时节点。
  2. 第一个创建的临时节点的线程抢到了锁。开始执行业务。
  3. 其他线程阻塞,通过watch监听节点的变化。
  4. 直到执行业务的线程结束后结束与zk的会话,临时节点自动删除,锁被释放。
  5. 监听到节点删除事件后,所有的线程都会再次竞争这个锁。
    缺点:每次释放锁,所有的线程都会重新开始竞争锁,如果等待线程过多,zk的压力会过大。

2.1.2 避免惊群效应的实现

2.1.2.1 概念

  1. 使用临时有序节点,每个线程创建有序的临时节点。
  2. 每个线程按顺序创建节点,创建节点之后会判断自己是否是当前最小的节点,如果最小的节点获取到锁,开始执行业务;如果不是则通过watch监听自己的前一个节点。
  3. 直到执行业务的线程结束后结束与zk的会话,临时节点自动删除。
  4. 线程监听到前一个节点被删除,则判断它创建的节点是否是最小的,是则获取到锁开始执行业务。
    优点:线程释放锁后只会唤醒监听其创建的节点的线程,不会产生惊群效应。
上锁过程 image.png

2.1.2.2 实现

依赖

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

代码

public class CuratorDistrLock implements Runnable {

    private static final String ZK_ADDRESS = "192.168.32.242:2181";
    private static final String ZK_LOCK_PATH = "/zkLock";

    static CuratorFramework client;

    static {
        // 创建连接zk的客户端
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,
                new RetryNTimes(10, 5000));
        client.start();
    }

    private void curatorLockTest() {
        // 创建锁对象
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            // 设置获取锁的超时时间
            if (lock.acquire(6 * 1000, TimeUnit.SECONDS)) {
                System.out.println("===== " + Thread.currentThread().getName() + "抢到锁 =====");
            }
            // 执行业务
            Thread.sleep(5000);
        } catch (Exception e) {
            System.out.println("业务异常");
        } finally {
            try {
                lock.release();
                System.out.println("===== " + Thread.currentThread().getName() + "释放锁 =====");
            } catch (Exception e) {
                System.out.println("锁释放异常");
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new CuratorDistrLock()).start();
        new Thread(new CuratorDistrLock()).start();
    }

    public void run() {
        curatorLockTest();
    }
}

2.1.2.2 zk锁种类

  • InterProcessMutex 分布式可重入排它锁
  • InterProcessSemaphoreMutex 分布式排它锁
  • InterProcessReadWriteLock 分布式读写锁
  • InterProcessMultiLock 用多个锁去进行一组操作
  • InterProcessSemaphoreV2 共享信号量





开源的基于ZK的Menagerie框架:
Menagerie中的lock首先实现了可重入锁,利用ThreadLocal存储进入的次数,每次加锁次数加1,每次解锁次数减1。如果判断出是当前线程持有锁,就不用走获取锁的流程。

通过tryAcquireDistributed方法尝试获取锁,循环判断前序节点是否存在,如果存在则监视该节点并且返回获取失败。如果前序节点不存在,则再判断更前一个节点。如果判断出自己是第一个节点,则返回获取成功。

为了在别的线程占有锁的时候阻塞,代码中使用JUC的condition来完成。如果获取尝试锁失败,则进入等待且放弃localLock,等待前序节点唤醒。而localLock是一个本地的公平锁,使得condition可以公平的进行唤醒,配合循环判断前序节点,实现了一个公平锁。关注Java技术栈微信公众号,在后台回复关键字:zookeeper,可以获取更多栈长整理的zk系列技术干货。

这种实现方式非常类似于ReentrantLock的CHL队列,而且zk的临时节点可以直接避免网络断开或主机宕机,锁状态无法清除的问题,顺序节点可以避免惊群效应。这些特性都使得利用ZK实现分布式锁成为了最普遍的方案之一。

三、通过AOP和注解优化代码

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.6.5</version>
        </dependency>

自定义注解代码

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RlockRepeatSubmit {

    /**
     * 通过反射来判断使用的场景
     * @return
     */
    LockConstant lockConstant() default LockConstant.COMMON_LOCK;

}

aop代码

@Configuration
@Aspect
public class RedissonAop {

    @Resource
    private Redisson redisson;

    @Pointcut("@annotation(com.cj.redisson.annotation.RlockRepeatSubmit)")
    public void pointCut() {
    }

    @Around("pointCut()")
    public Object redissonLock(ProceedingJoinPoint pjp) {
        // 获取方法的注解
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        RlockRepeatSubmit annotation = method.getAnnotation(RlockRepeatSubmit.class);
        LockConstant lockConstant = annotation.lockConstant();
        Assert.notNull(lockConstant, "服务器异常");

        // 获取方法的参数
        Object[] args = pjp.getArgs();

        String keyPrefix = lockConstant.getKeyPrefix();
        int leaseTime = lockConstant.getLeaseTime();
        int waitTime = lockConstant.getWaitTime();
        TimeUnit timeUnit = lockConstant.getTimeUnit();
        String message = lockConstant.getMessage();

        RLock lock = redisson.getLock(keyPrefix);
        try {
            boolean ifLock = lock.tryLock(waitTime, leaseTime, timeUnit);
            if (ifLock) {
                System.out.println("上锁");
                return pjp.proceed();
            } else {
                throw new IllegalStateException(message);
            }
        } catch (Throwable throwable) {
            throw new IllegalStateException("服务器aop代理异常");
        } finally {
            System.out.println("解锁");
            lock.unlock();
        }
    }

}

枚举类

public enum LockConstant {

    CASHIER("cashierLock:", 8, 10, TimeUnit.SECONDS, "请勿重复操作点击收银操作"),
    SUBMIT_ORDER("submitOrderLock", 3, 30, TimeUnit.SECONDS, "请勿重复点击下单"),
    COMMON_LOCK("commonLock:", 3, 120, TimeUnit.SECONDS, "请勿重复点击");

    private String keyPrefix;
    private int waitTime;
    private int leaseTime;
    private TimeUnit timeUnit;
    private String message;

    LockConstant(String keyPrefix, int waitTime, int leaseTime, TimeUnit timeUnit, String message) {
        this.keyPrefix = keyPrefix;
        this.waitTime = waitTime;
        this.leaseTime = leaseTime;
        this.timeUnit = timeUnit;
        this.message = message;
    }

    public String getKeyPrefix() {
        return keyPrefix;
    }

    public int getWaitTime() {
        return waitTime;
    }

    public int getLeaseTime() {
        return leaseTime;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public String getMessage() {
        return message;
    }

}

service代码

@Service
public class RedissonServiceImpl implements RedissonService {

    @Override
    @RlockRepeatSubmit(lockConstant = LockConstant.CASHIER)
    public void cashier() {
        System.out.println("enter cashier");
        try {
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("finish cashier");
    }

    @Override
    @RlockRepeatSubmit(lockConstant = LockConstant.SUBMIT_ORDER)
    public void submitOrder() {
        System.out.println("enter submitOrder");
    }

}

相关文章

网友评论

      本文标题:分布式锁的理论与实现

      本文链接:https://www.haomeiwen.com/subject/ncbzwhtx.html