美文网首页
java多线程,线程池,多线程工具

java多线程,线程池,多线程工具

作者: kaixingdeshui | 来源:发表于2020-11-09 13:18 被阅读0次

线程的基本操作

用new关键字创建一个线程对象,然后调用它的start()启动线程即可。

Thread thread  = new Thread();
thread.start();

线程有个run()方法,start()会创建一个新的线程并让这个线程执行run()方法。

//这种run()方法调用不能新建一个线程,而是在当前线程中调用run()方法,
//将run方法只是作为一个普通的方法调用。
Thread thread1  = new Thread();
thread1.run();

start方法是启动一个线程,run方法只会在当前线程中串行的执行run方法中的代码。

Thread thread = new Thread(){
    @Override
    public void run() {
        System.out.println("xxxxxxx");
    }
};
thread.start();

通过继承Thread类,然后重写run方法,来自定义一个线程。
java中刚好提供了Runnable接口来自定义一个线程。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Thread类有一个非常重要的构造方法:

public Thread(Runnable target)

Thread的run方法:

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

实现Runnable接口是比较常见的做法,也是推荐的做法。

Thread thread2 =new Thread(new Runnable() {
      @Override
      public void run() {

      }
});
thread2.start();

线程状态 Thread

6个状态定义:java.lang.Thread.State


image.png

1.NEW : 尚未启动的线程的线程状态

2.RUNNABLE :可运行线程的线程状态,等待cpu调度

3.BLOCKED:线程阻塞等待监视器锁定的线程状态,处在synchronized同步代码块或方法中被阻塞。

4.WAITING:等待线程的线程状态,如 不带超时的方式:Object.wait,Thread.join,LockSupport.park进入这个状态

5.TIMED_WAITING:具有指定等待时间的等待线程的此案陈状态,如 带超时方式;Thread.sleep,Object.wait,Thread.join,LockSupport.parkNanos,LockSupport.parkUntil进入这个状态

6.TERMINATED:终止线程的线程状态。线程正常完成执行或出现异常

线程终止

正确的线程中止 interrupter

如果目标线程在调用Object class的wait(),wait(long)或wait(long,int)方法,join(),join(long,int),或sleep(long,int)方法是被阻塞,那么Interrupt会生效,该线程的中断状态将被清除,抛出InterrupttedException异常。

如果目标线程是被I/O或者NIO中的Channel锁阻塞,同样,I/O操作会被中断或者返回特殊异常值,达到终止线程的目的。

如果以上条件都不满足,则会设置此线程的中断状态。

正确的线程中止 标志位

在代码逻辑中,增加一个判断,用来控制。

Thread提供了3个与线程中断有关的方法:

public void interrupt()//中断线程
public boolean isInterrupted()//判断线程是否中断
public static boolean interrupted()//判断线程是否被中断,并清除当前中断状态
public static void main(String[] args) throws InterruptedException {
        Thread thread2 =new Thread(){
            @Override
            public void run() {
                while (true){
                    if (this.interrupted()){
                        break;
                    }
                }
            }
        };
        thread2.start();
        TimeUnit.SECONDS.sleep(1);
        thread2.interrupt();//中断
    }

如果一个线程调用了sleep方法,一直处于休眠状态,通过变量控制,不可以中断线程。此时只能使用线程提供的interrupt方法来中断线程了。

 Thread thread2 =new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.SECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        this.interrupt();//抛异常,发出终止线程
                        e.printStackTrace();
                    }
                    if (this.isInterrupted()){
                        break;
                    }
                }
            }
        };
        thread2.setName("thread2");
        thread2.start();
        TimeUnit.SECONDS.sleep(1);
        thread2.interrupt();//中断
}

sleep方法由于中断而抛出异常之后,线程的中断标志会被清除(置为false),所以在异常中需要执行this.interrupt()方法,将中断标志位置为true.

线程通信

通信方式:

1)文件共享
2)网络共享
3)共享变量
4)jdk提供的线程协调API

线程协作 JDK API

多线程协作的典型场景:生产者 - 消费者 (线程的阻塞,线程唤醒)

等待(wait)和通知(notify)
为了支持 多线程 之间的协作,JDK提供了两个非常重要的方法:等待wait()方法和通知notify()方法。在Object类中定义的。这意味着所有的对象都可以调用者两个方法。java.lang.Object

public final void wait() throws InterruptedException
public final native void notify();//随机唤醒

当在一个对象实例上调用wait()方法后,当前线程就会在这个对象上等待。

