1.回顾
前面一节介绍了并发容器和队列的内容,基于上次介绍关于队列的内容,才能更好的了解线程池的原理
开始介绍线程池之前,先看一道华为面试题:
两个线程,第一个线程从1到26,第二个线程从A到Z,交替顺序输出
LockSupport实现
package com.learn.thread.seven;
import java.util.concurrent.locks.LockSupport;
/**
* @author zglx
*
* 面试题,交替输出
*/
public class TestQuestion {
static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
t1 = new Thread(() -> {
for (char item : a1) {
System.out.println(item);
// 叫醒t2
LockSupport.unpark(t2);
// t1阻塞
LockSupport.park();
}
},"t1");
t2 = new Thread(() -> {
for (char item : a2) {
// t2阻塞
LockSupport.park();
// 打印值
System.out.println(item);
// 叫醒t2
LockSupport.unpark(t1);
}
},"t2");
t1.start();
t2.start();
}
}
wait 和 nofity的实现
package com.learn.thread.seven;
import java.util.concurrent.locks.LockSupport;
public class TestQueue2 {
private static final Object object = new Object();
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
new Thread(() -> {
synchronized (object) {
for (char item : a1) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 最终两个线程终归有一个是wait的,阻塞在这里不懂
object.notify();
}
},"t1").start();
new Thread(() -> {
synchronized (object) {
for (char item : a2) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 为什么这里要唤醒
object.notify();
}
},"t2").start();
}
}
第二个线程比第一个线程先执行
package com.learn.thread.seven;
/**
* @author zglx
* 这里用cas自旋的方式去实现第二个线程比第一个线程先执行
*/
public class TestQueue3 {
private static volatile boolean status = false;
private static final Object object = new Object();
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
new Thread(() -> {
synchronized (object) {
// 第一个线程上来就等待
while (!status) {
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (char item : a1) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();
}
}).start();
new Thread(() -> {
synchronized (object) {
for (char item : a2) {
System.out.println(item);
status = true;
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();
}
}).start();
}
}
Condition 实现Synchronized
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue5 {
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static ReentrantLock reentrantLock = new ReentrantLock();
private static Condition condition = reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a1) {
System.out.println(item);
// signal 相当于nofity
condition.signal();
// await 相当于 wait
condition.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a2) {
System.out.println(item);
// signal 相当于nofity
condition.signal();
// await 相当于 wait
condition.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
condition.signal();
}).start();
}
}
但是一个Condition 就是一个等待队列,既然有两个线程,那么就完全可以用生产者消费者的模式去实现,那么就需要两个Condition
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue6 {
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static ReentrantLock reentrantLock = new ReentrantLock();
private static Condition condition1 = reentrantLock.newCondition();
private static Condition condition2 = reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a1) {
System.out.println(item);
// signal 相当于nofity
condition2.signal();
// await 相当于 wait
condition1.await();
}
condition2.signal();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a2) {
System.out.println(item);
// signal 相当于nofity
condition1.signal();
// await 相当于 wait
condition2.await();
}
condition1.signal();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
}
}
自旋锁实现
线程取两个值t1,t2,定义一个ReadyToRun的变量
刚开始是t1,相当于一个信号灯r,某一时刻只能取一个值,不能同时取到两个线程
程序上来就判断是不是t1 如果不是就占用cpu等待,如果一看是t1就打印值,然后把r的值改成t2,打印完之后r又变成了t1。就这么交替的玩法
注意信号灯r要为volatile,保证线程之间的可见性,我们把信号灯设置为枚举,防止它取别的值
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue7 {
enum ReadyToRun{
T1,
T2
}
// 保证线程可见性,并且保证只能是两个值
static volatile ReadyToRun readyToRun = ReadyToRun.T1;
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
// cas 无锁化,如果不是T1,什么都不做
while (readyToRun != ReadyToRun.T1) {
}
System.out.println(item);
// 信号灯交替给t2
readyToRun = ReadyToRun.T2;
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
while (readyToRun != ReadyToRun.T2) {
}
System.out.println(item);
// 信号灯交换给T1
readyToRun = ReadyToRun.T1;
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
BlockingQueue的玩法
前面说到过Queue 的玩法,可以支持多线程阻塞操作,有put和take操作,put满的时候会被阻塞住,take 的时候,如果没有就会阻塞
实例两个BlockingQueue,都是ArrayBlockingQueue数组实现的,并且长度都是1
相当于两个容器,这两个容器里头放两个值,这两个值比如说我第一个线程打印出了1我就往第一个容器里放一个ok,然后另外一个线程就盯着这个容器,有值了就立马打印A,并且同样往另外一个容器放一个ok,这样第一个线程监听的容器就有值,打印出2,如此循环下去。
package com.learn.thread.seven;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestQueue8 {
// 保证线程可见性,并且保证只能是两个值
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);
private static BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
System.out.println(item);
// 赋值给第二个容器
blockingQueue2.put("ok");
// 开始监听
blockingQueue1.take();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
blockingQueue2.take();
System.out.println(item);
blockingQueue1.put("ok");
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
TransferQueue 优化
之前说过TransferQueue 可以跟exchange一样做线程数据交换
第一个线程上来就take 等待第二个线程给值,拿到值打印,又经过transferQueue传1给第二个线程
第二个线程上来就经过transfer 把自己变量扔进去,第一个线程立马take到了,拿出来打印。然后等待第一个线程给值。如此循环
package com.learn.thread.seven;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
public class TestQueue9 {
// 保证线程可见性,并且保证只能是两个值
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static TransferQueue<Character> transferQueue = new LinkedTransferQueue<>();
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
// 取值
System.out.println(transferQueue.take());
// 传值
transferQueue.transfer(item);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
transferQueue.transfer(item);
System.out.println(transferQueue.take());
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
2.线程池
线程池有几个接口,第一个接口是Executor,第二个是ExecutorService继承自Executor,他们的最终子类才是ThreadPoolExecutor,如下图
线程池继承关系
Executor 执行者 有一个方法叫做执行,执行的东西就是Runnable 是一个接口,可以有很多实现。因此可以说这里只是定义了任务的执行是Runnable,而执行交给了实现Executor的类。这里不像我们以前定义一个Thread,new 一个Thread然后去重写它的Run方法,然后start分开执行。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package java.util.concurrent;
public interface Executor {
void execute(Runnable var1);
}
ExecutorService 是Executor的继承,它除了父类execute方法外,还完善了整个任务执行器的一个生命周期。拿线程池举例,一个线程池里面一堆线程就是一堆的工人,执行完一个任务后这个线程该怎么结束,ExecutorService就定义了这么一些个方法
package com.learn.thread.five;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.*;
public interface ExecutorService extends Executor {
// 结束
void shutdown();
// 马上结束
List<Runnable> shutdownNow();
// 是否结束了
boolean isShutdown();
// 是不是整体都执行完了
boolean isTerminated();
// 等着结束,等多长时间,时间到了还不结束的话他就返回false
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交任务,返回结果放在Future中
<T> Future<T> submit(Callable<T> task);
// 提交任务,返回结果放在Future中
<T> Future<T> submit(Runnable task, T result);
// 提交任务,返回结果放在Future中
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
主要看任务submit方法,这里涉及了入参Callable和出参Future
2.1.Callable
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
里边有一个call方法,并且带有返回值,跟run方法一样,只不过run方法没有返回值。
2.2.Future
Future 代表的就是线程池执行完runnable 后返回的结果,Future表示的就是未来的执行结果,如果任务还没有被执行完就调用get方法,就会造成线程阻塞。
其实更灵活的用法就是FutureTask即是一个Future 同时又是一个Task,因为Callable只能是一个Task 执行一个任务,但是不能作为一个Future来用FutureTask 实现了RunnbaleFuture ,RunnbaleFuture实现了Runnable 有实现了Future,所以它是一个任务又是一个Future,后面WorkStealingPool,ForkJoinPool 基本都会用到FutureTask。
package com.learn.thread.seven;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class TestFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> future = new FutureTask<Integer>(() -> {
Thread.sleep(1000);
return 100;
});
new Thread(future).start();
// 会造成阻塞
System.out.println(future.get());
}
}
小结
Callable类似于Runnable 但是有返回值
了解了Future 用于存储执行的将来才会产生的结果
FutureTask他是Future 加上Runnable 既可以执行也可以存结果
CompletableFuture 管理多个Future 的结果
CompletableFuture
它用来组合各种不同的任务,等这个任务执行完产生一个结果后进行一个组合,基于java8之前将的价格查询器,用小程序再模拟一个。
定义三个future分别代表淘宝、京东、天猫,用了CompletableFuture的一个方法叫supplyAsync产生了一个异步任务
每一个任务去不同的商家拉取数据,什么时候拉取完了就放在一个Future中,但是要求三个商家都拉取数据完了才给最后的展示
你可以使用allOf方法相当于这里面的所有任务完成之后,最后join,你才能继续往下执行。
除了allOf 方法,还提供了一些lamdba表示的写法
package com.learn.thread.seven;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class TestCompletableFuture {
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
CompletableFuture<Double> futurem = CompletableFuture.supplyAsync(() -> {
return priceOfM();
});
CompletableFuture<Double> futuret = CompletableFuture.supplyAsync(() -> {
return priceOfT();
});
CompletableFuture<Double> futureb = CompletableFuture.supplyAsync(() -> {
return priceOfB();
});
// 用join
CompletableFuture.allOf(futureb, futurem, futuret).join();
// 用lamdba
CompletableFuture.supplyAsync(TestCompletableFuture::priceOfM)
.thenApply(String::valueOf)
.thenApply(str -> "price" + str)
.thenAccept(System.out::println);
long end = System.currentTimeMillis();
System.out.println(end - start);
System.in.read();
}
private static double priceOfM() {
delay();
System.out.println(1.00);
return 1.00;
}
private static double priceOfB() {
delay();
System.out.println(2.00);
return 2.00;
}
private static double priceOfT() {
delay();
System.out.println(3.00);
return 3.00;
}
private static void delay() {
int time = new Random().nextInt(500);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.ThreadPoolExector和ForkJoinPool
我们来了解线程池,线程池从目前的JDK来说有两种类型,第一种就是普通的线程池ThreadPoolExector,第二种是ForkJoinPool,这两种是不同的类型的线程池,能做的事情也不一样。
Fork成为分叉,合并称为join,这是ForkJoinPool的一个特性,我们先来讲讲ThreadPoolExector
ThreadPoolExector 的父类是AbstractExecutorService,而AbstractExecutorService的父类是ExecutorService,ExecutorService的父类是Executor,所以ThreadPoolExector相当于线程池的一个执行器,你可以往这个池子里丢任务。
阿里开发手册要求线程是自定义的,下面看看如何自定义一个线程池
线程池有很多构造方法,来看看主要的七个参数
java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释。
从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。
一、corePoolSize 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。
二、maximumPoolSize 线程池最大线程数量
一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
三、keepAliveTime 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
四、unit 空闲线程存活时间单位
keepAliveTime的计量单位
五、workQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
六、threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
七、handler 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
①CallerRunsPolicy
该策略下,在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
②AbortPolicy
该策略下,直接丢弃任务,并抛出RejectedExecutionException异常。
③DiscardPolicy
该策略下,直接丢弃任务,什么都不做。
④DiscardOldestPolicy
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
到此,构造线程池时的七个参数,就全部介绍完毕了
一般来说都是需要保存来的CallerRuns
package com.learn.thread.seven;
import com.learn.thread.one.T;
import java.io.IOException;
import java.util.concurrent.*;
public class TestTask {
static class Task implements Runnable{
private int i;
public Task(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "task" + i);
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,4,60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 8; i++) {
poolExecutor.execute(new Task(i));
}
System.out.println(poolExecutor.getQueue());
poolExecutor.execute(new Task(100));
System.out.println(poolExecutor.getQueue());
poolExecutor.shutdown();
}
}
网友评论