美文网首页
Lock/Condition

Lock/Condition

作者: 叫我C30混凝土 | 来源:发表于2021-04-25 23:49 被阅读0次

java1.5以前
synchronized <---> 同步
wait / notify <---> 协同

1.5引入
lock <---> 同步(确保共享变量不会被错误访问)
condition <---> 协同(多个线程能以正确的顺序和正确的方式进行交互)

引入原因
condition文档中

{@code Condition} factors out the {@code Object} monitor methods ({@link Object#wait() wait}, {@link Object#notify notify} and {@link Object#notifyAll notifyAll}) into distinct objects to give the effect of having multiple wait-sets per object,by combining them with the use of arbitrary {@link Lock} implementations.
把object对象中自带的wait/notify/notifyAll拆成单独的condition对象;

  • Lock/Condition与synchronize/wait/notify机制的差别;
  1. synchronize不够灵活,方法必须在同一个类中,无法拆解;
  2. 一个对象只能有一个等待集合;
  3. 无法实现读写分离;(因为synchronize是一个非常重的悲观锁);
  4. 无法提供测试方法,是否上锁,而Lock中有tryLock();
  5. 优先级/公平性;(synchronize的调度完全取决于操作系统)
    公平锁/非公平锁
    具体实现
    java.util.concurrent.locks.ReentrantLock.NonfairSync
    java.util.concurrent.locks.ReentrantLock.FairSync
  • 重入和不可重入
    重入锁: 如果已经持有锁,可以无穷次的进入和退出;
Lock lock = new ReentrantLock();
    void fooWithLock() {
        try {
            lock.lock();
            lock.lock();
        } finally {
            lock.unlock();
            lock.unlock();
        }
    }

synchronize是可重入的;并且由虚拟机保证,无论以什么方式退出,都会解锁;

/**
 * 生产者消费者
 */
public class ProducerConsumer {
    private static final Integer MAX_QUEUE_SIZE = 100;
    private static Lock lock = new ReentrantLock();
    private static Condition queueEmpty = lock.newCondition();
    private static Condition queueFull = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Queue<Integer> queue = new LinkedList();
        Producer producer = new Producer(queue);
        producer.start();
        Consumer consumer = new Consumer(queue);
        consumer.start();
        Producer producer2 = new Producer(queue);
        producer2.start();
        Consumer consumer2 = new Consumer(queue);
        consumer2.start();

        producer.join();
        consumer.join();
        producer2.join();
        consumer2.join();
    }

    public static class Producer extends Thread {
        Queue<Integer> queue;

        @Override
        public void run() {
            lock.lock(); // 等价于synchronized(Condition)
            try {
                while (queue.size() >= MAX_QUEUE_SIZE) {
                    queueEmpty.await(); //等价于Object的wait,但由于wait()为final,所以重写await()
                }
                queue.add(new Random().nextInt());
                queueFull.signalAll(); // 等价于Object的notify,也为final

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public Producer(Queue<Integer> queue) {
            this.queue = queue;
        }
    }

    public static class Consumer extends Thread {
        Queue<Integer> queue;

        @Override
        public void run() {
            lock.lock(); // 等价于synchronized(Condition)
            try {
                while (queue.size() <= 0) {
                    queueFull.await(); //等价于Object的wait,但由于wait()为final,所以重写await()
                }
                Integer removeNum = queue.remove();
                System.out.println("取出" + removeNum);
                queueEmpty.signalAll(); // 等价于Object的notify,也为final

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }

        public Consumer(Queue<Integer> queue) {
            this.queue = queue;
        }
    }
}
/**
 * 线程协同机制
 */
public class ThreadCoordination {


    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> results = new ConcurrentHashMap<>();
        Lock lock = new ReentrantLock();
        Condition allThreadFinished = lock.newCondition();
        AtomicInteger howManyThreadRunning = new AtomicInteger(10);

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int value = new Random().nextInt();
                results.put(value, value);

                lock.lock();
                try {
                    howManyThreadRunning.decrementAndGet();
                    allThreadFinished.signal();
                } finally {
                    lock.unlock();
                }

            }).start();
        }

        lock.lock();
        try {
            while (howManyThreadRunning.get() > 0) {
                allThreadFinished.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        System.out.println(results);
    }
}

相关文章

网友评论

      本文标题:Lock/Condition

      本文链接:https://www.haomeiwen.com/subject/ejfsrltx.html