美文网首页
CountDownLatch&CyclicBarrier&Sem

CountDownLatch&CyclicBarrier&Sem

作者: 叫我C30混凝土 | 来源:发表于2021-05-12 01:27 被阅读0次

CountDownLatch

  • 倒数闭锁

  • 用于协调一组线程的工作

  • 简单、明了、粗暴的API
    countDown()
    await()

  • 用CountDownLatch实现的线程协同

public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap result = new ConcurrentHashMap();
        CountDownLatch latch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    Thread.sleep(100);
                    int value = new Random().nextInt();
                    result.put(value, value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 线程安全的并发-1
                    latch.countDown();
                }
            }).start();
        }
        // 当计时器=0时,主线程往下走
        latch.await();
        System.out.println(result);
    }

CyclicBarrier

  • 可以循环使用的屏障
  • API简单粗暴
    await()
  • 用CyclicBarrier实现的线程协同
public static void main(String[] args) throws Throwable {
        ConcurrentHashMap result = new ConcurrentHashMap();
        // 开了10个新线程,加上主线程,所以是11个
        CyclicBarrier barrier = new CyclicBarrier(11);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100);
                    int value = new Random().nextInt();
                    result.put(value, value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        barrier.await();
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // 实际场景中,不需要让线程一直等待,只需要将结果给出来就行, 该方法只是为了演示使用
        barrier.await();

        // CyclicBarrier还可以循环重复使用
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(100);
                    int value = new Random().nextInt();
                    result.put(value, value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        barrier.await();
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        barrier.await();
        System.out.println(result);
    }

Semaphore

  • 信号量(由于资源[信号量]有限, 虽然线程很多, 但只有两个信号量, 所以需要每个线程开始工作前, 需要获取一个指定的信号量, 如果线程没有办法获取信号量, 那么就无法工作, 只能等待)例:只有两把镰刀割麦子,但是有100个人, 割麦子必须使用镰刀, 那么只能等一个人使用完镰刀之后,释放镰刀,下一个人,才能拿到镰刀取割麦子;
    而Lock本质上就是数量为1的Semaphore, 如果把Lock改成一个数量为1的Semaphore,他就能完全当重量级锁使用, 与synchronize完全相同;
  • 信号量的获取和释放
    acquire()
    tryAcquire() = Lock的tryLock()
    release()
  • 使用Semaphore实现生产者/消费者
public class SemaphoreTest {

    static Semaphore emptySlot = new Semaphore(1);
    static Semaphore fullSlot = new Semaphore(0);


    public static void main(String[] args) throws Throwable {
        Queue<Integer> queue = new LinkedList<>();
        Thread producer = new Producer(queue);
        producer.start();

        Thread consumer = new Consumer(queue);
        consumer.start();

        producer.join();
        consumer.join();
    }

    static class Producer extends Thread {
        Queue<Integer> queue;

        @Override
        public void run() {
            try {
                // 拿到资源之后,还需要加锁,避免并发同时干活
                emptySlot.acquire();
                synchronized (SemaphoreTest.class) {
                    System.out.println("Producing...");
                    queue.add(2345);
                }
                // 释放满的资源给生产者用
                fullSlot.release();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public Producer(Queue<Integer> queue) {
            this.queue = queue;
        }
    }


    static class Consumer extends Thread {
        Queue<Integer> queue;

        @Override
        public void run() {
            try {
                // 拿到资源之后,还需要加锁,避免并发同时干活
                fullSlot.acquire();
                synchronized (SemaphoreTest.class) {
                    System.out.println("consuming..."+queue.remove());
                }
                // 释放满的资源给生产者用
                emptySlot.release();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public Consumer(Queue<Integer> queue) {
            this.queue = queue;
        }
    }

}

Exchanger/SynchronousQueue

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
Exchanger是一个同步点,在这个同步点上,两个线程可以结对编程,然后把自己手中的东西交换给对方;
一个Exchanger可以看成是一个双向形式的同步队列(SynchronousQueue);
**/

/**
 * A {@linkplain BlockingQueue blocking queue} in which each insert
 * operation must wait for a corresponding remove operation by another
 * thread, and vice versa.  A synchronous queue does not have any
 * internal capacity, not even a capacity of one.  You cannot
 * {@code peek} at a synchronous queue because an element is only
 * present when you try to remove it; you cannot insert an element
 * (using any method) unless another thread is trying to remove it;
 * you cannot iterate as there is nothing to iterate.
同步队列(SynchronousQueue)是一个特殊的BlockingQueue, 每个insert操作前
都必须等待另一个线程在remove操作把资源拿走之后,他才能继续,反之亦然; 
这个同步队列没有容量, 所以你不能看这个资源是什么,因为每当元素丢进去之
后,会被马上拿走;你也不能插入元素, 除非另一个线程正在remove它; 你也不能
迭代它;
**/
  • 实现生产者/消费者(只能实现一对一的情况)
public class ExchangerTest {

    public static void main(String[] args) throws Throwable {
        Exchanger<Integer> exchanger = new Exchanger<>();
        Thread producer = new Producer(exchanger);
        producer.start();

        Thread consumer = new Consumer(exchanger);
        consumer.start();

        producer.join();
        consumer.join();
    }

    static class Producer extends Thread {
        Exchanger<Integer> exchanger;

        @Override
        public void run() {
            System.out.println("producing");
            try {
                exchanger.exchange(1234);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public Producer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }
    }


    static class Consumer extends Thread {
        Exchanger<Integer> exchanger;

        @Override
        public void run() {
            System.out.println("consuming");
            try {
                exchanger.exchange(null);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public Consumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }
    }

}

AtomicInteger和CountDownLatch

虽然从计数的角度上是相同的,但是CountDownLatch存在阻塞方法, 而 AtomicInteger没有阻塞方法
例: 线程卡死,并且不吃CPU的场景

// 这种情况下, 因为没有人把计算器减到0, 所以会阻塞在这里, 但是不吃CPU
CountDownLatch latch = new CountDownLatch(1);
latch.await();

相关文章

网友评论

      本文标题:CountDownLatch&CyclicBarrier&Sem

      本文链接:https://www.haomeiwen.com/subject/ytyodltx.html