1.闭锁CountDownLautch
CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。
特点:只能用一次~~~~
<a href="http://www.jianshu.com/writer#/notebooks/11234585/notes/11516357">源码分析</a>
2.FutureTask
阻塞等待线程执行结果,线程池里大量使用。实现了RunnableFuture接口,所以它既是个线程类,又支持阻塞等待获取结果。
值得注意的是他有一个空方法done
,在结束callable设值或者cancel后会执行,可自行扩展。
这里最最主要需要弄明白的是cancel方法的参数:
public boolean cancel(boolean mayInterruptIfRunning) {...}
参数mayInterruptIfRunning的含义:
需要特别注意参数命名IfRunning,顾名思义它只针对正在运行中的task有特殊照顾。
具体结合源码看,源码链接在下面。
mayInterruptIfRunning为true的使用场景:
如果运行中的线程实现了相应中断的逻辑,不管是代码检查,还是有可响应的阻塞方法,而且中断会不导致一些数据不一致的那个危险后果。
mayInterruptIfRunning为false使用场景:
- 任务不能响应中断信号或者不知道人物能不能响应中断信号
- 任务非常敏感,数据正确性要求高,不能随便中断。
<a href="http://www.jianshu.com/writer#/notebooks/11234585/notes/11518626">源码分析</a>
3.信号量semaphore
<a href="http://www.jianshu.com/writer#/notebooks/11234585">源码分析</a>
4.CyclicBarrier
- CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
- CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
- CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
- 最后一条线程到达栅栏后,会重新开启下一次栅栏。
5.CyclicBarrier,CountDownLautch区别
- CyclicBarrier一组线程相互等待,CountDownLautch一组线程等待另外一组线程
- CountDownLautch执行countDown后不会阻塞,会继续执行;CyclicBarrier会阻塞
- CyclicBarrier复用的,CountDownLautch不能复用
- CyclicBarrier提供一个到达栅栏时的runnable,有很大的便利,他能保证在下一代执行前先执行完这个runnable。(鸟鱼池塘那个题目。。。)
6.Exchanger交互数据(jdk8源码太难看懂了。。。。)
7. 完美的本地缓存实现(防止缓存击穿等问题),其实分布式系统中也是一毛一样的思路
public class Memoizer <A, V> implements Computable<A, V> {
// 首先缓存的增删改查必须是线程安全的,所以需要用ConcurrentHashMap
// 其次,缓存的结果不能是直接的结果,需要是一个Future,这样能防止缓存击穿问题(就是某个时间点很多很多线程来缓存拿值,此时值刚好过期或者还没有,这些线程将会全部涌入计算逻辑中。。。所以先生成一个Future,就只会有一个线程真正去计算了,其他的等待结果就好)
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
public V compute(final A arg) throws InterruptedException {
while (true) { // 这个循环是希望在被cancel后,重新又生成一个任务继续
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
// 这里能避免两个线程击穿缓存的情况(虽然概率很低,还是可能发生,所以改用原子操作)
f = cache.putIfAbsent(arg, ft);//原子方法putIfAbsent
if (f == null) {//putIfAbsent会把put之前的值返回,如果该值为空(f==null)
f = ft; //则说明之前没有线程put过,才应该执行计算(ft.run)
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
// 避免缓存污染,被取消的任务需要直接清除
// 其实发生runtimeexception也应该清除
cache.remove(arg, f); //如果该计算被取消,则应该从缓存中去掉这个占位符
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
网友评论