美文网首页
5.并发基础构建模块2---并发工具类

5.并发基础构建模块2---并发工具类

作者: 炫迈哥 | 来源:发表于2017-04-20 11:18 被阅读0次

    1.闭锁CountDownLautch

    CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch对象的await()方法,其他的任务执行完自己的任务后调用同一个CountDownLatch对象上的countDown()方法,这个调用await()方法的任务将一直阻塞等待,直到这个CountDownLatch对象的计数值减到0为止。

    特点:只能用一次~~~~
    <a href="http://www.jianshu.com/writer#/notebooks/11234585/notes/11516357">源码分析</a>

    2.FutureTask

    阻塞等待线程执行结果,线程池里大量使用。实现了RunnableFuture接口,所以它既是个线程类,又支持阻塞等待获取结果。

    值得注意的是他有一个空方法done,在结束callable设值或者cancel后会执行,可自行扩展。

    这里最最主要需要弄明白的是cancel方法的参数:

    public boolean cancel(boolean mayInterruptIfRunning) {...}
    

    参数mayInterruptIfRunning的含义:
    需要特别注意参数命名IfRunning,顾名思义它只针对正在运行中的task有特殊照顾。

    具体结合源码看,源码链接在下面。

    mayInterruptIfRunning为true的使用场景:
    如果运行中的线程实现了相应中断的逻辑,不管是代码检查,还是有可响应的阻塞方法,而且中断会不导致一些数据不一致的那个危险后果。
    mayInterruptIfRunning为false使用场景:

    • 任务不能响应中断信号或者不知道人物能不能响应中断信号
    • 任务非常敏感,数据正确性要求高,不能随便中断。

    <a href="http://www.jianshu.com/writer#/notebooks/11234585/notes/11518626">源码分析</a>

    3.信号量semaphore

    <a href="http://www.jianshu.com/writer#/notebooks/11234585">源码分析</a>

    4.CyclicBarrier

    • CyclicBarrier初始化时规定一个数目,然后计算调用了CyclicBarrier.await()进入等待的线程数。当线程数达到了这个数目时,所有进入等待状态的线程被唤醒并继续。
    • CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
    • CyclicBarrier初始时还可带一个Runnable的参数, 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
    • 最后一条线程到达栅栏后,会重新开启下一次栅栏。

    5.CyclicBarrier,CountDownLautch区别

    • CyclicBarrier一组线程相互等待,CountDownLautch一组线程等待另外一组线程
    • CountDownLautch执行countDown后不会阻塞,会继续执行;CyclicBarrier会阻塞
    • CyclicBarrier复用的,CountDownLautch不能复用
    • CyclicBarrier提供一个到达栅栏时的runnable,有很大的便利,他能保证在下一代执行前先执行完这个runnable。(鸟鱼池塘那个题目。。。)

    6.Exchanger交互数据(jdk8源码太难看懂了。。。。)

    7. 完美的本地缓存实现(防止缓存击穿等问题),其实分布式系统中也是一毛一样的思路

    public class Memoizer <A, V> implements Computable<A, V> {
        // 首先缓存的增删改查必须是线程安全的,所以需要用ConcurrentHashMap
        // 其次,缓存的结果不能是直接的结果,需要是一个Future,这样能防止缓存击穿问题(就是某个时间点很多很多线程来缓存拿值,此时值刚好过期或者还没有,这些线程将会全部涌入计算逻辑中。。。所以先生成一个Future,就只会有一个线程真正去计算了,其他的等待结果就好)
        private final ConcurrentMap<A, Future<V>> cache
                = new ConcurrentHashMap<A, Future<V>>();  
        private final Computable<A, V> c;
     
        public Memoizer(Computable<A, V> c) {
            this.c = c;
        }
     
        public V compute(final A arg) throws InterruptedException {
            while (true) { // 这个循环是希望在被cancel后,重新又生成一个任务继续
                Future<V> f = cache.get(arg);
                if (f == null) {
                    Callable<V> eval = new Callable<V>() {
                        public V call() throws InterruptedException {
                            return c.compute(arg);
                        }
                    };
                    FutureTask<V> ft = new FutureTask<V>(eval);
                    // 这里能避免两个线程击穿缓存的情况(虽然概率很低,还是可能发生,所以改用原子操作)
                    f = cache.putIfAbsent(arg, ft);//原子方法putIfAbsent
                    if (f == null) {//putIfAbsent会把put之前的值返回,如果该值为空(f==null)
                        f = ft;     //则说明之前没有线程put过,才应该执行计算(ft.run)
                        ft.run();
                    }
                }
                try {
                    return f.get();
                } catch (CancellationException e) {
                    // 避免缓存污染,被取消的任务需要直接清除
                    // 其实发生runtimeexception也应该清除
                    cache.remove(arg, f); //如果该计算被取消,则应该从缓存中去掉这个占位符
                } catch (ExecutionException e) {
                    throw LaunderThrowable.launderThrowable(e.getCause());
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:5.并发基础构建模块2---并发工具类

          本文链接:https://www.haomeiwen.com/subject/zuhhzttx.html