比如在线程A中,调用了obj.wait()方法,那么线程A就会停止继续执行,转为等待状态。等待到什么时候结束呢?线程A会一直等到其他线程调用obj.notify()方法为止,这时,obj对象成为了多个线程之间的有效通信手段。

如果一个线程调用了object.wait()方法,那么它就会进出object对象的等待队列。这个队列中,可能会有多个线程,因为系统可能运行多个线程同时等待某一个对象。

当object.notify()方法被调用时,它就会从这个队列中 随机 选择一个线程,并将其唤醒。这个选择是不公平.

Object独享还有一个nofiyAll()方法,它和notify()方法的功能类似,不同的是,它会唤醒在这个等待队列中所有等待的线程,而不是随机选择一个。

Object.wait()方法并不能随便调用。它必须包含在对应的synchronize语句汇总,无论是wait()方法或者notify()方法都需要首先获取目标独享的一个监视器。
Object.wait()方法和Thread.sleeep()方法都可以让现场等待若干时间。除wait()方法可以被唤醒外,另外一个主要的区别就是wait()方法会释放目标对象的锁,而Thread.sleep()方法不会释放锁。

park/unpark机制
线程调用park则等待“许可”,unpark方法为指定线程
提供“许可permit”。
不要求 park和unpark方法的调用顺序。
调了几次park,就得调几次unpark

线程封闭

数据都被封闭在各自的线程之中,就不需要同步,这种通过将数据封闭在线程中而避免使用同步的技术即线程封闭。
ThreadLocal
ThreadLocal是java中的一种特殊的变量,每个线程都有一个ThreadLocal就是每个线程都拥有了自己的一个变量,竞争条件被彻底消除了,在并发模式下是绝对安全的变量。

用法:

  ThreadLocal<T> var = new ThreadLocal<T>();

会自动在每一个线程上创建一个T的副本,副本之间彼此独立,互不影响。可以用ThreadLocal存储一些参数,以便在线程中多个方法中使用,用来代替方法传参的做法。

栈封闭
局部变量的固有属性之一即是封闭在线程中,它们位于执行线程的栈中,其它线程无法访问这个栈。

线程池及原理

volatile修饰共享变量

java帮我们提供了这样的方法,使用volatile修饰共享变量,被volatile修改的变量有以下特点:

1.线程中读取的时候,每次读取都会去主内存中读取共享变量最新的值,然后将其复制到工作内存

2.线程中修改了工作内存中变量的副本,修改之后会立即刷新到主内存

volatile解决了共享变量在多线程中可见性的问题,可见性是指一个线程对共享变量的修改,对于另一个线程来说是否是可以看到的。

volatile不能保证线程安全,只能保证被修饰变量的内存可见性,如果对该变量执行的是非原子性操作依旧线程不安全。

什么是线程池?

如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程中中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。

线程池实现原理

当向线程池提交一个任务之后,线程池的处理流程如下:

  1. 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
  2. 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
  3. 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。
    image.png

jdk中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor,主要构造方法:ThreadPoolExecutor

类型 类名 描述
接口 Executor 最上层的接口,定义了执行任务的 execute
接口 ExecutorService 继承Executor接口,扩展Callable,Tutrue,关闭方法
接口 ScheduledExecutorService 继承ExecutorService,增加定时任务相关方法
实现类 ThreadPoolExecutor 基础,标准的线程池实现
实现类 ScheduledThreadPoolExecutor 继承ThreadPoolExecutor,实现ScheduledExecutorService定时任务的方法
image.png

ExecutorService

//优雅关闭线程,之前提交的任务继续执行,但不接受新的任务
void shutdown();
//尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行任务的列表
List<Runnable> shutdownNow();
//如果关闭后所有任务都已完成,则返回true
boolean isTerminated();
//监测线程池是否关闭,直到所有任务完成执行,或超时发生,或当前线程被中断
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
//提交一个用于执行Callable返回任务,并返回一个Future,用获取Callable执行结果
<T> Future<T> submit(Callable<T> task);
//提交一个运行任务执行,并返回一个Future, 执行结果为传入result
<T> Future<T> submit(Runnable task, T result);
//提交一个运行任务执行,并返回一个Future, 执行结果null
Future<?> submit(Runnable task);
//执行给定的任务集合,执行完毕后返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//执行给定的任务集合,执行完毕或超时,返回结果,其它任务终止
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
//执行给定的任务,任意一个执行成功,则返回结果,其它终止
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
//执行给定的任务,任意一个执行成功或超时,则返回结果,其它终止
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

ScheduledExecutorService

