美文网首页
ReentrantLock与Condition详解

ReentrantLock与Condition详解

作者: 壹元伍角叁分 | 来源:发表于2022-04-02 13:27 被阅读0次

一、ReentrantLock

从jdk发行1.5版本之后,在原来synchronize的基础上,增加了重入锁 ReentrantLock。

首先来看一个实例:

class ReentrantLockTestDemo {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2
                , 10
                , 1
                , TimeUnit.HOURS
                , new ArrayBlockingQueue<>(4)
                , Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
         MyRunnable myRunnable = new MyRunnable();
        threadPoolExecutor.submit(myRunnable);
        threadPoolExecutor.submit(myRunnable);
    }
}

未使用ReentrantLock:

  static class MyRunnable implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
            }
        }
    }

未使用ReentrantLock打印的结果是没有顺序,杂乱无章的

index=0 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-1
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-1
index=2 thread=pool-1-thread-2

使用ReentrantLock加入锁:

static class MyRunnable implements Runnable {
    @Override
    public void run() {
        ReentrantLock reentrantLock = new ReentrantLock();
        // 加锁
        reentrantLock.lock();
        try {
            for (int i = 0; i < 3; i++) {
                System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
            }
        } finally {
            // 解锁
            reentrantLock.unlock();
        }
    }
}

打印出的结果,是有顺序的

index=0 thread=pool-1-thread-1
index=1 thread=pool-1-thread-1
index=2 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-2

这就是锁的作用,它是互斥的,当一个线程持有锁的时候,其他线程只有等待,待待线程执行结束,释放锁,等待的线程再通过竞争得到锁。

二、Condition

通常在开发并发程序的时候,会碰到需要停止正在执行业务A,来执行另一个业务B,当业务B执行完成后,业务A继续执行。就可以通过ReentrantLock和Condtion等待/唤醒来完成这样的操作。在LinkedBlockingQueue的put/take操作中就有使用到。

相较于synchronize的wait()、notify()/notifyAll()则更有针对性、灵活性。可以唤醒符合某个条件线程去执行,而notify/notifyAll()则是随机通知的,具有很大的不可控性。

1、使用Condition实现线程等待和唤醒

class ConditionTestDemo {
    public static void main(String[] args) {

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3
                , 10
                , 1
                , TimeUnit.HOURS
                , new ArrayBlockingQueue<>(4)
                , Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

        MyService service = new MyService();
        // 线程1、2是符合条件A的
        threadPoolExecutor.submit(new RunnableA(service));
        threadPoolExecutor.submit(new RunnableA(service));
        // 线程3是符合条件B的
        threadPoolExecutor.submit(new RunnableB(service));

        // 主线程sleep2s后,主动唤醒符合条件B的线程。再由线程B去唤醒符合条件A的两个线程。
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + "等待2s 去唤醒符合条件B的所有线程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
        service.signalB();
    }

    static class RunnableA implements Runnable {
        private MyService service;

        public RunnableA(MyService service) {
            this.service = service;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "符合条件A--------" + DateUtils.INSTANCE.getCurrDataStr());
            service.awaitA();
        }
    }

    static class RunnableB implements Runnable {
        private MyService service;

        public RunnableB(MyService service) {
            this.service = service;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "符合条件B--------" + DateUtils.INSTANCE.getCurrDataStr());
            service.awaitB();
        }
    }

    static class MyService {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition threadACondition = reentrantLock.newCondition();
        Condition threadBCondition = reentrantLock.newCondition();

        /**
         * 符合添加A的线程进入等待
         */
        public void awaitA() {
            reentrantLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "获取到锁。被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
                threadACondition.await();

                System.out.println(Thread.currentThread().getName() + "被唤醒--------" + DateUtils.INSTANCE.getCurrDataStr());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }

        /**
         * 符合添加B的线程进入等待
         */
        public void awaitB() {
            reentrantLock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + "获取到锁,被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
                threadBCondition.await();

                System.out.println(Thread.currentThread().getName() + "被唤醒--------" + DateUtils.INSTANCE.getCurrDataStr());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }

        /**
         * 唤醒符合条件A的所有线程
         */
        public void signalA() {
            reentrantLock.lock();
            try {
                threadACondition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }

        public void signalB() {
            reentrantLock.lock();
            try {
                // 唤醒符合条件B的所有线程
                threadBCondition.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }

            System.out.println(Thread.currentThread().getName() + "等待2s 再去唤醒符合条件A的所有线程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 在唤醒符合条件B的所有线程后,2s再去唤醒符合条件A的所有线程
            signalA();
        }
    }
}

打印结果:

pool-1-thread-1符合条件A--------------22/04/01 21:37:21   
pool-1-thread-3符合条件B--------------22/04/01 21:37:21
pool-1-thread-1获取到锁。被要求等待-----22/04/01 21:37:21
pool-1-thread-2符合条件A--------------22/04/01 21:37:21
pool-1-thread-2获取到锁。被要求等待-----22/04/01 21:37:21
pool-1-thread-3获取到锁,被要求等待-----22/04/01 21:37:21  // 三个线程一开启,就被要求等待
main等待2s 去唤醒符合条件B的所有线程-----22/04/01 21:37:23  // 主线程等待2s后,主动去唤醒符合条件B的线程
pool-1-thread-3被唤醒----------------22/04/01 21:37:23  // 符合条件B的线程被唤醒
main等待2s 再去唤醒符合条件A的所有线程---22/04/01 21:37:25  // 符合条件B的线程被唤醒后,再等待2s,去唤醒符合条件A的所有线程
pool-1-thread-1被唤醒----------------22/04/01 21:37:25
pool-1-thread-2被唤醒----------------22/04/01 21:37:25  // 线程1、2符合条件A,被同一时间唤醒

分别实例化了两个Condition对象,都是使用同一个lock注册。注意 conditionA对象的等待和唤醒只对使用了conditionA的线程有用,同理 conditionB对象的等待和唤醒只对使用了conditionB的线程有用。

2、模拟生产/消费者

static class MyService {
    private ReentrantLock mReentrantLock = new ReentrantLock();
    private Condition mCondition = mReentrantLock.newCondition();
    private boolean isFull;
    private int index;

    public void put() {
        mReentrantLock.lock();
        try {
            // 如果队列已满,则进入等待中
            if (isFull) {
                System.out.println("队列已满,生产者进入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
                mCondition.await();
            }
            System.out.println("开始生产,index=" + index + "需要2s----" +DateUtils.INSTANCE.getCurrDataStr());
            // 每隔2s放一个元素
            index++;
            Thread.sleep(2000);
            // 通知取
            isFull = true;
            mCondition.signalAll();
            System.out.println("结束生产,index=" + index + "唤醒消费者进行生产----" + DateUtils.INSTANCE.getCurrDataStr());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mReentrantLock.unlock();
        }
    }

    public void take() {
        mReentrantLock.lock();
        try {
            // 如果队列已空,则进入等待中
            if (!isFull) {
                System.out.println("队列已空,消费者进入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
                mCondition.await();
            }

            System.out.println("开始消费,index=" + index + "需要3s----" + DateUtils.INSTANCE.getCurrDataStr());
            index--;
            Thread.sleep(3000);
            isFull = false;
            // 提醒生产者
            mCondition.signalAll();
            System.out.println("结束消费,index=" + index + "唤醒生产者进行生产----" + DateUtils.INSTANCE.getCurrDataStr());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mReentrantLock.unlock();
        }
    }
}

生产者类:

static class PutRunnable implements Runnable {
    MyService myService;

    public PutRunnable(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.put();
        }
    }
}

消费者类:

static class TakeRunnable implements Runnable {
    MyService myService;

    public TakeRunnable(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.take();
        }
    }
}

启动类:

class ConditionDemo {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        executorService.execute(new PutRunnable(myService));
        executorService.execute(new TakeRunnable(myService));
    }
}

