阻塞队列
非阻塞队列是一个先进先出的单向队列(Queue),而BlockingQueue阻塞队列实际是非阻塞队列的一个加强,之所以是阻塞队列最主要的是实现take和put,当阻塞队列满时,put会一直阻塞直到拿到数据,或者响应中断退出;当队列空时,take元素,队列也会阻塞直到队列可用。
阻塞的意思就是此条线程会处于等待卡死状态,解锁的条件是队列中有另一条线程存入或取出数据了,就会解锁,就相当于队列是仓库,仓库没有货了就生产,有货就能消费,锁条件是notFull和notEmpty。
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
检查方法 | element() | peek() |
队列(Queue和Deque)
队列是一种特殊的线性表,链表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。
进行插入操作的端称为队尾,进行删除操作的端称为队头。
单向队列(Queue):先进先出(FIFO),只能从队列尾插入数据,只能从队列头删除数据.
双向队列(Deque):可以从队列尾/头插入数据,只能从队列头/尾删除数据.操作头和尾.
单向队列Queue结构
private Node first;
private Node last;
int size = 0;
class Node{
private Node next;
private Object element;
public Node(Object element) {
this.element = element;
}
public Node(){}
}
单向队列操作
public void push(Object element){
//单向队列(Queue):先进先出(FIFO),只能从队列尾插入数据
size++;
Node node = new Node(element);
if (size>1){
this.last.next = node;
this.last = node;
}else if (size == 1){
this.first = node;
this.last = node;
}
}
public void pull(){
//单向队列(Queue):先进先出(FIFO),只能从队列头删除数据.
size--;
this.first = this.first.next;
if (size == 1){
this.last = this.first;
}
}
常见的阻塞队列
阻塞队列:BlockingQueue,多用于创建线程池
imageArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,只维护一个lock。
线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,维护两个lock。
如果不设置队列容量,其初始化的默认容量大到可以认为是无界队列了,在这种情况下maximumPoolSize参数是无效的,队列中的任务太多,以至于由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayedWorkQueue:ScheduledThreadPoolExecutor静态内部类的一个无界延迟阻塞队列,添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被执行
SynchronousQueue:无存储空间的阻塞队列。
1.SynchronousQueue是一个双栈双队列算法,无空间的队列,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
2.提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据设置的handler执行拒绝策略。因此这种方式提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
非阻塞队列的API
除了阻塞队列就是非阻塞队列,非阻塞队列,就是单纯意义上的"非"阻塞队列,其没有put和take方法,可以无限制存,且是线程安全的,N个用户同时存也能保证每次存放在队尾而不乱掉,但是其的size()方法会不定时遍历,所以很耗时
以ConcurrentLinkedQueue并发队列为例,多用于消息队列,并发异步处理,如日志等;add和poll是线程安全的,但是其他方法并没有保证线程安全,如判断isEmpty(),所以在多线程并发时还得自己加锁
ConcurrentLinkedQueue内部就是一个简单的单链表结构,每入队一个元素就是插入一个Node类型的结点。head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及Node对象的字段值
imageoffer/add添加队列元素:两个方法一样,都是将指定元素插入此队列的尾部,添加成功返回true;区别在于队列满时,add会抛异常,而offer会返回false
poll获取并移除此队列的头,如果此队列为空,则返回 null
public static void main(String[] args) {
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
//先进先出
System.out.println(queue.poll());//1000
System.out.println(queue.poll());//2000
System.out.println(queue.poll());//null
}
peek获取但不移出队列的头,如果此队列为空,则返回 null
public static void main(String[] args) {
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
//peek获取但不移除队列的头,所以每次peek()都是1000
System.out.println(queue.peek());//1000
System.out.println(queue.peek());//1000
System.out.println(queue.poll());//1000
}
remove从队列中移除指定元素,已存在元素,会返回true,remove不存在元素,返回false
contains判断当前队列是否包含指定元素,如果存在返回true,不存在返回false
public static void main(String[] args) {
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
System.out.println(queue.remove("1000"));//true
//1000已经被移除了
System.out.println(queue.remove("1000"));//false
System.out.println(queue.contains("1000"));//false
System.out.println(queue.contains("2000"));//true
}
size队列的数量,如果此队列包含的元素数大于 Integer.MAX_VALUE,则只会返回 Integer.MAX_VALUE。由于这些队列的异步特性,确定当前的元素数需要进行一次花费 O(n) 时间的遍历。所以在需要判断队列是否为空时,使用peek()!=null,不要用 queue.size()>0
isEmpty判断当前队列是否为null
toArray(T[] a)转为指定数组,按照头->尾的顺序返回
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
queue.offer("3000");
Object[] array = queue.toArray();
for (Object o : array){
System.out.println(o);1000->2000->3000
}
iterator获取按照头到尾的顺序遍历的迭代器
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
Iterator<Object> iterator = queue.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
队列应用的示例:
//创建并发队列
ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
//提供100000份需要消耗的食物,并放入队列
int productionNum = 10000;
for(int i = 1;i<10001;i++){
queue.offer("食物编号:"+i);
}
ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(100);
for (int i =0;i<10;i++){
//submit()有返回值,而execute()没有
Future<?> future = executorService.submit(new Runnable() {
@Override
public void run() {
while (!queue.isEmpty()) {
System.out.println("当前线程为:" + Thread.currentThread().getName() + ":" + queue.poll());
}
}
});
}
ArrayBlockQueue/LinkedBlockQueue
ArrayBlockingQueue基于数组不会产生或销毁任何额外的对象实例,LinkedBlockingQueue基于链表会生成额外的Node对象会有会内存溢出的风险。但是常用的其实还是LinkedBlockingQueue,使用两套锁,实现生产和消费操作的并行,单个锁只能保证生产者和消费者只能每次操作一次生产或者消费,而双锁可以使得生产者在队列满时不断的向队列尾部添加node,消费者不断从head获取Node,从而吞吐效率更高
//Array可以选择公平锁和非公平锁,而Linked两把锁都是非公平锁
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100,true);
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
//阻塞的方法只有take和put
queue.take();
queue.put("");
ArrayBlockingQueue源码:
//数组
final Object[] items;
//获取数据的索引,主要用于take,poll,peek,remove方法
int takeIndex;
//添加数据的索引,主要用于 put, offer, or add 方法
int putIndex;
//队列元素的个数
int count;
final ReentrantLock lock;
//notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作
private final Condition notEmpty;
//notFull条件对象,用于通知put方法队列未满,可执行添加操作
private final Condition notFull;
put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中断加锁
lock.lockInterruptibly();
try {
//数组满了,生产条件释放锁并阻塞线程
while (count == items.length)
notFull.await();
//入队操作
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//设置当前索引值为put添加的值,初始为0
items[putIndex] = x;
//length是固定的,简单理解就是当条件这个元素之后数组就满了
//那么当消费者从0开始往后消费,生产者被唤醒从而继续从0开始继续添加
//需要先了解消费的方式:会从头一直消费到最后一个元素之后又从0开始继续消费
if (++putIndex == items.length)
//消费者从0往后依次消费,生产者在从0开始继续添加
putIndex = 0;
count++;
//唤醒消费者
notEmpty.signal();
}
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//数组没有资源,消费者进入休眠
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
//takeIndex初始也为0,从索引0开始消费
E x = (E) items[takeIndex];
items[takeIndex] = null;
//take索引自增,当消费完最大的索引值,又从0开始消费
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//消费之后数组未满,所以唤醒生产者继续生产
notFull.signal();
return x;
}
LinkedBlockingQueue源码:
//由Node单向链表组成的队列,初始化时创建一个空Node并且设置为head和last
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//队列容量
private final int capacity;
//队列长度,因为多个线程可以同时生产或消费,需要保证改变值的可见性和原子性
private final AtomicInteger count = new AtomicInteger();
//队列的头和尾
transient Node<E> head;
private transient Node<E> last;
// 生产者和消费者维护了两套锁
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
put方法,可以实现入列和消费同时进行,但是生产或者消费时只能同时运行一个线程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
//生产锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//如果满了,当前生产线待机
while (count.get() == capacity) {
notFull.await();
}
//入列
enqueue(node);
//cas设置count+1,返回值仍是原值
c = count.getAndIncrement();
//被消费之后队列未满则唤醒队列
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//当队列为不为空时,消费锁释放
signalNotEmpty();
}
private void enqueue(Node<E> node) {
//当前node设置为last,并设置为上一个节点的next
last = last.next = node;
}
take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列为空消费者等待被唤醒
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
//count值减1,返回的还是原值
c = count.getAndDecrement();
//当消费队列不为空消费者被唤醒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//队列满了,生产者进行signal
if (c == capacity)
signalNotFull();
return x;
}
线程池
线程池,简单来说就是一个管理工作线程的管理器,可以复用线程从而减少频繁创建和销毁线程,控制并发数;可以创建多个线程池用于处理不同的业务
AQS队列是运行中的线程由于获取资源形成队列,更改线程的wait状态原地阻塞,直到被唤醒继续执行线程;而线程池中的任务队列虽然也是将Runnable对象加入队列,但并不存在阻塞,因为这是仅仅只是内存对象,通过工作线程去执行任务队列里Runnable对象的run方法,所有其实不一定要是Runnable对象,只需要统一将要执行的任务通过同一个方法去实现,在工作线程执行时再去统一调那个方法即可
提交任务执行,就是工作线程去直接调任务的run()或者call()方法
class SimperExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
简易线程池
static List<Runnable> runnables = Lists.newArrayList();
static Thread threadPool1 = new Thread(() -> {
System.out.println("线程对象启动:" + Thread.currentThread().getName());
while (true) {
if (!runnables.isEmpty()){
Thread runnable = (Thread) runnables.get(0);
System.out.println("被执行任务名称:"+runnable.getName());
runnable.run();
runnables.remove(0);
}
}
}, "threadPool1");
private static void execute(Runnable runnable){
runnables.add(runnable);
}
public static void main(String[] args) throws InterruptedException {
threadPool1.start();
Thread t1 = new Thread(() -> {
System.out.println("任务对象1:"+Thread.currentThread().getName());
}, "t1");
Thread t2 = new Thread(() -> {
System.out.println("任务对象2:"+Thread.currentThread().getName());
}, "t2");
Thread t3 = new Thread(() -> {
System.out.println("任务对象3:"+Thread.currentThread().getName());
}, "t3");
execute(t1);
execute(t2);
execute(t3);
}
构建ThreadPoolExecutor
ThreadPoolExecutor的继承和实现结构
imageExecutor只提供了executor方法不需要获取结果,ExecutorService提供的submit需要获取结果(FutureTask),可以进行Exception处理
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
-
corePoolSize 核心线程数,初始化ThreadPoolExecutor的时候,线程默认不会立即创建核心线程创建,等到有任务提交时才会创建
//初始化线程池对象之后并没有立即创建核心线程 ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(3); System.out.println(executorService.getActiveCount());//0 //当提交任务之后才会先去创建核心线程 executorService.submit(new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(executorService.getActiveCount());//1 } catch (InterruptedException e) { e.printStackTrace(); } }));
-
maximumPoolSize 最大线程数,线程池允许创建的最大线程数,如果队列中任务已满,也就是核心线程无法及时处理这些任务,那么会创建新的线程来执行任务。
-
workQueue 任务队列,BlockingQueue
-
keepAliveTime 除了核心线程,空闲线程存活时间; 设置allowCoreThreadTimeOut(true)可以回收核心线程
-
threadFactory 用于创建线程池的工作线程,可自定义线程创建,线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等
//默认的DefaultThreadFactory的简单实现 ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { //线程命名 Thread th = new Thread(r,"pool-1"+"-thread"); return th; } };
-
rejectedHandler 拒绝策略,用来处理线程池"超载"的情况,当线程池已满,但是又有新的任务添加到队列时需要采取的策略,比如什么都不处理,抛异常,返回false等,也可以自定义实现,默认提供了4种拒绝策略:AbortPolicy,直接抛出异常,阻止系统正常工作; CallerRunsPolicy,如果线程池的线程数量达到上限,会把任务队列中的任务放在调用者线程当中运行; DiscardOledestPolicy,会丢弃任务队列中最早的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;DiscardPolicy,没有任何处理,也就是丢弃任务,使用此策略,业务场景中需允许任务的丢失;
实现RejectedExecutionHandler及rejectedExecution方法
---- AbortPolicy 抛异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } ---- CallerRunsPolicy 溢出的任务直接执行 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } ---- DiscardOldestPolicy 取出workQueue中的firstQueue,再重新提交取出的任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } ---- DiscardPolicy 什么都不做,等于直接抛弃了这个任务 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
线程池的状态
线程状态和工作线程数
//ctl初始为 RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//高3位表示线程状态,低29位表示线程数
private static final int COUNT_BITS = 32 - 3;
//000 111....
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState 线程状态
private static final int RUNNING = -1 << COUNT_BITS;//101
private static final int SHUTDOWN = 0 << COUNT_BITS;//000
private static final int STOP = 1 << COUNT_BITS;//001
private static final int TIDYING = 2 << COUNT_BITS;//010
private static final int TERMINATED = 3 << COUNT_BITS;//011
// ~1=0,0&1=1 低29位全部为0,用于获取高3位的线程状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//低29位全部为1,用于获取低29位的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//new AtomicInteger(ctlOf(RUNNING, 0))
private static int ctlOf(int rs, int wc) { return rs | wc; }
RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。
- RUNNING:最正常的状态:接受新的任务,处理等待队列中的任务
- SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
- STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
- TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
- TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个
状态变化的转换过程:
- RUNNING -> SHUTDOWN:调用 shutdown()
- (RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow()
- SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
- STOP -> TIDYING:当任务队列清空后,发生这个转换
- TIDYING -> TERMINATED:当 terminated() 方法结束后
submit提交任务
executor只能提交没有返回值得Runnable任务,submit可以提交Runnable和Callable任务,将提交的任务封装成RunnableFuture
Future接口
对线程池提交一个Callable任务,可以获得一个Future对象
ExecutorService service = new ThreadPoolExecutor(2,2,100,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
List<Future> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
Callable<String> c1 = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(10);
return UUID.randomUUID().toString().replace("-", "");
}
};
//提交callable任务
Future<String> future = service.submit(c1);
list.add(future);
}
//阻塞,直到获取所有future值
list.forEach(future -> {
try {
System.out.println(future.get());
} catch (Exception e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(3);
实现有返回值的callable接口
//方式一:提交一个Task任务,submit(Callable<T> task)获取future返回值
ExecutorService service = Executors.newFixedThreadPool(1);
Callable<String> c1 = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(111);
TimeUnit.SECONDS.sleep(10);
return "a1111";
}
};
Future<String> future = service.submit(c1);
System.out.println(futureTask.get());
//方式二:封装为一个FutureTask并启动线程执行,通过FutureTask获取返回值
Callable<String> c1 = new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(111);
TimeUnit.SECONDS.sleep(10);
return "a1111";
}
};
FutureTask<String> futureTask = new FutureTask<>(c1);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println(futureTask.get());
FutureTask
FutureTask 间接实现了 Runnable 和Future接口,所以FutureTask即可以作为一个Runnable任务,也可以作为一个Callable实例实现获取返回值
imgRunnable 的 run() 方法没有返回值,第二个参数将会放到 Future 中,作为返回值,通过这两个参数,将其包装成 Callable
---- ExecutorService
//提交一个Runnable任务肯定不会有返回值
Future<?> submit(Runnable task);
//两个参数封装为Callable
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);
---- 演示
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
return "111";
}
};
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(callable);
Future<?> future1 = service.submit(runnable);
Future<String> run = service.submit(runnable, "run");
System.out.println("future:"+future.get());//111
System.out.println("future1:"+future1.get());//null
System.out.println("run:"+run.get());//run
newTaskFor
将Callable封装成 FutureTask 提交到线程池中执行,Runnable通过RunnableAdapter适配器封装成一个Callable
---- 提交任务
public <T> Future<T> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, null);
//交给执行器执行,execute 方法由具体的子类来实现
execute(ftask);
return ftask;
}
//newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public static <T> Callable<T> callable(Runnable task, T result) {
return new RunnableAdapter<T>(task, result);
}
//Executors,最终发现是通过RunnableAdapter适配器将Runnable封装为一个Callable
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
//调run方法,返回result
task.run();
return result;
}
}
run/get
考虑回调callable时,FutureTask 封装的任务对象最终是通过工作线程执行run()方法处理任务,而FutureTask 的run方法则调用了callable的call方法,并将result设值到outcome
//封装的callable
private Callable<V> callable;
//callable的回调值,通过get方法获取
private Object outcome;
//CAS防止callable被执行多次
private volatile Thread runner;
//维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点
private volatile WaitNode waiters;
//run方法,线程池工作线程最终通过runWork()调run方法
public void run() {
// 1. 状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
// 2. 状态如果是,则把当前执行线程保存在runner字段中
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行call()方法
result = c.call();
ran = true;
}
if (ran)
//执行成功,outcome = result
set(result);
}
} finally {
runner = null;
int s = state;
// 如果任务被中断,执行中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
//COMPLETING状态是任务是否执行完成的临界状态,进行阻塞直到任务执行结束
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
//当任务被执行完返回outcome
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
awaitDone中,get()形成一个阻塞队列
imagefuture可能会有多个线程去获取
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return "111";
}
};
ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(callable);
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+":"+future.get());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+":"+future.get());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
当多个线程访问get方法时,会阻塞形成waiters队列,由Node维护
imageFutureTask 执行活动图
imageexecute 方法
submit方法只是在executor方法中增加一个get获取Callable返回值功能
真正的开启线程执行任务
public void execute(Runnable command) {
// 表示 “线程池状态” 和 “线程数” 的整数
int c = ctl.get();
// 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
if (workerCountOf(c) < corePoolSize) {
// addWorker(command, true)返回 false 代表线程池不允许提交任务
if (addWorker(command, true))
return;
}
//判断c < SHUTDOWN,只有RUNNING,offer添加任务到任务队列
if (isRunning(c) && workQueue.offer(command)) {
//防止并发更改了线程池状态
int recheck = ctl.get();
//防止并发更改了线程池状态
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果 workQueue 队列满了,以 maximumPoolSize 为界创建新的 worker
else if (!addWorker(command, false))
//执行初始化线程池中配置的reject策略
reject(command);
}
addWorker
addWorker()创建线程并执行工作线程处理任务,core:是否为核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//基本的校验,线程池状态不能大于SHUTDOWN
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
return false;
for (;;) {
//工作线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
//execute方法已经作了判断,这里主要进行check
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
//工作线程+1,退出retry循环继续往下执行
break retry;
//CAS失败了,重新获取c判断线程池状态
c = ctl.get();
if (runStateOf(c) != rs)
//状态一致则重新执行内循环,否则执行retry循环
continue retry;
}
}
//创建的worker工作线程是否已经启动
boolean workerStarted = false;
//创建的worker是否添加到工作队列中 HashSet<Worker> workers
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
//ThreadFactory封装之后的工作线程
final Thread t = w.thread;
if (t != null) {
// 这个是整个线程池的全局锁,关闭线程池需要这个锁,持有锁线程池不会被关闭
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//线程池状态条件判断,小于SHUTDOWN只有RUNNING
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
//SHUTDOWN状态时,无法添加任务
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//记录线程池最大值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动工作线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker
Worker则是线程工作实例,封装了工作线程thread,即将被执行的任务firstTask
//封装worker后的工作线程
final Thread thread;
//即将被执行的任务
Runnable firstTask;
//已成功执行完成的任务计数
volatile long completedTasks;
Worker(Runnable firstTask) {
//Worker继承AQS,设置的-1就是AQS的SIGNAL状态
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//Worker实现Runnable,通过初始化传参的ThreadFactory封装当前的Worker
this.thread = getThreadFactory().newThread(this);
}
//开启工作线程的run方法
public void run() {
runWorker(this);
}
runWorker
开启工作线程,runWorker处理任务
final void runWorker(Worker w) {
//执行任务的当前线程
Thread wt = Thread.currentThread();
//即将处理的任务
Runnable task = w.firstTask;
//worker中firstTask置空
w.firstTask = null;
//new Worker()是state==-1,调用tryRelease()将state置为0, 而interruptIfStarted()中只有state>=0才允许调用中断
w.unlock(); // allow interrupts
//异常导致的进入finally,那么completedAbruptly==true就是突然完成的
boolean completedAbruptly = true;
try {
//工作线程只会从firstTask或workQueue中获取任务
while (task != null || (task = getTask()) != null) {
//不是为了防止并发执行任务,在shutdown()时不终止正在运行的worker
w.lock();
//确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
if (...)
wt.interrupt();
try {
//钩子方法,由子类实现处理任务之前的操作
beforeExecute(wt, task);
try {
//执行任务的run方法
task.run();
} finally {
//同样是钩子方法,catch到的thrown异常
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//没有突然的结束
completedAbruptly = false;
} finally {
//处理worker的退出
processWorkerExit(w, completedAbruptly);
}
}
1、beforeExecute:线程池中任务运行前执行
2、afterExecute:线程池中任务运行完毕后执行
3、terminated:线程池退出后执行
通过覆写ThreadPoolExecutor实例的方法
pool = new ThreadPoolExecutor(....) {
protected void beforeExecute(Thread t,Runnable r) {
System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
}
protected void afterExecute(Runnable r,Throwable t) {
System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
}
protected void terminated() {
System.out.println("线程池退出");
}
};
getTask
getTask()获取任务,当前工作线程处理完firstTask开始从任务队列处理
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 队列为空时,工作线程-1退出循环
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//allowCoreThreadTimeOut为true则会关闭核心线程
//如果>corePoolSize数则一直会在超时之后关闭线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//是否获取workQueue合理性校验
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//timed为true两种情况:allowCoreThreadTimeOut为ture
//或者工作线程>corePoolSize
Runnable r = timed ?
//如果超时会抛异常,获取不到也表明任务少,不需要这么多工作线程
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
//获取成功
return r;
timedOut = true;
} catch (InterruptedException retry) {
//不抛异常,继续循环
timedOut = false;
}
}
}
Executors工具生成的线程池
//获取可用cpu核心数
public static final int maxThreads = Runtime.getRuntime().availableProcessors();
//Executors工具初始化线程池
public static ExecutorService executor = Executors.newFixedThreadPool(maxThreads);
//分配任务
Future<String> future = executor.submit(callable);
//关闭线程池
executorService.shutdown();
生成一个固定大小的线程池
最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。
每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。
//生成一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
生成只有一个线程的固定线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程的线程池
//核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个定长线程池,支持定时及周期性任务执行,定时线程池,schedule定时开启线程池
//最大线程数为 Integer.MAX_VALUE,任务队列采用 DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
static ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(2);
public static void main(String[] args) {
long startNow = System.currentTimeMillis();
//LocalDateTime处理时间
LocalDateTime dateTime = LocalDateTime.of(2020, 8, 19, 17, 28);
long time = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()).getTime();
System.out.println("time:"+(time-startNow));
```
scheduledService.schedule(new Thread(()->{
System.out.println("进入schedule处理的时间为:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}),(time-startNow),TimeUnit.MILLISECONDS);
网友评论