CountDownLatch
-
倒数闭锁
-
用于协调一组线程的工作
-
简单、明了、粗暴的API
countDown()
await() -
用CountDownLatch实现的线程协同
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap result = new ConcurrentHashMap();
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
Thread.sleep(100);
int value = new Random().nextInt();
result.put(value, value);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 线程安全的并发-1
latch.countDown();
}
}).start();
}
// 当计时器=0时,主线程往下走
latch.await();
System.out.println(result);
}
CyclicBarrier
- 可以循环使用的屏障
- API简单粗暴
await() - 用CyclicBarrier实现的线程协同
public static void main(String[] args) throws Throwable {
ConcurrentHashMap result = new ConcurrentHashMap();
// 开了10个新线程,加上主线程,所以是11个
CyclicBarrier barrier = new CyclicBarrier(11);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(100);
int value = new Random().nextInt();
result.put(value, value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Throwable e) {
e.printStackTrace();
}
}
}).start();
}
// 实际场景中,不需要让线程一直等待,只需要将结果给出来就行, 该方法只是为了演示使用
barrier.await();
// CyclicBarrier还可以循环重复使用
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(100);
int value = new Random().nextInt();
result.put(value, value);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Throwable e) {
e.printStackTrace();
}
}
}).start();
}
barrier.await();
System.out.println(result);
}
Semaphore
- 信号量(由于资源[信号量]有限, 虽然线程很多, 但只有两个信号量, 所以需要每个线程开始工作前, 需要获取一个指定的信号量, 如果线程没有办法获取信号量, 那么就无法工作, 只能等待)例:只有两把镰刀割麦子,但是有100个人, 割麦子必须使用镰刀, 那么只能等一个人使用完镰刀之后,释放镰刀,下一个人,才能拿到镰刀取割麦子;
而Lock本质上就是数量为1的Semaphore, 如果把Lock改成一个数量为1的Semaphore,他就能完全当重量级锁使用, 与synchronize完全相同; - 信号量的获取和释放
acquire()
tryAcquire() = Lock的tryLock()
release() - 使用Semaphore实现生产者/消费者
public class SemaphoreTest {
static Semaphore emptySlot = new Semaphore(1);
static Semaphore fullSlot = new Semaphore(0);
public static void main(String[] args) throws Throwable {
Queue<Integer> queue = new LinkedList<>();
Thread producer = new Producer(queue);
producer.start();
Thread consumer = new Consumer(queue);
consumer.start();
producer.join();
consumer.join();
}
static class Producer extends Thread {
Queue<Integer> queue;
@Override
public void run() {
try {
// 拿到资源之后,还需要加锁,避免并发同时干活
emptySlot.acquire();
synchronized (SemaphoreTest.class) {
System.out.println("Producing...");
queue.add(2345);
}
// 释放满的资源给生产者用
fullSlot.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Producer(Queue<Integer> queue) {
this.queue = queue;
}
}
static class Consumer extends Thread {
Queue<Integer> queue;
@Override
public void run() {
try {
// 拿到资源之后,还需要加锁,避免并发同时干活
fullSlot.acquire();
synchronized (SemaphoreTest.class) {
System.out.println("consuming..."+queue.remove());
}
// 释放满的资源给生产者用
emptySlot.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Consumer(Queue<Integer> queue) {
this.queue = queue;
}
}
}
Exchanger/SynchronousQueue
/**
* A synchronization point at which threads can pair and swap elements
* within pairs. Each thread presents some object on entry to the
* {@link #exchange exchange} method, matches with a partner thread,
* and receives its partner's object on return. An Exchanger may be
* viewed as a bidirectional form of a {@link SynchronousQueue}.
* Exchangers may be useful in applications such as genetic algorithms
* and pipeline designs.
Exchanger是一个同步点,在这个同步点上,两个线程可以结对编程,然后把自己手中的东西交换给对方;
一个Exchanger可以看成是一个双向形式的同步队列(SynchronousQueue);
**/
/**
* A {@linkplain BlockingQueue blocking queue} in which each insert
* operation must wait for a corresponding remove operation by another
* thread, and vice versa. A synchronous queue does not have any
* internal capacity, not even a capacity of one. You cannot
* {@code peek} at a synchronous queue because an element is only
* present when you try to remove it; you cannot insert an element
* (using any method) unless another thread is trying to remove it;
* you cannot iterate as there is nothing to iterate.
同步队列(SynchronousQueue)是一个特殊的BlockingQueue, 每个insert操作前
都必须等待另一个线程在remove操作把资源拿走之后,他才能继续,反之亦然;
这个同步队列没有容量, 所以你不能看这个资源是什么,因为每当元素丢进去之
后,会被马上拿走;你也不能插入元素, 除非另一个线程正在remove它; 你也不能
迭代它;
**/
- 实现生产者/消费者(只能实现一对一的情况)
public class ExchangerTest {
public static void main(String[] args) throws Throwable {
Exchanger<Integer> exchanger = new Exchanger<>();
Thread producer = new Producer(exchanger);
producer.start();
Thread consumer = new Consumer(exchanger);
consumer.start();
producer.join();
consumer.join();
}
static class Producer extends Thread {
Exchanger<Integer> exchanger;
@Override
public void run() {
System.out.println("producing");
try {
exchanger.exchange(1234);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Producer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
}
static class Consumer extends Thread {
Exchanger<Integer> exchanger;
@Override
public void run() {
System.out.println("consuming");
try {
exchanger.exchange(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Consumer(Exchanger<Integer> exchanger) {
this.exchanger = exchanger;
}
}
}
AtomicInteger和CountDownLatch
虽然从计数的角度上是相同的,但是CountDownLatch存在阻塞方法, 而 AtomicInteger没有阻塞方法
例: 线程卡死,并且不吃CPU的场景
// 这种情况下, 因为没有人把计算器减到0, 所以会阻塞在这里, 但是不吃CPU
CountDownLatch latch = new CountDownLatch(1);
latch.await();
网友评论