美文网首页
10.Redisson源码-CountDownLatch源码剖析

10.Redisson源码-CountDownLatch源码剖析

作者: T_log | 来源:发表于2020-11-04 23:08 被阅读0次

一、CountDownLatch基本原理

  1. countDownLatch最基本的原理其实就是,现在有4个客户端,分别是A、B、C、D,客户端A进行加锁后,设置三个线程来获取锁,那么,必须让接下来的三个客户端BCD都获取锁成功后,客户端A的逻辑才会继续向下走
  2. 如果说,指定3个客户端获取锁,获取锁的客户端数量没有到达3的话,客户端A是不会逻辑是不会向下走的,会被阻塞住

源码

代码片段一、demo

public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress("redis://192.168.0.107:7001")
                .addNodeAddress("redis://192.168.0.107:7002")
                .addNodeAddress("redis://192.168.0.110:7003")
                .addNodeAddress("redis://192.168.0.110:7004")
                .addNodeAddress("redis://192.168.0.111:7005")
                .addNodeAddress("redis://192.168.0.111:7006");

        final RedissonClient redisson = Redisson.create(config);

        RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
        // 这里会设置几个客户端来获取锁成功的数量,代码片段二、
        latch.trySetCount(3);
        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。");

        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。");
                        Thread.sleep(3000);
                        RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch”);
                        // 代码片段三、
                        localLatch.countDown();
                        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        // 代码片段四、
        latch.await();
        System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走");

    }

代码片段二、RedissonCountDownLatch

  1. 参数:
    KEYS[1]= “anyCountDownLatch”
    ARGV[2] = 3,其实就是count参数的值
@Override
public boolean trySetCount(long count) {
    // 这里设置的客户端获取锁的个数为3
    return get(trySetCountAsync(count));
}

// count = 3
@Override
public RFuture<Boolean> trySetCountAsync(long count) {
    return commandExecutor.evalWriteAsync(getName(), 
    LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            //1.exists anyCountDownLatch 客户端A进来不存在,
            // 进入if逻辑
            "if redis.call('exists', KEYS[1]) == 0 then “
                //1.set anyCountDownLatch 3
                + "redis.call('set', KEYS[1], ARGV[2]); "
                + "redis.call('publish', KEYS[2], ARGV[1]); “   
                //1.返回1代表成功
                + "return 1 "
            + "else "
                + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()),
             newCountMessage, count);
}

代码片段三、RedissonCountDownLatch

参数
KEYS[1] = “anyCountDownLatch”


@Override
public void countDown() {
    get(countDownAsync());
}


@Override
public RFuture<Void> countDownAsync() {
    return commandExecutor.evalWriteAsync(getName(), 
    LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    //1.decr anyCountDownLatch ,其实就是anyCountDownLatch这KEY对应的值,
                    // 第一次是3,减去1,变成2,
                    // 这样的话,后面还允许2个客户端获取锁
                    "local v = redis.call('decr', KEYS[1]);” +
                    //1.如果v,第一次执行后为2小于等于0的话,就直接删除key,所以可以看到,
                    // 当执行到第三个客户端的时候,这里才会成立
                    "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                    "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
                Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
}

代码片段四、RedissonCountDownLatch


public void await() throws InterruptedException {
    RFuture<RedissonCountDownLatchEntry> future = subscribe();
    try {
        commandExecutor.syncSubscription(future);

        // 这里就是说,如果我们业务逻辑里设置的成功获取锁的客户端为3,如果三个客户端都已经成功获取锁,
        //那么KEY就不存在了,如代码片段三中的分析,而如果获取锁的客户端没有达到3的话,这里其实就会进入到一个死循环
        // 不停的等待,直到KEY的值为0,参会继续走下面的逻辑,否则就会一直阻塞在这里
        while (getCount() > 0) {
            // waiting for open state
            RedissonCountDownLatchEntry entry = getEntry();
            if (entry != null) {
                entry.getLatch().await();
            }
        }
    } finally {
        unsubscribe(future);
    }
}

三、demo执行结果图

  1. 刚开始main线程设置必须有三个线程/客户端执行countDown,也就是获取锁成功
  2. 接下来三个线程/客户端成功的获取了锁
  3. 最后三个线程获取锁执行,执行countDown逻辑后,主线程才会继续执行,否则就会一直阻塞住
CountDownLatch.png

四、总结

  1. 到此为止,Redisson的源码基本上已经分析差不多了,其实还有一些环境没有发出来,因为一篇文章不想太长,所以后面会陆陆续续的把redisson源码的其他部分发出来
  2. 接下来就要分析zk的分布式锁了,其实对比Redis分布式锁和zk的分布式锁的优缺点,全方面的进行对比之后,在我们的实际业务开发中,我们才会知道,到底哪个分布式锁更加适合我们,因为,没有最好,只有最合适。
  3. 大家一起努力,加油。

相关文章

网友评论

      本文标题:10.Redisson源码-CountDownLatch源码剖析

      本文链接:https://www.haomeiwen.com/subject/wnbnvktx.html