美文网首页
多线程笔记 三

多线程笔记 三

作者: 骑着乌龟追小兔 | 来源:发表于2018-05-11 10:03 被阅读46次

    1. ThreadPoolExecutor

    ThreadPoolExecutor是一种比较常见的处理多线程的执行器。你可以配置线程池的最小线程数,当执行器没有太多的任务要处理的时候。亦可以配置最大线程size,如果有很多任务需要处理。一旦当工作负载降下来,线程池就会慢慢的减少线程数量,知道线程数量达到最小值。

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, // keep at least one thread ready,
                // even if no Runnables are executed
                5, // at most five Runnables/Threads
                // executed in parallel
                1, TimeUnit.MINUTES, // idle Threads terminated after one
                // minute, when min Pool size exceeded
                new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
             pool.execute(new Runnable() {
                @Override 
                public void run () {
                    //code to run
                }
        });
        
    
    • 备注:如果你配置了一个无界队列的线程池。那么线程数量将不会超过corePoolSize .
    • 线程池参数介绍
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
    TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
    
    

    如果线程池的数量大于corePoolSize并且小于maximumPoolSize,那么只有在线程池队列满了的情况下才会创建新线程。

    线程池的优点:

    1. BlockingQueue 能够避免内存溢出的场景。应用的表现不会被队列的尺寸限制。

    2. 可以采用不同的Rejection Handler 策略。

      1. 默认策略:抛出RejectedExecutionException
      2. CallerRunsPolicy :如果线程池没有被关闭,那么就执行,否则丢弃
      3. DiscardPolicy: 无法执行,直接丢弃
      4. DiscardOldestPolicy :丢弃队首的任务。
    3. 配置自定义的ThreadFactory的好处.

      1. 定义更有描述性的名称
      2. 设置进程的状态
      3. 设置线程优先权

    2.获取计算任务的值-callable

    • 如果你的运算会产生一些以后需要用到的数据,一个简单的runnable是肯定无法满足你的。在这种场景下,你可以使用 ExecutorService.submit(Callable<T>) 。这个方法会在任务执行完毕之后返回一个值。
    • 这个方法返回一个Future对象,你可以从中获取任务的值。
    // Submit a callable for execution
        ExecutorService pool = anExecutorService;
        Future<Integer> future = pool.submit(new Callable<Integer>() {
        @Override public Integer call() {
        //do some computation
        return new Random().nextInt();
        }
        });
    // ...  perform other tasks while future is executed in a different thread
    
    • 当你要获得执行结果时候,调用future.get()

    • 不确定等到时间的方法 get

        try {
        // Blocks current thread until future is completed
        Integer result = future.get();
        catch (InterruptedException || ExecutionException e) {
        // handle appropriately
        }
    
    • 在指定时间等待结果
        try {
        // Blocks current thread for a maximum of 500 milliseconds.
        // If the future finishes before that, result is returned,
        // otherwise TimeoutException is thrown.
        Integer result = future.get(500, TimeUnit.MILLISECONDS);
        catch (InterruptedException || ExecutionException || TimeoutException e) {}
    
    如果计算结果不在需要了,你可以调用Future.cancel(boolean)
    • cancel(false) 将只会将任务从队列里面一处
    • cancel(true) 还会打断当前运行的任务。

    3. submit()与execute() 异常处理的区别

    • 普通的 execute() 命令一般是用来执行不要结果的任务,submit() 一般是用来分析Future 对象。
    • 我们应该关心着两种异常处理机制不同之处。
    • submit() 方法如果不处理就会被框架处理。

    示例代码如下:

    案例1:用excute 命令来执行 runnable 任务,然后上报异常
    import java.util.concurrent .*;
    import java.util .*;
    
        public class ExecuteSubmitDemo {
            public ExecuteSubmitDemo() {
                System.out.println("creating service");
                ExecutorService service = Executors.newFixedThreadPool(2);
    //ExtendedExecutor service = new ExtendedExecutor();
                for (int i = 0; i < 2; i++) {
                    service.execute(new Runnable() {
                        public void run() {
                            int a = 4, b = 0;
                            System.out.println("a and b=" + a + ":" + b);
                            System.out.println("a/b:" + (a / b));
                            System.out.println("Thread Name in Runnable after divide by
                                    zero:"+Thread.currentThread().getName());
                        }
                    });
                }
                service.shutdown();
            }
    
            public static void main(String args[]) {
                ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
            }
        }
    
        class ExtendedExecutor extends ThreadPoolExecutor {
            public ExtendedExecutor() {
                super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
            }
    
            // ...
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                if (t == null && r instanceof Future<?>) {
                    try {
                        Object result = ((Future<?>) r).get();
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // ignore/reset
                    }
                }
                if (t != null)
                    System.out.println(t);
            }
        }
    

    输出:

    creating service
    a and b=4:0
    a and b=4:0
    Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2"
    java.lang.ArithmeticException: / by zero
    at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    java.lang.ArithmeticException: / by zero
    at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    
    案例2:用submit 替换excute ,service.submit(new Runnable() 在这个案例中,异常被框架吃了。

    输出

    creating service
    a and b=4:0
    a and b=4:0
    

    案例3 将newFixedThreadPool换成ExtendedExecutor

    ExtendedExecutor service = new ExtendedExecutor();
    

    输出:

    creating service
    a and b=4:0
    java.lang.ArithmeticException: / by zero
    a and b=4:0
    java.lang.ArithmeticException: / by zero
    
    • 我们评估上面两个案例,得出答案,使用自定义的线程池来处理异常。
    • 其他解决上述问题的方法,如果你使用普通的 ExecutorService & submit ,使用get来获取结果。那么请捕获上述代码里面捕获的三个异常。自定义ThreadPoolExecutor 有一个好处,即是只需要在一个地方捕捉异常。

    4. 处理拒绝执行

    如果:

    1. 你试图向一个一个关闭的Executor 提交任务
    2. 队列已经满了,线程数已经达到最大值
    • 那么RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)将会被调用。
    • 上述方法的默认行为是抛出一个RejectedExecutionException异常。但是我们有更多的策略可以选择:
      1. ThreadPoolExecutor.AbortPolicy (default, will throw REE)
      2. ThreadPoolExecutor.CallerRunsPolicy (executes task on caller's thread - blocking it)
      3. ThreadPoolExecutor.DiscardPolicy (silently discard task)
      4. ThreadPoolExecutor.DiscardOldestPolicy (silently discard oldest task in queue and retry execution of the
        new task)
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) // <--
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) // <
    
    • 你也可以自己实现RejectedExecutionHandler 接口来自定义执行的行为方式。
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    

    5 :发射后不管 - Runnable Tasks

    Executors接受一个 java.lang.Runnable 参数对象,用来处理耗时或者计算量较大的任务。

    • 使用方法如下:
     Executor exec = Executors.newCachedThreadPool();
        exec.ex(new Runnable() {
                @Override public void run () {
        //offloaded work, no need to get result backJava® Notes for Professionals 696
                }
            });
            
    

    注意使用这个excutor ,你获取不到任何数据返回值。在Java8 中你可以应用lamda 来简化代码

        Executor exec = anExecutor;
        exec.execute(() -> {
        //offloaded work, no need to get result back
        });
    

    6:不同类型的并发的构造的使用案例:

    1. ExecutorService
    • ExecutorService executor = Executors.newFixedThreadPool(50);
    • 这种用法事发简单。它隐藏了很多线程池的底层实现。
    • 在任务数量较小我倾向使用这种方式,它不会让内存增长过快也不会降低系统的性能。如果你有cpu/内存 方面的约束,我建议使用线程池的时候对线程的容量以及 处理拒绝执行的任务。
    1. CountDownLatch
    • CountDownLatch 使用固定的数初始化。这个数会随着countdown 方法被调用而减少。我们可以通过调用await方法让当前 线程等待线程执行直至数量降至0。

    使用条件:

    •     1. 实现最大数量的异步任务:在某些情况,我们可以在同时启动一组线程
      
    •     2. 在开始执行之前等待其他线程执行
      
    •     3. 死锁检测
      
    1. ThreadPoolExecutor
    • 提供了更多的控制。如果程序被限制了要执行一组延迟Runnable/Callable任务,你可以通过设置最大容积来使用有界数组。一旦队列达到最大容量,你可以定义一个 RejectionHandler 来处理拒绝的任务。Java提供了四种类型的RejectionHandler 不同的实现策略。上述已经提及了,这里就不进行赘述了。
    1. 你可能不知道 ForkJoinPool
    • ForkJoinPool是Java 7 引入的。ForkJoinPool与 ExecutorService类似,但是还是有一点区别。ForkJoinPool与非常容易实现任务分割成一些小任务。当某些队列任务执行完毕之后,他便会从其他队列的尾部偷取一个任务进行执行。
      
    • Java 8 引入了一个新的api ,你不需要新建RecursiveTask 任务就只可以直接使用ForkJoinPool;
      
     public static ExecutorService newWorkStealingPool()
    

    创建work-stealing线程池会根据处理器的并行程度最大程度的利用处理器

    • 默认来说,他会采用cpu 的核心来作为参数。
    上述提到的四种原理互不影响。你可以根据你自己的需求,选用合适的框架进行使用

    7. 使用ExecutorService等待所有任务执行完毕

    • 我们先来看一眼方法:
    • ExecutorService invokeAll()
      

      当所有任务都执行完毕会返回一个Futures list。

    示例代码:

        import java.util.concurrent .*;
        import java.util .*;
    
        public class InvokeAllDemo {
            public InvokeAllDemo() {
                System.out.println("creating service");
                ExecutorService service =
                        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
                List<MyCallable> futureList = new ArrayList<MyCallable>();
       
                for (int i = 0; i < 10; i++) {
                    MyCallable myCallable = new MyCallable((long) i);
                    futureList.add(myCallable);
                }
                System.out.println("Start");
                try {
                    List<Future<Long>> futures = service.invokeAll(futureList);
                } catch (Exception err) {
                    err.printStackTrace();
                }
                System.out.println("Completed");
                service.shutdown();
            }
    
            public static void main(String args[]) {
                InvokeAllDemo demo = new InvokeAllDemo();
            }
    
            class MyCallable implements Callable<Long> {
                Long id = 0L;
    
                public MyCallable(Long val) {
                    this.id = val;
                }
    
                public Long call() {
    // Add your business logic
                    return id;
                }
            }
        }
    

    8. 使用不同类型的ExecutorService

    Executors 返回不同类型的线程池来满足不同需求

    1. 1. public static ExecutorService newSingleThreadExecutor()

    创建一个单工作线程来操作一个无界队列

    它与 newFixedThreadPool(1) 和 newSingleThreadExecutor()的区别Java doc 是这样说的:

    与类似的 newFixedThreadPool(1)相比其不保证重新配置异常线程来使用替代线程。

    • 这就意味着newFixedThreadPool可以被程序重新配置类似如下:((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10),这在newSingleThreadExecutor 是不可能的。
    • 使用场景:
      1. 你打算按照提交顺序来执行任务
      2. 你需要一个线程来执行你的所有请求
      缺点:无界队列有风险
    2. public static ExecutorService newFixedThreadPool(int nThreads)

    创建一个固定数量的线程池,复用线程来操作一个共享的无界队列。在任何时刻,大多数线程都会在执行任务的时候被激活。如果提交了额外的任务,在所有线程都是激活状态的时候,那么他们将会阻塞知道有空闲线程。

    使用场景:

    1.         可以通过获取cpu的数量来提高线程运行情况。
      
    2.          你可以选择线程池的最大线程数
      

    缺点:无界队列有风险

    3. public static ExecutorService newCachedThreadPool()

    创建一个按需分配的线程池,会重复利用之前已经创建的线程。

    使用条件:

    1.   一些执行时间段的异步任务
      
    • 缺点:
      1. 无界队列有风险
      2. 如果所有的存在的线程都在busy 那么每个任务都会新建一个线程。如果任务长期执行,将会创建大量的线程,这将会降低系统的性能。在这种情况下推荐newFixedThreadPool。
    4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个线程池,是的能够在指定时间段之后执行任务,或者每隔一段时间执行

    使用场景:

    1.     处理周期性的事件
      

    缺点:无界队列有风险。

    5. public static ExecutorService newWorkStealingPool()

    创建任务偷取型的线程池,取决于处理器的并发水平

    使用场景
    1.     将任务分割成很多子任务
      
    2.     对于空闲线程处理能力较高
      

    缺点:无界队列有风险

    • 你可能发现了一个通用的缺陷:无界队列。它会与ThreadPoolExecutor一起绑定使用。
        ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
        RejectedExecutionHandler handler)
    

    使用线程池你可以:

    1. 动态控制线程池尺寸
    2. 设置BlockingQueue 容积
    3. 定了拒绝策略
    4. 自定义 CustomThreadFactory 可以有一些附带功能

    9. 调度线程在固定的时间执行,或者在延迟一段时间之后,或者重复执行

    • ScheduledExecutorService 提供了一个方法用来调度执行一次性或者重复的任务。
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    
    • 在普通的线程池方法之外,ScheduledExecutorService添加了4个方法来调度任务,然后返回ScheduledFuture 对象。
    在固定时间之后开始一个任务
    • 如下代码展示了在十分钟之后执行一个任务:
    ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
        @Override 
        public Integer call() {
        // do something
        return 42;
        }
    },10, TimeUnit.MINUTES);
    
    以固定的频率来执行任务
    • 如下代码展示了在十分钟之后开始执行一个任务,然后每隔一分钟开始重复任务:
    ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
        @Override 
        public void run() {
        // do something
        }
    },10, 1, TimeUnit.MINUTES);
    

    任务会一直执行到线程池被关闭,future 被去小,或者某个任务发生了异常。


    10. 线程池的使用

    • 线程池大多数情况下都是通过调用 ExcutorService 的方法创建
    • 如下方法可以用来提交任务
    1. submit: 执行提交的任务返回一个future 对象
    2. execute: 执行任务并不期望返回任何值
    3. invokeAll:执行一组任务,并且得到一个返回值列表
    4. invokeAny:执行所有任务,获得其中一个正确执行的(没有异常的),其余没有执行的任务或被取消。
    • 一旦你使用了 shutdown() 来终止线程池。这个操作会阻塞任务的提交。如果向等到所有任务都被执行,你可以用 awaitTermination 或isShutdown() 来包裹代码。

    相关文章

      网友评论

          本文标题:多线程笔记 三

          本文链接:https://www.haomeiwen.com/subject/zgoxdftx.html