打印结果:

开始生产,index=0需要2s----22/04/02 13:21:15
结束生产,index=1唤醒消费者进行生产----22/04/02 13:21:17
队列已满,生产者进入等待中----22/04/02 13:21:17
开始消费,index=1需要3s----22/04/02 13:21:17
结束消费,index=0唤醒生产者进行生产----22/04/02 13:21:20
队列已空,消费者进入等待中----22/04/02 13:21:20
开始生产,index=0需要2s----22/04/02 13:21:20
结束生产,index=1唤醒消费者进行生产----22/04/02 13:21:22
队列已满,生产者进入等待中----22/04/02 13:21:22
开始消费,index=1需要3s----22/04/02 13:21:22
结束消费,index=0唤醒生产者进行生产----22/04/02 13:21:25
队列已空,消费者进入等待中----22/04/02 13:21:25
开始生产,index=0需要2s----22/04/02 13:21:25
...

3、顺序执行线程

充分发掘Condition的灵活性,可以用它来实现顺序执行线程。

 class MyService {
        private ReentrantLock mReentrantLock = new ReentrantLock();
        // 有三个线程,所有注册三个Condition
        private Condition mConditionA = mReentrantLock.newCondition();
        private Condition mConditionB = mReentrantLock.newCondition();
        private Condition mConditionC = mReentrantLock.newCondition();
        // 通过index控制下一个执行的线程
        private int index;

        public void actionA() {
            mReentrantLock.lock();
            try {
                // 只有index 不等于 0,则进入等待中
                if (index != 0) {
                    mConditionA.await();
                }
                System.out.println("A执行");
                Thread.sleep(1000);
                index = 1;
                mConditionB.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mReentrantLock.unlock();
            }
        }

        public void actionB() {
            mReentrantLock.lock();
            try {
                // 只有index 不等于 1,则进入等待中
                if (index != 1) {
                    mConditionB.await();
                }
                System.out.println("B执行");
                Thread.sleep(1000);
                index = 2;
                mConditionC.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mReentrantLock.unlock();
            }
        }

        // 只有index==2时,才执行下面操作,否则休眠
        public void actionC() {
            mReentrantLock.lock();
            try {
                // 只有index 不等于 2,则进入等待中
                if (index != 2) {
                    mConditionC.await();
                }
                System.out.println("C执行");
                Thread.sleep(1000);
                index = 0;
                mConditionA.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mReentrantLock.unlock();
            }
        }
    }

业务类:

class RunnableA implements Runnable {
    MyService myService;

    public RunnableA(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.actionA();
        }
    }
}

class RunnableB implements Runnable {
    MyService myService;

    public RunnableB(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.actionB();
        }
    }
}

class RunnableC implements Runnable {
    MyService myService;

    public RunnableC(MyService myService) {
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true) {
            myService.actionC();
        }
    }
}

启动类:

class ConditionDemo {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        // 这边故意打乱启动顺序
        executorService.execute(new RunnableB(myService));
        executorService.execute(new RunnableA(myService));
        executorService.execute(new RunnableC(myService));
    }
}

打印结果:

A执行
B执行
C执行
A执行
B执行
C执行
...

相关文章

网友评论

      本文标题:ReentrantLock与Condition详解

      本文链接:https://www.haomeiwen.com/subject/qkytsrtx.html