//创建并执行一个一次性任务,过了延迟时间会被执行
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//创建并执行一个一次性任务,过了延迟时间会被执行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
//创建并执行一个周期性任务,过了给定初始延迟时间,会第一次被执行,执行过程发生异常,那么任务终止
//一次性任务执行超过了周期时间,下一次任务会等到该任务执行结束后,立刻执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//创建并执行一个周期性任务,过了给定初始延迟时间,第一次被执行,后续以给定的周期时间执行
//执行过程中发生异常,那么任务就停止。
//一次性任务执行超过了周期时间,下一次任务在该任务执行结束的时间基础上,计算执行延时
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
ThreadPoolExecutor
image.png
image.png
image.png
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前把核心线程都创造好,并启动

maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了

keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率

unit:keepAliveTIme的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit,这个枚举也经常使用,有兴趣的可以看一下其源码

workQueue:工作队列,用于缓存待处理任务的阻塞队列,常见的有4种,本文后面有介绍

threadFactory:线程池中创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

handler:饱和策略,当线程池无法处理新来的任务了,那么需要提供一种策略处理提交的新任务,默认有4种策略

调用线程池的execute方法处理任务,执行execute方法的过程:
  1. 判断线程池中运行的线程数是否小于corepoolsize,是:则创建新的线程来处理任务,否:执行下一步
  2. 试图将任务添加到workQueue指定的队列中,如果无法添加到队列,进入下一步
  3. 判断线程池中运行的线程数是否小于maximumPoolSize,是:则新增线程处理当前传入的任务,否:将任务传递给handler对象rejectedExecution方法处理
线程池的使用步骤:
  1. 调用构造方法创建线程池
  2. 调用线程池的方法处理任务
  3. 关闭线程池
/**
 * ThreadPoolExecutor 线程池
 *
 */
public class ThreadPoolExecutorDemo {

    static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
            5,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            int j = i;
            String taskName = "任务"+j;
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+taskName+"处理完毕");
            });

        }
        executor.shutdown();
    }

}

线程池中常见5种工作队列

任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的5种阻塞队列:

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

LinkedBlockingQueue:是一个基于链表结构阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool使用了这个队列。

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用这个队列

PriorityBlockingQueue:优先级队列,进入队列的元素按照优先级会进行排序

image.png
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    System.out.println("开始");
    for (int i=0;i<50;i++){
        int j = i;
        String taskName = "任务"+j;
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName()+" 处理 "+taskName);
            try {
                //模拟任务内部处理耗时
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    System.out.println("结束");
    executorService.shutdown();
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

使用上面的方式创建线程池需要注意,如果需要处理的任务比较耗时,会导致新来的任务都会创建新的线程进行处理,可能会导致创建非常多的线程,最终耗尽系统资源,触发OOM。

PriorityBlockingQueue优先级队列的线程池
static class Task implements Runnable,Comparable<Task>{
    private int i;
    private String name;
    public Task(int i,String name){
        this.i = i;
        this.name=name;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" 处理 "+this.name);
    }
    @Override
    public int compareTo(Task o) {
        return Integer.compare(o.i,this.i);
    }
}    
public static void main(String[] args) {
    ExecutorService executorService = new ThreadPoolExecutor(1,1,60L,
            TimeUnit.SECONDS,new PriorityBlockingQueue<>());
    for (int i=0;i<10;i++){
        int j = i;
        String taskName = " 任务 "+j;
        executorService.execute(new Task(i,taskName));
    }
    for (int i=100;i>=90;i--){
        int j = i;
        String taskName = " 任务 "+j;
        executorService.execute(new Task(i,taskName));
    }
    executor.shutdown();
}

除了第一个任务,其他任务按照优先级高低按顺序处理。原因在于:创建线程池的时候使用了优先级队列,进入队列中的任务会进行排序,任务的先后顺序由Task中的i变量决定。向PriorityBlockingQueue加入元素的时候,内部会调用代码中Task的compareTo方法决定元素的先后顺序。

自定义创建线程的工厂

给线程池中线程起一个有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。自定义创建工厂需要实现java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法,参数为传入的任务,需要返回一个工作线程。

static AtomicInteger threadNum  = new AtomicInteger(1);
public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            r -> {
        Thread thread = new Thread(r);
        thread.setName("自定义线程--"+threadNum.getAndIncrement());
        return thread;
            });
    for (int i=0;i<5;i++){
        String taskName = "任务--" + i;
        executor.execute(() ->{
            System.out.println(Thread.currentThread().getName()+" , 处理: "+taskName);
        });
    }
    executor.shutdown();
}
4种常见饱和策略

当线程池中队列已满,并且线程池已达到最大线程数,线程池会将任务传递给饱和策略进行处理。这些策略都实现了RejectedExecutionHandler接口。接口中有个方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

