美文网首页
JUC (06)ReentrantLock-Condition

JUC (06)ReentrantLock-Condition

作者: rock_fish | 来源:发表于2020-08-05 13:46 被阅读0次

Condition

利用锁可以让线程以同步的方式来执行一段代码,而Condition则是用来实现线程之间协作的。

功能概述:

多个线程之间协作 工作时,在获取锁之后,还可能需要判断一些条件是否满足,才可以继续执行,如果不满足则需要挂机等待直到条件满足,这里的条件就对应到Condition对象。

  1. 配合Lock使用,附属于Lock实例
  2. 一个Lock实例,可以创建多个Contion变量,根据自己的应用场景
方法介绍

1.Lock#newCondition :创建一个Condition实例

  1. Condition#await :将持锁的当前线程加入到condition变量的等待队列后,挂起当前线程
  2. Condition#sinal : 激活条件变量等待队列中,队首的线程,线程激活后抢占锁,抢不到就进入锁的等待队列中排队。
  3. Condition#sinalAll : 激活条件变量等待队列中,所有的线程。线程激活后抢占锁,抢不到就进入锁的等待队列中排队。
设计
  • ReentrantLock的Condition,如左图可以有多个条件变量(Condition)。

  • synchronize,如右图只有一个条件变量。


    image.png
  • 每个Condition内都有独立的等待队列,线程在等待变量信号时排队;得到信号后从Condition的队列中移除,参与到Lock的同步队列中去竞争锁:


    image.png
Demo

经典的生产者消费者实例,利用2个Condition:

  • 不满就生产
  • 不空就消费
    BoundedBuffer
package com.rock.multithread.juc.lock;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

CakeFactory

package com.rock.multithread.juc.lock;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class CakeFactory {
    private   ReentrantLock lock  = new ReentrantLock();
    private  Condition not_empty = lock.newCondition();

    private  Condition not_full = lock.newCondition();
    private  int queueSize = 5;

    private  Queue<Long> queue = new ArrayDeque<>(queueSize);

    private  long cakeCount = 1000;
    private   long counter =0;


    public  long makeCake() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == queueSize) {
                not_full.await();
            }

            long no = counter++;
            queue.add(no);
            not_empty.signal();



            return no;
        } finally{
            lock.unlock();
        }
    }

    public  long getCake(){

        lock.lock();
        try {
            while (queue.isEmpty()){
                try {
                    not_empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Long no = queue.poll();
            not_full.signal();
            return no;

        }  finally {
            lock.unlock();
        }

    }
}

ConditionDemo

package com.rock.multithread.juc.lock;

import sun.invoke.empty.Empty;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionDemo {




    public static BoundedBuffer boundedBuffer = new BoundedBuffer();
    public static CakeFactory cakeFactory = new CakeFactory();
    public static long counter =0;

    public static void main(String[] args) throws InterruptedException {

        //Thread makeThread = new Thread(ConditionDemo::put);
       // Thread eatThread = new Thread(ConditionDemo::poll);

        Thread makeThread = new Thread(ConditionDemo::makeCake);
        Thread eatThread = new Thread(ConditionDemo::eatCake);
        makeThread.start();
        TimeUnit.SECONDS.sleep(1);
        eatThread.start();

        makeThread.join();
        eatThread.join();

    }


    public static void put(){
        while (true){
            try {
                long no = counter ++;
                System.out.println("put " + no);
                boundedBuffer.put(no);

                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void poll(){
        while (true){
            try {

                Object take = boundedBuffer.take();
                System.out.println("take " + take);
                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    public static  void makeCake() {

        while (true) {

            try {
                long cakeNo = cakeFactory.makeCake();
                System.out.println("make cake" + cakeNo);
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void eatCake(){

        while (true){
            long cakeNo = cakeFactory.getCake();
            System.out.println("eat cake" + cakeNo);

            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

相关文章

网友评论

      本文标题:JUC (06)ReentrantLock-Condition

      本文链接:https://www.haomeiwen.com/subject/gymbrktx.html