美文网首页
JAVA线程及线程池使用

JAVA线程及线程池使用

作者: 我知他风雨兼程途径日暮不赏 | 来源:发表于2020-04-03 15:24 被阅读0次

    参考如下文章进行整理:

    1. JAVA几种线程使用

    1.1 Runnable同时执行任务模式
    1.2 Join等待某个线程执行完开始执行模式
    1.3 Object lock+synchronized资源竞争与释放模式
    1.4 CountDownLatch等待几个线程执行完再执行某一个线程模式
    1.5 CyclicBarrier多个线程执行到某个点后同时执行模式
    1.6 FutureTask获取线程执行结果模式
    1.7 Semaphore信号量

    1. 线程池

    2.1 线程池介绍

    2.1.1 总体设计
    2.1.2 ThreadPoolExecutor生命周期及运行状态
    2.1.3 任务调度

    2.2 线程池四种模式

    2.2.1 newSingleThreadExecutor单线程线程池
    2.2.2 newFixedThreadPool定长线程池
    2.2.3 newScheduledThreadPool定时线程池
    2.2.4 newCachedThreadPool 缓存线程池

    1. JAVA几种线程使用

    1.1 Runnable同时执行任务模式

    package sync;
    
    // 线程同时运行
    public class Sync1Runable {
    
        public static void main(String args[]){
            demo1();
        }
        private static void demo1(){
            Thread A = new Thread(new Runnable() {
                @Override
                public void run() {
                    printNumber("A");
                }
            });
            Thread B = new Thread(new Runnable() {
                @Override
                public void run() {
                    printNumber("B");
                }
            });
            A.start();
            B.start();
        }
    
        private static void printNumber(String threadName) {
            int i=0;
            System.out.println(threadName + " print: " + i);
            while (i++ < 3) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName + " print: " + i);
            }
        }
    }
    

    1.2 Join等待某个线程执行完开始执行模式

    package sync;
    
    public class Sync2Join {
    
        public static void main(String args[]){
            demo1();
        }
    
        // 通过join函数,等待某个线程结束,再执行,等A执行完后再执行B
        private static void demo1(){
            Thread A = new Thread(new Runnable() {
                @Override
                public void run() {
                    printNumber("A");
                }
            });
            Thread B = new Thread(new Runnable() {
                @Override
                public void run() {
                    try{
                        A.join();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                    printNumber("B");
                }
            });
            A.start();
            B.start();
        }
    
    
        private static void printNumber(String threadName) {
            int i=0;
            while (i++ < 3) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadName + " print: " + i);
            }
        }
    }
    
    

    1.3 Object lock+synchronized资源竞争与释放模式

    这里要注意的是,这个demo仅是作为大家作为该功能的展示,如果你把A.start()放在B.start()后面会发生死锁,这里大家可以思考下具体原因。

    package sync;
    
    import org.omg.CORBA.OBJ_ADAPTER;
    
    /**
     * @ClassName sync.Sync3ObjectWaitNotify
     * @Description wait() notify()实现线程交叉执行
     * @Author Lczy-Huang
     * @Date 2018/10/9 11:19
     * @Version 1.0
     **/
    public class Sync3ObjectWaitNotify {
    
        public  static void main(String arg[]){
            demo3();
        }
    
        /*
         * @description 通过对一个对象的锁,wait休眠,notify重新唤醒达到线程交叉执行
         * @params []
         * @return void
         * @author Huang Bing
         * @time 2018/10/9 11:26
         */
        private static void demo3(){
    
            Object lock = new Object();
            Thread A = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock){
                        System.out.println("A1");
                        try{
                            lock.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                        System.out.println("A2");
                        System.out.println("A3");
                    }
                }
            });
    
            Thread B = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock){
                        System.out.println("B1");
                        System.out.println("B2");
                        System.out.println("B3");
                        lock.notify();
                    }
                }
            });
            A.start();
            B.start();
    
        }
    }
    
    

    1.4 CountDownLatch等待几个线程执行完再执行某一个线程模式

    package sync;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @ClassName sync.Sync4CountDownLatch
     * @Description 等待3个线程全部执行完后,执行第4个线程
     * @Author Lczy-Huang
     * @Date 2018/10/9 11:37
     * @Version 1.0
     **/
    public class Sync4CountDownLatch {
        public static void main(String arg[]){
            runDAfterABC();
        }
    
        private static void runDAfterABC(){
            int worker =3;
            CountDownLatch countDownLatch = new CountDownLatch(worker);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("D is waiting for other three threads");
                    try{
                        countDownLatch.await();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                    System.out.println("D is working");
                }
            }).start();
            for (char threadName='A'; threadName <= 'C'; threadName++) {
                final String tN = String.valueOf(threadName);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(tN + " is working");
                        try {
                            Thread.sleep(100);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println(tN + " finished");
                        countDownLatch.countDown();
                    }
                }).start();
            }
        }
    
    }
    
    

    1.5 CyclicBarrier多个线程执行到某个点后同时执行模式

    package sync;
    
    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * @ClassName Sync5CyclicBarrierDemo
     * @Description 所有线程到某个点一起等待进行
     * @Author Lczy-Huang
     * @Date 2018/10/9 11:52
     * @Version 1.0
     **/
    public class Sync5CyclicBarrierDemo {
        public static void main(String arg[]){
            runABCWhenAllReady();
        }
        private static void runABCWhenAllReady(){
            int runner = 3;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
            final Random random = new Random();
            for(char runnerName='A';runnerName<='C';runnerName++){
                final String rN = String.valueOf(runnerName);
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        long prepareTime = random.nextInt(10000)+100;
                        System.out.println(rN+"is preparing for time:"+prepareTime);
                        try{
                            Thread.sleep(prepareTime);
                        }catch (Exception ex){
                            ex.printStackTrace();
                        }
                        try{
                            System.out.println(rN + " is prepared, waiting for others");
                            cyclicBarrier.await(); // 当前运动员准备完毕,等待别人准备好
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }catch (BrokenBarrierException e){
                            e.printStackTrace();
                        }
                        System.out.println(rN+"start running");// 所有运动员都准备好了,一起开始跑
                    }
                }).start();
            }
        }
    
    }
    
    

    1.6 FutureTask获取线程执行结果模式

    该模式看似很鸡肋,但是需要配合线程池去使用,获得一群线程的返回结果存放在list中。

    package sync;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    /**
     * @ClassName Sync6FutureTask
     * @Description 获取线程执行返回值
     * @Author Lczy-Huang
     * @Date 2018/10/9 12:26
     * @Version 1.0
     **/
    public class Sync6FutureTask {
    
        public static void main(String arg[]){
            doTaskWithResultInWorker();
        }
        private static void doTaskWithResultInWorker() {
            Callable<Integer> callable = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("Task starts");
                    Thread.sleep(1000);
                    int result = 0;
                    for (int i=0; i<=100; i++) {
                        result += i;
                    }
                    System.out.println("Task finished and return result");
                    return result;
                }
            };
            FutureTask<Integer> futureTask = new FutureTask<>(callable);
            new Thread(futureTask).start();
            new Thread(futureTask).start();
            try {
                System.out.println("Before futureTask.get()");
                System.out.println("Result: " + futureTask.get());
                System.out.println("After futureTask.get()");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    配合线程池使用:

    package sync;
    
    import java.util.ArrayList;
    import java.util.concurrent.*;
    
    /**
     * @author Lczy-Huang
     * @version 1.0
     * @classname Sync7ParamFutureTask
     * @description TODO
     * @date 2019/1/17 10:43
     **/
    public class Sync7ParamFutureTask implements Callable<String> {
        private int id;
        public Sync7ParamFutureTask(int id){
            this.id=id;
        }
        public String call() throws Exception{
            return "result:"+id;
        }
    
    
        public static class CallableDemo{
            public static void main(String[] args){
                ExecutorService executorService = Executors.newCachedThreadPool();
                ArrayList<Future<String>> futures = new ArrayList<Future<String>>();
                for(int i=0;i<10;i++){
                    futures.add(executorService.submit(new Sync7ParamFutureTask(i)));
                }
                for(Future<String> fs:futures){
                    try{
                        System.out.println(fs.get());
                    }catch (InterruptedException e){
                        System.out.println(e);
                        e.printStackTrace();
                    }catch (ExecutionException ex){
                        System.err.println(ex);
                        ex.printStackTrace();
                    }finally {
                        executorService.shutdown();
                    }
                }
    
            }
        }
    }
    

    1.7 Semaphore信号量

    如果你认真的看完以上几个例子,看不懂,没关系,你完全可以用Semaphore代替它们。有两个重要的方法:

    方法名 作用
    acquire() 消耗一个semaphore的值,如果值为0则堵塞等待有值
    release() 释放一个semaphore的值,即semaphore++;

    交替执行例子:

    class FooBar {
        private int n;
    
        public FooBar(int n) {
            this.n = n;
        }
        Semaphore foo = new Semaphore(1);
        Semaphore bar = new Semaphore(0);
        public void foo(Runnable printFoo) throws InterruptedException {
            
            for (int i = 0; i < n; i++) {
                foo.acquire();
                // printFoo.run() outputs "foo". Do not change or remove this line.
                printFoo.run();
                bar.release();
            }
        }
    
        public void bar(Runnable printBar) throws InterruptedException {
            
            for (int i = 0; i < n; i++) {
                bar.acquire();
                // printBar.run() outputs "bar". Do not change or remove this line.
                printBar.run();
                foo.release();
            }
        }
    }
    

    通过这里例子你可以发现,只要有足够多的信号量存在配合runable接口,可以实现大部分的线程模式。

    2. 线程池

    2.1 线程池设计说明

    2.1.1 总体设计

    Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系。


    图ThreadPoolExecutor UML类图

    ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
    AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。


    图ThreadPoolExecutor运行流程

    线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

    2.1.2 ThreadPoolExecutor生命周期及运行状态

    ThreadPoolExecutor的运行状态有5种,分别为:

    运行状态 状态描述
    RUNNING 能接受新提交的任务,并且也能处理阻塞队列中任务
    SHUTDOWN 关闭状态,不能接受新提交任务,但可以继续处理阻塞队列中的任务
    STOP 不能接受新任务,也不处理队列中的任务,会中断正在指向的任务线程
    TIDYING 所有任务都已经终止,workerCount(有效)
    TERMINATED 在terminated()方法执行后进入该状态
    图线程池生命周期

    2.1.3 任务调度

    首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
    首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。

    • 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    • 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
    • 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
    • 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
      其执行流程如下图所示:


      任务调度流程

    2.2 线程池四种模式

    2.2.1 newSingleThreadExecutor单线程线程池

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

     /** 
         *创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
         */
        public static void singleTheadPoolTest() {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                final int ii = i;
                pool.execute(() -> out.println(Thread.currentThread().getName() + "=>" + ii));
            }
        }
    
    -----output-------
     线程名称:pool-1-thread-1,执行0
     线程名称:pool-1-thread-1,执行1
     线程名称:pool-1-thread-1,执行2
     线程名称:pool-1-thread-1,执行3
     线程名称:pool-1-thread-1,执行4
     线程名称:pool-1-thread-1,执行5
     线程名称:pool-1-thread-1,执行6
     线程名称:pool-1-thread-1,执行7
     线程名称:pool-1-thread-1,执行8
     线程名称:pool-1-thread-1,执行9
    

    2.2.2 newFixedThreadPool定长线程池

    • 底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
    • 通俗:创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
    • 适用:执行长期的任务,性能好很多
      /**
         * 1.创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小<br>
         * 2.线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程<br>
         * 3.因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称<br>
         */
        public static void fixTheadPoolTest() {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int ii = i;
                fixedThreadPool.execute(() -> {
                    out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    ------output-------
    线程名称:pool-1-thread-3,执行2
    线程名称:pool-1-thread-1,执行0
    线程名称:pool-1-thread-2,执行3
    线程名称:pool-1-thread-3,执行4
    线程名称:pool-1-thread-1,执行5
    线程名称:pool-1-thread-2,执行6
    线程名称:pool-1-thread-3,执行7
    线程名称:pool-1-thread-1,执行8
    线程名称:pool-1-thread-3,执行9
    

    2.2.3 newScheduledThreadPool定时线程池

    • 底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
    • 通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
    • 适用:周期性执行任务的场景
    /**
         * 创建一个定长线程池,支持定时及周期性任务执行。延迟执行
         */
        public static void sceduleThreadPool() {
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
            Runnable r1 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:3秒后执行");
            scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS);
            Runnable r2 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:延迟2秒后每3秒执行一次");
            scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS);
            Runnable r3 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:普通任务");
            for (int i = 0; i < 5; i++) {
                scheduledThreadPool.execute(r3);
            }
        }
    ----output------
    线程名称:pool-1-thread-1,执行:普通任务
    线程名称:pool-1-thread-5,执行:普通任务
    线程名称:pool-1-thread-4,执行:普通任务
    线程名称:pool-1-thread-3,执行:普通任务
    线程名称:pool-1-thread-2,执行:普通任务
    线程名称:pool-1-thread-1,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-5,执行:3秒后执行
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
    

    2.2.4 newCachedThreadPool 缓存线程池

    • 底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)
    • 通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
    • 适用:执行很多短期异步的小程序或者负载较轻的服务器
    /**
         * 1.创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程<br>
         * 2.当任务数增加时,此线程池又可以智能的添加新线程来处理任务<br>
         * 3.此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小<br>
         * 
         */
        public static void cacheThreadPool() {
            ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
            for (int i = 1; i <= 10; i++) {
                final int ii = i;
                try {
                    Thread.sleep(ii * 1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                cachedThreadPool.execute(()->out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
            }
    
        }
    -----output------
    线程名称:pool-1-thread-1,执行1
    线程名称:pool-1-thread-1,执行2
    线程名称:pool-1-thread-1,执行3
    线程名称:pool-1-thread-1,执行4
    线程名称:pool-1-thread-1,执行5
    线程名称:pool-1-thread-1,执行6
    线程名称:pool-1-thread-1,执行7
    线程名称:pool-1-thread-1,执行8
    线程名称:pool-1-thread-1,执行9
    线程名称:pool-1-thread-1,执行10
    

    相关文章

      网友评论

          本文标题:JAVA线程及线程池使用

          本文链接:https://www.haomeiwen.com/subject/ziqhphtx.html