参考如下文章进行整理:
- JAVA几种线程使用
1.1 Runnable同时执行任务模式
1.2 Join等待某个线程执行完开始执行模式
1.3 Object lock+synchronized资源竞争与释放模式
1.4 CountDownLatch等待几个线程执行完再执行某一个线程模式
1.5 CyclicBarrier多个线程执行到某个点后同时执行模式
1.6 FutureTask获取线程执行结果模式
1.7 Semaphore信号量
- 线程池
2.1 线程池介绍
2.1.1 总体设计
2.1.2 ThreadPoolExecutor生命周期及运行状态
2.1.3 任务调度
2.2 线程池四种模式
2.2.1 newSingleThreadExecutor单线程线程池
2.2.2 newFixedThreadPool定长线程池
2.2.3 newScheduledThreadPool定时线程池
2.2.4 newCachedThreadPool 缓存线程池
1. JAVA几种线程使用
1.1 Runnable同时执行任务模式
package sync;
// 线程同时运行
public class Sync1Runable {
public static void main(String args[]){
demo1();
}
private static void demo1(){
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
printNumber("B");
}
});
A.start();
B.start();
}
private static void printNumber(String threadName) {
int i=0;
System.out.println(threadName + " print: " + i);
while (i++ < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " print: " + i);
}
}
}
1.2 Join等待某个线程执行完开始执行模式
package sync;
public class Sync2Join {
public static void main(String args[]){
demo1();
}
// 通过join函数,等待某个线程结束,再执行,等A执行完后再执行B
private static void demo1(){
Thread A = new Thread(new Runnable() {
@Override
public void run() {
printNumber("A");
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
try{
A.join();
}catch (InterruptedException e){
e.printStackTrace();
}
printNumber("B");
}
});
A.start();
B.start();
}
private static void printNumber(String threadName) {
int i=0;
while (i++ < 3) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " print: " + i);
}
}
}
1.3 Object lock+synchronized资源竞争与释放模式
这里要注意的是,这个demo仅是作为大家作为该功能的展示,如果你把A.start()放在B.start()后面会发生死锁,这里大家可以思考下具体原因。
package sync;
import org.omg.CORBA.OBJ_ADAPTER;
/**
* @ClassName sync.Sync3ObjectWaitNotify
* @Description wait() notify()实现线程交叉执行
* @Author Lczy-Huang
* @Date 2018/10/9 11:19
* @Version 1.0
**/
public class Sync3ObjectWaitNotify {
public static void main(String arg[]){
demo3();
}
/*
* @description 通过对一个对象的锁,wait休眠,notify重新唤醒达到线程交叉执行
* @params []
* @return void
* @author Huang Bing
* @time 2018/10/9 11:26
*/
private static void demo3(){
Object lock = new Object();
Thread A = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
System.out.println("A1");
try{
lock.wait();
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("A2");
System.out.println("A3");
}
}
});
Thread B = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock){
System.out.println("B1");
System.out.println("B2");
System.out.println("B3");
lock.notify();
}
}
});
A.start();
B.start();
}
}
1.4 CountDownLatch等待几个线程执行完再执行某一个线程模式
package sync;
import java.util.concurrent.CountDownLatch;
/**
* @ClassName sync.Sync4CountDownLatch
* @Description 等待3个线程全部执行完后,执行第4个线程
* @Author Lczy-Huang
* @Date 2018/10/9 11:37
* @Version 1.0
**/
public class Sync4CountDownLatch {
public static void main(String arg[]){
runDAfterABC();
}
private static void runDAfterABC(){
int worker =3;
CountDownLatch countDownLatch = new CountDownLatch(worker);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("D is waiting for other three threads");
try{
countDownLatch.await();
}catch (InterruptedException e){
e.printStackTrace();
}
System.out.println("D is working");
}
}).start();
for (char threadName='A'; threadName <= 'C'; threadName++) {
final String tN = String.valueOf(threadName);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(tN + " is working");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(tN + " finished");
countDownLatch.countDown();
}
}).start();
}
}
}
1.5 CyclicBarrier多个线程执行到某个点后同时执行模式
package sync;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @ClassName Sync5CyclicBarrierDemo
* @Description 所有线程到某个点一起等待进行
* @Author Lczy-Huang
* @Date 2018/10/9 11:52
* @Version 1.0
**/
public class Sync5CyclicBarrierDemo {
public static void main(String arg[]){
runABCWhenAllReady();
}
private static void runABCWhenAllReady(){
int runner = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
final Random random = new Random();
for(char runnerName='A';runnerName<='C';runnerName++){
final String rN = String.valueOf(runnerName);
new Thread(new Runnable() {
@Override
public void run() {
long prepareTime = random.nextInt(10000)+100;
System.out.println(rN+"is preparing for time:"+prepareTime);
try{
Thread.sleep(prepareTime);
}catch (Exception ex){
ex.printStackTrace();
}
try{
System.out.println(rN + " is prepared, waiting for others");
cyclicBarrier.await(); // 当前运动员准备完毕,等待别人准备好
}catch (InterruptedException e){
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(rN+"start running");// 所有运动员都准备好了,一起开始跑
}
}).start();
}
}
}
1.6 FutureTask获取线程执行结果模式
该模式看似很鸡肋,但是需要配合线程池去使用,获得一群线程的返回结果存放在list中。
package sync;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @ClassName Sync6FutureTask
* @Description 获取线程执行返回值
* @Author Lczy-Huang
* @Date 2018/10/9 12:26
* @Version 1.0
**/
public class Sync6FutureTask {
public static void main(String arg[]){
doTaskWithResultInWorker();
}
private static void doTaskWithResultInWorker() {
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Task starts");
Thread.sleep(1000);
int result = 0;
for (int i=0; i<=100; i++) {
result += i;
}
System.out.println("Task finished and return result");
return result;
}
};
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask).start();
new Thread(futureTask).start();
try {
System.out.println("Before futureTask.get()");
System.out.println("Result: " + futureTask.get());
System.out.println("After futureTask.get()");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
配合线程池使用:
package sync;
import java.util.ArrayList;
import java.util.concurrent.*;
/**
* @author Lczy-Huang
* @version 1.0
* @classname Sync7ParamFutureTask
* @description TODO
* @date 2019/1/17 10:43
**/
public class Sync7ParamFutureTask implements Callable<String> {
private int id;
public Sync7ParamFutureTask(int id){
this.id=id;
}
public String call() throws Exception{
return "result:"+id;
}
public static class CallableDemo{
public static void main(String[] args){
ExecutorService executorService = Executors.newCachedThreadPool();
ArrayList<Future<String>> futures = new ArrayList<Future<String>>();
for(int i=0;i<10;i++){
futures.add(executorService.submit(new Sync7ParamFutureTask(i)));
}
for(Future<String> fs:futures){
try{
System.out.println(fs.get());
}catch (InterruptedException e){
System.out.println(e);
e.printStackTrace();
}catch (ExecutionException ex){
System.err.println(ex);
ex.printStackTrace();
}finally {
executorService.shutdown();
}
}
}
}
}
1.7 Semaphore信号量
如果你认真的看完以上几个例子,看不懂,没关系,你完全可以用Semaphore代替它们。有两个重要的方法:
方法名 | 作用 |
---|---|
acquire() | 消耗一个semaphore的值,如果值为0则堵塞等待有值 |
release() | 释放一个semaphore的值,即semaphore++; |
交替执行例子:
class FooBar {
private int n;
public FooBar(int n) {
this.n = n;
}
Semaphore foo = new Semaphore(1);
Semaphore bar = new Semaphore(0);
public void foo(Runnable printFoo) throws InterruptedException {
for (int i = 0; i < n; i++) {
foo.acquire();
// printFoo.run() outputs "foo". Do not change or remove this line.
printFoo.run();
bar.release();
}
}
public void bar(Runnable printBar) throws InterruptedException {
for (int i = 0; i < n; i++) {
bar.acquire();
// printBar.run() outputs "bar". Do not change or remove this line.
printBar.run();
foo.release();
}
}
}
通过这里例子你可以发现,只要有足够多的信号量存在配合runable接口,可以实现大部分的线程模式。
2. 线程池
2.1 线程池设计说明
2.1.1 总体设计
Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutor的UML类图,了解下ThreadPoolExecutor的继承关系。
图ThreadPoolExecutor UML类图
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力:(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;(2)提供了管控线程池的方法,比如停止线程池的运行。
AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。
图ThreadPoolExecutor运行流程
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。
2.1.2 ThreadPoolExecutor生命周期及运行状态
ThreadPoolExecutor的运行状态有5种,分别为:
运行状态 | 状态描述 |
---|---|
RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中任务 |
SHUTDOWN | 关闭状态,不能接受新提交任务,但可以继续处理阻塞队列中的任务 |
STOP | 不能接受新任务,也不处理队列中的任务,会中断正在指向的任务线程 |
TIDYING | 所有任务都已经终止,workerCount(有效) |
TERMINATED | 在terminated()方法执行后进入该状态 |
2.1.3 任务调度
首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:
首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
-
如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其执行流程如下图所示:
任务调度流程
2.2 线程池四种模式
2.2.1 newSingleThreadExecutor单线程线程池
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
/**
*创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
*/
public static void singleTheadPoolTest() {
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int ii = i;
pool.execute(() -> out.println(Thread.currentThread().getName() + "=>" + ii));
}
}
-----output-------
线程名称:pool-1-thread-1,执行0
线程名称:pool-1-thread-1,执行1
线程名称:pool-1-thread-1,执行2
线程名称:pool-1-thread-1,执行3
线程名称:pool-1-thread-1,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-1,执行6
线程名称:pool-1-thread-1,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-1,执行9
2.2.2 newFixedThreadPool定长线程池
- 底层:返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread,corePoolSize为nThread,maximumPoolSize为nThread;keepAliveTime为0L(不限时);unit为:TimeUnit.MILLISECONDS;WorkQueue为:new LinkedBlockingQueue<Runnable>() 无界阻塞队列
- 通俗:创建可容纳固定数量线程的池子,每隔线程的存活时间是无限的,当池子满了就不在添加线程了;如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)
- 适用:执行长期的任务,性能好很多
/**
* 1.创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小<br>
* 2.线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程<br>
* 3.因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称<br>
*/
public static void fixTheadPoolTest() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int ii = i;
fixedThreadPool.execute(() -> {
out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
------output-------
线程名称:pool-1-thread-3,执行2
线程名称:pool-1-thread-1,执行0
线程名称:pool-1-thread-2,执行3
线程名称:pool-1-thread-3,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-2,执行6
线程名称:pool-1-thread-3,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-3,执行9
2.2.3 newScheduledThreadPool定时线程池
- 底层:创建ScheduledThreadPoolExecutor实例,corePoolSize为传递来的参数,maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为0;unit为:TimeUnit.NANOSECONDS;workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
- 通俗:创建一个固定大小的线程池,线程池内线程存活时间无限制,线程池可以支持定时及周期性任务执行,如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构
- 适用:周期性执行任务的场景
/**
* 创建一个定长线程池,支持定时及周期性任务执行。延迟执行
*/
public static void sceduleThreadPool() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
Runnable r1 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:3秒后执行");
scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS);
Runnable r2 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:延迟2秒后每3秒执行一次");
scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS);
Runnable r3 = () -> out.println("线程名称:" + Thread.currentThread().getName() + ",执行:普通任务");
for (int i = 0; i < 5; i++) {
scheduledThreadPool.execute(r3);
}
}
----output------
线程名称:pool-1-thread-1,执行:普通任务
线程名称:pool-1-thread-5,执行:普通任务
线程名称:pool-1-thread-4,执行:普通任务
线程名称:pool-1-thread-3,执行:普通任务
线程名称:pool-1-thread-2,执行:普通任务
线程名称:pool-1-thread-1,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-5,执行:3秒后执行
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
线程名称:pool-1-thread-4,执行:延迟2秒后每3秒执行一次
2.2.4 newCachedThreadPool 缓存线程池
- 底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为60L;unit为TimeUnit.SECONDS;workQueue为SynchronousQueue(同步队列)
- 通俗:当有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。
- 适用:执行很多短期异步的小程序或者负载较轻的服务器
/**
* 1.创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程<br>
* 2.当任务数增加时,此线程池又可以智能的添加新线程来处理任务<br>
* 3.此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小<br>
*
*/
public static void cacheThreadPool() {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; i++) {
final int ii = i;
try {
Thread.sleep(ii * 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(()->out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
}
}
-----output------
线程名称:pool-1-thread-1,执行1
线程名称:pool-1-thread-1,执行2
线程名称:pool-1-thread-1,执行3
线程名称:pool-1-thread-1,执行4
线程名称:pool-1-thread-1,执行5
线程名称:pool-1-thread-1,执行6
线程名称:pool-1-thread-1,执行7
线程名称:pool-1-thread-1,执行8
线程名称:pool-1-thread-1,执行9
线程名称:pool-1-thread-1,执行10
网友评论