r:需要执行的任务

executor:当前线程池对象

JDK中提供了4种常见的饱和策略:

AbortPolicy:直接抛出异常

CallerRunsPolicy:在当前调用者的线程中运行任务,即随丢来的任务,由他自己去处理

DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务

DiscardPolicy:不处理,直接丢弃掉,方法内部为空

自定义饱和策略

需要实现RejectedExecutionHandler接口。任务无法处理的时候,我们想记录一下日志,我们需要自定义一个饱和策略,示例代码:

static class Task implements Runnable{
        private String name;
        public Task(String name){
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+", 处理="+this.name);
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "Task{name="+name+"}";
        }
    }
  public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,
                60L,TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r,rexecutor) -> {
                //自定义饱和策略
                 //记录一下无法处理的任务
                    System.out.println("无法处理的任务: "+r.toString());
        });
        for (int i=0;i<5;i++){
            executor.execute(new Task(" 任务-"+i));
        }
        executor.shutdown();
    }

输出结果中可以看到有3个任务进入了饱和策略中,记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。

扩展线程池

ThreadPoolExecutor内部提供了几个方法beforeExecuteafterExecuteterminated,可以由开发人员自己去这些方法。看一下线程池内部的源码:

image.png
image.png
image.png
beforeExecute:任务执行之前调用的方法,有2个参数,第1个参数是执行任务的线程,第2个参数是任务
protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任务执行完成之后调用的方法,2个参数,第1个参数表示任务,第2个参数表示任务执行时的异常信息,如果无异常,第二个参数为null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:线程池最终关闭之后调用的方法。所有的工作线程都退出了,最终线程池会退出,退出时调用该方法

protected void terminated() { }
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
            10,60L,TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            Executors.defaultThreadFactory(),
            (r,executors) -> {
                //自定义饱和策略
                //记录一下无法处理的任务
                System.out.println("无法处理的任务-" + r.toString());

            }){
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println(System.currentTimeMillis()+","+t.getName()+",开始执行任务:" + r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",r任务:" + r.toString()+",执行完毕");
        }

        @Override
        protected void terminated() {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName() + ",关闭线程池");
        }
    };
  for (int i= 0;i<10;i++){
        executor.execute(new Task("任务-"+i));
    }
    TimeUnit.SECONDS.sleep(2);
    executor.shutdown();
}

JUC中的Executors框架

Excecutor框架主要包含3部分的内容:

  1. 任务相关的:包含被执行的任务要实现的接口:Runnable接口或Callable接口
  2. 任务的执行相关的:包含任务执行机制的核心接口Executor,以及继承自ExecutorExecutorService接口。Executor框架中有两个关键的类实现了ExecutorService接口(ThreadPoolExecutorScheduleThreadPoolExecutor
  3. 异步计算结果相关的:包含接口Future实现Future接口的FutureTask类

Executors框架包括:

  • Executor
  • ExecutorService
  • ThreadPoolExecutor
  • Executors
  • Future
  • Callable
  • FutureTask
  • CompletableFuture
  • CompletionService
  • ExecutorCompletionService

Executors类

Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。常用的方法有:


image.png
image.png

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {}

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,单线程如果处理不过来,会导致队列堆满,引发OOM。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {}

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,如果处理不过来,会导致队列堆满,引发OOM。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {}

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {}

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,
使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

Future、Callable接口

Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

image.png
Callable接口中定义了需要有返回的任务需要实现的方法。
@FunctionalInterface
public interface Callable<V> { 
    V call() throws Exception;
}

比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。

//获取异步任务执行结果
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    Future<Integer> result = executorService.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",开始");
            TimeUnit.SECONDS.sleep(5);
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",结束");
            return 10;
        }
    });
     System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName());
    try {//get()阻塞等待结果返回
        System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName()+", result="+result.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }finally {
        executorService.shutdown();
    }
}

代码中创建了一个线程池,调用线程池的submit方法执行任务,submit参数为Callable接口:表示需要执行的任务有返回值,submit方法返回一个Future对象,Future相当于一个凭证,可以在任意时间拿着这个凭证去获取对应任务的执行结果(调用其get方法),代码中调用了result.get()方法之后,此方法会阻塞当前线程直到任务执行结束。

超时获取异步任务执行结果

可能任务执行比较耗时,比如耗时1分钟,我最多只能等待10秒,如果10秒还没返回,我就去做其他事情了。

刚好get有个超时的方法,声明如下:

java.util.concurrent.Future;

V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
Future其他方法介绍一下

