都有些啥
- 提供了比 synchronized 更加高级的各种同步结构,包括 CountDownLatch、CyclicBarrier、Semaphore 等,可以实现更加丰富的多线程操作,比如利用 Semaphore 作为资源控制器,限制同时进行工作的线程数量。
- 各种线程安全的容器,比如最常见的 ConcurrentHashMap、有序的 ConcunrrentSkipListMap,或者通过类似快照机制,实现线程安全的动态数组 CopyOnWriteArrayList 等。
- 各种并发队列实现,如各种 BlockedQueue 实现,比较典型的 ArrayBlockingQueue、 SynchorousQueue 或针对特定场景的 PriorityBlockingQueue 等。
- 强大的 Executor 框架,可以创建各种不同类型的线程池,调度任务运行等,绝大部分情况下,不再需要自己从头实现线程池和任务调度器。
- 信号量 Semaphore
它通过控制一定数量的允许(permit)的方式,来达到限制通用资源访问的目的
public class UsualSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
System.out.println("Action...GO!");
Semaphore semaphore = new Semaphore(5); //限定五个信号量
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new SemaphoreWorker(semaphore));
t.start(); //10个线程来跑
}
}
}
class SemaphoreWorker implements Runnable {
private String name;
private Semaphore semaphore;
public SemaphoreWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
log("is waiting for a permit!");
semaphore.acquire(); //获取信号量 获取不到阻塞
log("acquired a permit!");
log("executed!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log("released a permit!");
semaphore.release(); //释放一个信号量回去
}
}
private void log(String msg){
if (name == null) {
name = Thread.currentThread().getName();}
System.out.println(name + " " + msg);
}
}
这样一个线程执行完了释放了信号量(许可), 立即就有阻塞的线程获得许可进入执行
倘若想一次放五个进入, 完了再放五个进入,可以如下(实际不会这么写, 仅演示):
public class AbnormalSemaphoreSample {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
for (int i = 0; i < 10; i++) {
Thread t = new Thread(new MyWorker(semaphore));
t.start();
}
System.out.println("Action...GO!"); // 此时上面起的五个线程阻塞于信号量的获取处
semaphore.release(5); // 释放五个信号量(许可)
System.out.println("Wait for permits off");
while (semaphore.availablePermits()!=0) { //查看可用信号量是否为0
Thread.sleep(100L);
}
System.out.println("Action...GO again!"); // 为0的话就再放五个
semaphore.release(5);
}
}
class MyWorker implements Runnable {
private Semaphore semaphore;
public MyWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总的来看, Semaphore就像一个计数器, 基本逻辑基于acquire和release,并没有太复杂的同步逻辑. 如果 Semaphore 的数值被初始化为 1,那么一个线程就可以通过 acquire 进入互斥状态,本质上和互斥锁是非常相似的。但是区别也非常明显,比如互斥锁是有持有者的,而对于 Semaphore 这种计数器结构,虽然有类似功能,但其实不存在真正意义的持有者,除非我们进行扩展包装。
- CountDownLatch 和 CyclicBarrier
- CountDownLatch 是不可以重置的,所以无法重用;而 CyclicBarrier 则没有这种限制,可以重用。
- CountDownLatch 的基本操作组合是 countDown/await。调用 await 的线程阻塞等待 countDown 足够的次数,不管你是在一个线程还是多个线程里 countDown,只要次数足够await处就不再阻塞。 CountDownLatch 操作的是事件。
- CyclicBarrier 的基本操作组合,则就是 await,当所有的伙伴(parties 线程们)都调用了 await,才会继续进行任务,并自动进行重置。注意,正常情况下,CyclicBarrier 的重置都是自动发生的,如果我们调用 reset 方法,但还有线程在等待,就会导致等待线程被打扰,抛出 BrokenBarrierException 异常。CyclicBarrier 侧重点是线程,而不是调用事件,它的典型应用场景是用来等待并发线程结束。
用CountDownLatch来实现五个一次五个一次
public class LatchSample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(6);
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new FirstBatchWorker(latch));
t.start();}
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new SecondBatchWorker(latch));
t.start();}
// 注意这里也是演示目的的逻辑,并不是推荐的协调方式
while ( latch.getCount() != 1 ){ //在第一个五个没执行完(没countdown)时一直等
Thread.sleep(100L);
}
System.out.println("Wait for first batch finish");
latch.countDown(); // 这里第6个countdown 第二个五个就可以开始了
}
}
class FirstBatchWorker implements Runnable {
private CountDownLatch latch;
public FirstBatchWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println("First batch executed!");
latch.countDown();
}
}
class SecondBatchWorker implements Runnable {
private CountDownLatch latch;
public SecondBatchWorker(CountDownLatch latch) {
this.latch = latch;}
@Override
public void run() {
try {
latch.await();
System.out.println("Second batch executed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上例也从侧面体现出了它的局限性,虽然它也能够支持 10 个人排队的情况,但是因为不能重用,如果要支持更多人排队,就不能依赖一个 CountDownLatch 进行了. 使用CyclicBarrier实现如下:
public class CyclicBarrierSample {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("Action...GO again!");
}
});
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new CyclicWorker(barrier));
t.start();
}
}
static class CyclicWorker implements Runnable {
private CyclicBarrier barrier;
public CyclicWorker(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
for (int i=0; i<3 ; i++){
System.out.println("Executed!");
barrier.await();
}
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
CyclicBarrier 其实反映的是线程并行运行时的协调
为了让输出更能表达运行时序,使用了 CyclicBarrier 特有的 barrierAction("Action...GO again!"),当屏障被触发时,Java 会自动调度该动作
输出如下
图片.png
-
并发包里提供的线程安全 Map、List 和 Set
图片.png
总体上种类和结构还是比较简单的,如果我们的应用侧重于 Map 放入或者获取的速度,而不在乎顺序,大多推荐使用 ConcurrentHashMap,反之则使用 ConcurrentSkipListMap
关于两个 CopyOnWrite 容器,其实 CopyOnWriteArraySet 是通过包装了 CopyOnWriteArrayList 来实现的,所以在学习时,我们可以专注于理解一种。
CopyOnWrite 到底是什么意思呢?它的原理是,任何修改操作,如 add、set、remove,都会拷贝原数组,修改后替换原来的数组,通过这种防御性的方式,实现另类的线程安全. 第三章中也有说明
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
// 拷贝
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 替换
setArray(newElements);
return true;
}
}
final void setArray(Object[] a) {
array = a;
}
网友评论