cancel:取消在执行的任务,参数表示是否对执行的任务发送中断信号,方法声明如下:

boolean cancel(boolean mayInterruptIfRunning);

isCancelled:用来判断任务是否被取消

isDone:判断任务是否执行完毕。

Future、Callable接口需要结合ExecutorService来使用,需要有线程池的支持。

FutureTask类

FutureTask除了实现Future接口,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以交给线程执行执行(Thread有个Runnable的构造方法),FutureTask表示带返回值结果的任务。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis() + "开始" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(System.currentTimeMillis() + " 结束 " + Thread.currentThread().getName());
            return 10;
        }
    });
    System.out.println(System.currentTimeMillis() + "  " + Thread.currentThread().getName());
    new Thread(futureTask).start();//线程启动Futuretask
    System.out.println(System.currentTimeMillis() + "  " + Thread.currentThread().getName());
    System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName()+",result:"+futureTask.get());
}

image.png
image.png

线程池的submit方法返回的Future实际类型正是FutureTask对象

image.png
CompletionService接口

java.util.concurrent.CompletionService


CompletionService相当于一个执行任务的服务,通过submit丢任务给这个服务,服务内部去执行任务,可以通过服务提供的一些方法获取服务中已经完成的任务。

接口内的几个方法:
Future<V> submit(Callable<V> task);

用于向服务中提交有返回结果的任务,并返回Future对象

Future<V> take() throws InterruptedException;

从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。

从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。
Future<V> poll();

通过submit向内部提交任意多个任务,通过take方法可以获取已经执行完成的任务,如果获取不到将等待。

ExecutorCompletionService 类

ExecutorCompletionService类是CompletionService接口的具体实现;

ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以最先完成的任务会先返回。

image.png
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。

completionQueue是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue,也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue中。

执行一批任务,然后消费执行结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    List<Callable<Integer>> list = new ArrayList<>();
    int taskCount = 5;
    for (int i = taskCount;i>0;i--){
        int j=i*2;
        list.add(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(j);
                return j;
            }
        });
    }
    soive(executorService,list,new Consumer<Integer>(){

        @Override
        public void accept(Integer integer) {
            System.out.println(System.currentTimeMillis()+" = "+integer);
        }
    });
    executorService.shutdown();
}
private static <T> void soive(ExecutorService executorService, List<Callable<T>> solvers, Consumer<T> consumer) throws InterruptedException, ExecutionException {
    CompletionService<T> ecs = new ExecutorCompletionService<T>(executorService);
    for (Callable<T> s:solvers){
        ecs.submit(s);//提交任务
    }
    int n = solvers.size();
    for (int i = 0;i<n;i++){
        T r = ecs.take().get();
        if (r !=null){
            consumer.accept(r);//消费任务
        }
    }
}

相关文章

  • 10.3多线程详解

    Java高级-多线程 多线程创建 多线程通讯 线程池 1.多线程创建 thread/runnable图:继承Thr...

  • Java:线程池Executors.newFixedThread

    摘要:Java,多线程,线程池 多线程编程和线程池概述 (1)多线程程序: 计算机可以实现多任务 ( multit...

  • java多线程--Callable

    **移步[java多线程系列文章]Java多线程(二十二)---LockSupport工具Java 停止线程 一、...

  • Thread

    队列 线程锁 多线程,线程池 队列 多线程爬虫示例 多线程 自定义线程 线程池

  • java多线程,线程池,多线程工具

    线程的基本操作 用new关键字创建一个线程对象,然后调用它的start()启动线程即可。 线程有个run()方法,...

  • 带你搞懂Java多线程(五)

    带你搞懂Java多线程(一)带你搞懂Java多线程(二)带你搞懂Java多线程(三)带你搞懂Java多线程(四) ...

  • Java多线程目录

    Java多线程目录 Java多线程1 线程基础Java多线程2 多个线程之间共享数据Java多线程3 原子性操作类...

  • 带你搞懂Java多线程(六)

    带你搞懂Java多线程(一)带你搞懂Java多线程(二)带你搞懂Java多线程(三)带你搞懂Java多线程(四)带...

  • Java源码-线程池

    一、线程池实现原理 Java支持多线程,多线程可以提高任务的执行效率。但是Java里面的线程跟操作系统的线程是一一...

  • 线程的并发工具类

    Java 下多线程的开发我们可以自己启用多线程,线程池,除此之外,Java还为我们提供了Fork-Join、Cou...

网友评论

      本文标题:java多线程,线程池,多线程工具

      本文链接:https://www.haomeiwen.com/subject/imdlbktx.html