1.基础知识
1.1阻塞队列(BlockingQueue)
#用途:
1.生产者消费者模式
2.线程池
3.消息中间件
当阻塞队列是空时, 从队列中获取元素的操作将会被阻塞
当阻塞队列已满时, 向队列中添加元素的操作将会被阻塞
在多线程情况下, 某些情况下线程会挂起(阻塞), 被挂起的线程也可能被唤醒
BlockingQueue的好处是, 我们不再关心何时阻塞线程, 何时唤醒, BlockingQueue已在内部实现
方法 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
#抛出异常:
当阻塞队列是空时, 从队列中获取元素将抛出异常
当阻塞队列已满时, 向队列中添加元素将抛出异常
#返回特殊值:
插入方法: 成功true, 失败false
移除方法: 成功返回出队元素, 队列没有就返回null
#一直阻塞
当阻塞队列是空时, 消费者从队列中获取元素的操作将会被阻塞, 直到队列可用
当阻塞队列已满时, 生产者向队列中添加元素的操作将会被阻塞直到队列有空间或超时退出
#超时退出
当阻塞队列满时, 队列会阻塞生产者一段时间, 超过限制时间后, 生产者线程会退出
package com.zy.tools.undefined.concurrent.blocking;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
@Test
public void fn01() {
// 1.抛出异常
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
// blockingQueue.remove(); // java.util.NoSuchElementException
// blockingQueue.element(); // java.util.NoSuchElementException
blockingQueue.add("a");
blockingQueue.add("b");
// blockingQueue.add("c"); // java.lang.IllegalStateException: Queue full
blockingQueue.remove();
}
@Test
public void fn02() {
// 2.返回特殊值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.poll()); // null
System.out.println(blockingQueue.peek()); // null
System.out.println(blockingQueue.offer("a")); // true
System.out.println(blockingQueue.offer("b")); // true
System.out.println(blockingQueue.offer("c")); // false
}
@Test
public void fn03() throws InterruptedException {
// 3.一直阻塞-->慎用
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
blockingQueue.put("a");
blockingQueue.put("b");
// blockingQueue.put("c"); // 线程会一直阻塞在这里
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// blockingQueue.take(); // 线程会一直阻塞在这里
}
@Test
public void fn04() throws InterruptedException {
// 4.超时退出
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS)); // null
System.out.println(blockingQueue.offer("a", 1, TimeUnit.SECONDS)); // true
System.out.println(blockingQueue.offer("b", 1, TimeUnit.SECONDS)); // true
System.out.println(blockingQueue.offer("c", 1, TimeUnit.SECONDS)); // false
System.out.println(blockingQueue.poll(1, TimeUnit.SECONDS)); // a
}
}
同步队列
package com.zy.tools.undefined.concurrent.blocking;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
// 同步队列, 生产一个, 消费一个
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "线程put-->a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + "线程put-->b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + "线程put-->c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "线程take-->" + blockingQueue.take());
System.out.println(Thread.currentThread().getName() + "线程take-->" + blockingQueue.take());
System.out.println(Thread.currentThread().getName() + "线程take-->" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
1.2线程
#进程
比如windows里运行的QQ,微信等程序,他们直接不会共享内存数据
#线程
线程就是进程中运行的多个子任务,是操作系统调用的最小单元
比如QQ可以边视频,边传文件,而传文件和视频就是线程,
线程之间是可以共享内存数据的,进程可以看做是线程的容器
1.2.1线程状态
Java中线程的状态分为6种。
public enum State {
/**
* Thread state for a thread which has not yet started.
* 新创建了一个线程对象,但还没有调用start()方法。
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
* Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
* 线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。
* 就绪状态(ready):该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权。
* 运行中状态(running):就绪状态的线程在获得CPU时间片开始执行任务。
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
* 表示线程阻塞于锁
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
* 进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
* 该状态不同于WAITING,它可以在指定的时间后自行返回。
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
* 表示该线程已经执行完毕。
*/
TERMINATED;
}
线程状态图.png
关于线程状态的解读
#TIMED_WAITING/WATING
这两种状态表示线程被挂起,必须等待lock.notify()或lock.notifyAll()或接收到interrupt信号才能退出等待状态.
>> 当设置超时时间时状态为TIMED_WAITING;
>> 如果是未设置超时时间,这时的状态为WATING.
#TIMED_WAITING/WATING下还需要关注下面几个线程状态:
>> waiting on condition:说明线程等待另一个条件的发生,来把自己唤醒;
>> on object monitor: 说明该线程正在执行obj.wait()方法,放弃了 Monitor,进入 “Wait Set”队列;
#BLOCKED
此时的线程处于阻塞状态,一般是在等待进入一个临界区“waiting for monitor entry”,这种状态是需要重点关注的
#哪些线程状态占用CPU?
处于TIMED_WAITING、WATING、BLOCKED状态的线程是不消耗CPU的,
而处于 RUNNABLE 状态的线程要结合当前线程代码的性质判断是否消耗CPU:
>> 纯java运算代码,并且未被挂起,是消耗CPU的;
>> 网络IO操作,在等待数据时是不消耗CPU的;
1.2.2线程调度
线程调度尽量交给JVM执行, 不建议人为干预线程的执行
1.Thread.sleep(long millis):
一定是当前线程调用此方法,当前线程进入"TIMED_WAITING"状态,
但不释放对象锁,millis后线程自动苏醒进入就绪状态。
作用:给其它线程执行机会的最佳方式。
2.Thread.yield():
一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,
但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。
作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。
实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。
Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
3.thread.join()/thread.join(long millis):
当前线程里调用其它线程t的join方法,
当前线程进入"WAITING"或"TIMED_WAITING"状态,当前线程不会释放已经持有的对象锁。除非当前线程锁对象是线程t.
线程t执行完毕或者millis时间到,当前线程一般情况下进入"RUNNABLE"状态,
也有可能进入"BLOCKED"状态(因为join是基于wait实现的)。
4.obj.wait(): // (必须在synchronized中使用)
当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。
依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒。
5.obj.notify():
唤醒在此对象监视器上等待的单个线程,选择是任意性的。
notifyAll()唤醒在此对象监视器上等待的所有线程。
6.LockSupport.park()/.parkNanos(long nanos)/.parkUntil(long deadlines):
当前线程进入"WAITING"或"TIMED_WAITING"状态。
对比wait方法,不需要获得锁就可以让线程进入"WAITING"或"TIMED_WAITING"状态,
需要通过LockSupport.unpark(Thread thread)唤醒。
7.thread.stop(): # @Deprecated, 方法已废弃
停止当前线程。但不会保证释放当前线程占有的资源。
该方式是通过立即抛出ThreadDeath异常来达到停止线程的目的,
而且此异常抛出可能发生在程序的任何一个地方,包括catch、finally等语句块中。
从SUN的官方文档可以得知,调用Thread.stop()方法是不安全的,
这是因为当调用Thread.stop()方法时,会发生下面两件事:
>> 即刻抛出ThreadDeath异常,在线程的run()方法内,
任何一点都有可能抛出ThreadDeath Error,包括在catch或finally语句中。
>> 释放该线程所持有的所有的锁。
调用thread.stop()后导致了该线程所持有的所有锁的突然释放,
那么被保护数据就有可能呈现不一致性,
其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。
8.thread.interrupt():
8.1本线程中断自身是被允许的,且"中断标记"设置为true
8.2其它线程调用本线程的interrupt()方法时,会通过checkAccess()检查权限:
>> 若线程在阻塞状态时,调用了它的interrupt()方法,那么它的“中断状态”会被清除,
并且会收到一个InterruptedException异常。
>> 如果线程被阻塞在一个Selector选择器中,那么通过interrupt()中断它时;
线程的中断标记会被设置为true,并且它会立即从选择操作中返回。
>> 如果不属于前面所说的情况,那么通过interrupt()中断线程时,它的中断标记会被设置为“true”。
9.Thread.interrupted():
判断的是当前线程是否处于中断状态。
是类的静态方法,同时会清除线程的中断状态。
10.thread.isInterrupted():
判断调用线程是否处于中断状态
11.thread.destroy(): # @Deprecated, 方法已废弃
线程被强行关闭, 可能产生死锁。
12.thread.suspend(): # @Deprecated, 方法已废弃
使线程暂停,但是不会释放类似锁这样的资源。
suspend()和resume()必须要成对出现,否则非常容易发生死锁。
因为suspend方法并不会释放锁,如果使用suspend的目标线程对一个重要的系统资源持有锁,
那么没任何线程可以使用这个资源直到要suspend的目标线程被resumed,
如果一个线程在resume目标线程之前尝试持有这个重要的系统资源锁再去resume目标线程,
这两条线程就相互死锁了,也就冻结线程。
13.thread.resume(): # @Deprecated, 方法已废弃
使线程恢复,如果之前没有使用suspend暂停线程,则不起作用。
14.thread.run()和thread.start()
当你启动线程,使用start(),系统会把run()方法当成线程执行体来处理。
当你启动线程时,调用run()方法,系统会把run()方法当成普通的方法,线程对象也当成一个普通对象,
系统run()方法会直接在当前线程(如main线程中执行)执行, 不会在新的线程中执行, 没有了多线程。
15.其他方法略
currentThread
isAlive
setPriority
getPriority
setName
getName
getThreadGroup
activeCount
enumerate
countStackFrames
dumpStack
setDaemon
isDaemon
getContextClassLoader
setContextClassLoader
holdsLock
getStackTrace
getAllStackTraces
getId
getState
setDefaultUncaughtExceptionHandler
getDefaultUncaughtExceptionHandler
getUncaughtExceptionHandler
setUncaughtExceptionHandler
如何安全的暂停一个线程
#使用共享变量的方式
private volatile boolean exit = false;
#使用interrupt方法终止线程
如果一个线程由于等待某些事件的发生而被阻塞,又该怎样停止该线程呢?
这种情况经常会发生,比如当一个线程由于需要等候键盘输入而被阻塞,或者调用Thread.join()方法,
或者Thread.sleep()方法,在网络中调用ServerSocket.accept()方法,
或者调用了DatagramSocket.receive()方法时,都有可能导致线程阻塞,
使线程处于处于不可运行状态时,即使主程序中将该线程的共享变量设置为true,
但该线程此时根本无法检查循环标志,当然也就无法立即中断。
使用Thread提供的interrupt()方法,该方法虽然不会中断一个正在运行的线程,
但是它可以使一个被阻塞的线程抛出一个中断异常,从而使线程提前结束阻塞状态,退出堵塞代码。
https://www.cnblogs.com/luckygxf/p/4737655.html
关于线程的挂起与唤醒
#线程挂起的方式:
>> wait 及其重载方法(Object类)
>> park及其重载方法 & 其他 park方法(LockSupport类)
>> 各种await及其重载方法(condition类, CountDownLatch类等各种运用AQS的类)
>> Thread.suspend方法(已废弃, 不能使用, 会造成死锁)
#线程唤醒的方式:
>> notify 及其重载方法(Object类)
>> notifyAll 方法(Object类)
>> unpark及其重载方法 & 其他unpark方法(LockSupport类)
>> 各种signal及signalAll及其重载方法(condition类, CountDownLatch类等各种运用AQS的类)
>> Thread.resume方法(已废弃, 不能使用, 会造成死锁)
关于线程的阻塞的原因
>> wait 及其重载方法(Object类)
>> park及其重载方法 & 其他 park方法(LockSupport类)
>> 各种await及其重载方法(condition类, CountDownLatch类等各种运用AQS的类)
>> Thread.suspend方法(已废弃, 不能使用, 会造成死锁)
>> 调用了 Thread.sleep方法
>> 线程由于 I/O 操作(输入输出)而受阻塞
1.2.3线程的创建
1.继承Thread重写run方法
2.实现Runnable重写run方法
3.实现Callable重写call方法
实现Callable和实现Runnable类似,但是功能更强大,具体表现在:
a.可以在任务结束后提供一个返回值,Runnable不行
b.call方法可以抛出异常,Runnable的run方法不行
c.可以通过运行Callable得到的Fulture对象监听目标线程调用call方法的结果,得到返回值,
(fulture.get(),调用后会阻塞,直到获取到返回值)
1.2.4多线程相关控制类
#分为阻塞及非阻塞模式
java.util.concurrent.Future<V>
#不建议使用默认的supplyAsync...因其有默认线程池配置
java.util.concurrent.CompletableFuture<V>
#保存线程的独立变量。对一个线程类(继承自Thread)
java.lang.ThreadLocal<V>
2.Executor介绍
为什么使用线程池
#1.如果频繁创建线程会损耗过多额外的时间
线程创建所需时间为 T1,线程执行任务所需时间为 T2,线程销毁所需时间为 T3,一般 T1+T3 > T2.
如果有任务来了,再去创建线程的话效率比较低,如果从一个池子中可以直接获取可用的线程,那效率会有所提高。
所以线程池省去了任务过来,要先创建线程再去执行的过程,节省了时间,提升了效率;
#2.线程池可以管理和控制线程
因为线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,
使用线程池可以进行统一的分配,调优和监控;
#3.线程池提供队列,存放缓冲等待执行的任务。
/**
* 一、线程池:
* 提供了一个线程队列,队列中保存着所有等待状态的线程。
* 避免了创建与销毁额外开销,提高了响应的速度。
* 二、线程池的体系结构:
* java.util.concurrent.Executor : 负责线程的使用与调度的根接口
* |--**ExecutorService 子接口: 线程池的主要接口
* |-AbstractExecutorService:实现了ExecutorService接口,基本实现了ExecutorService其中声明的所有方法,另有添加其他方法
* |--ThreadPoolExecutor 线程池的实现类
* |--ScheduledExecutorService 子接口:负责线程的调度
* |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
*
*/
2.1线程池的实现类(推荐使用):ThreadPoolExecutor
该类有多个重载的构造器,这里选最全参数的说明:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
#1. int corePoolSize
- 线程池核心池的大小, 常驻线程大小。
#2. int maximumPoolSize
- 线程池的最大线程数。
#3. long keepAliveTime
- 多余的空闲线程的存活时间。
当线程池中线程数量超过corePoolSize时,
当空闲时间达到keepAliveTime时,
多余空闲线程会被销毁, 直至剩下corePoolSize个线程为止
#4. TimeUnit unit
- keepAliveTime 的时间单位。
#5. BlockingQueue<Runnable> workQueue
- 被提交但尚未被执行的任务。
#6. ThreadFactory threadFactory
- 线程工厂。
用于创建线程池中线程的工厂
#7. RejectedExecutionHandler handler
- 拒绝策略。
当队列满了, 并且工作线程数大于等于maximumPoolSize时
#注:
当任务数 > 阻塞队列数 + 最大线程池时, 执行拒绝策略
线程池执行顺序.png
线程池工作原理.png
线程池工作原理解读.png
关注点1 线程池大小
线程池有两个线程数的设置,一个为核心池线程数,一个为最大线程数。
在创建了线程池后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务,
除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法
当创建的线程数等于 corePoolSize 时,会加入设置的阻塞队列。
当队列满时,会创建线程执行任务直到线程池中的数量等于maximumPoolSize。
关注点2 适当的阻塞队列
java.lang.IllegalStateException: Queue full
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列(大小默认是Integer.MAX_VALUE)。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue: 一个使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue: 一个不存储元素的阻塞队列, 即单个元素的阻塞队列。
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
关注点3 明确拒绝策略
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。 (默认)
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。如果允许任务丢失, 可采用此策略。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后把当前任务加入队列中, 尝试再次提交当前任务
ThreadPoolExecutor.CallerRunsPolicy:不抛弃任务, 也不抛出异常, 而是将任务回退到调用者, 从而降低新任务的流量。哪个线程提交这个任务,就由哪个线程执行。
# dubbo中的拒绝策略
当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,让使用者清楚触发线程拒绝策略的真实原因
>> 输出了一条warn级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。
>> 输出当前线程堆栈详情。
>> 继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
//省略实现
}
}
#netty中的拒绝策略
Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。
不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。
而 Netty是新建了一个线程来处理的。
所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。
但是Netty的实现里,在创建线程时未做任何的判断约束,
即只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
super();
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Temporary task executor");
t.start();
} catch (Throwable e) {
throw new RejectedExecutionException(
"Failed to start a new thread", e);
}
}
}
#ActiveMQ中的线程池拒绝策略
ActiveMq中当触发拒绝策略时,会在一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
}
throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
}
});
关注点4
线程池中任务执行流程
execute() -> addWorker() -> start() -> run() -> runWorker()
说明:Executors 各个方法的弊端
1)newFixedThreadPool 和 newSingleThreadExecutor:
主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。
2)newCachedThreadPool 和 newScheduledThreadPool:
主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
2.2使用策略:
可以仿照Executors工具类中创建线程池的方法,
通过创建ThreadPoolExecutor,返回值是ExecutorService,
从而调用ExecutorService中的方法
package com.zy.juc;
import java.util.concurrent.*;
public class ThreadPoolExecutorTools{
public static ExecutorService newThreadPoolExecutor(Object ... params) { // 该方法可以传入参数,以占位下述7个参数
return new ThreadPoolExecutor(
0, // 线程池核心池的大小,即闲时线程池中线程数量
Integer.MAX_VALUE, // 线程池的最大线程数
60L, // 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
TimeUnit.SECONDS, // 这里的时间单位可以选择其他
new SynchronousQueue<Runnable>(), // 这里的阻塞队列可以选择其他类型
Executors.defaultThreadFactory(), // 这里的线程工厂参数可以自定义
new ThreadPoolExecutor.DiscardPolicy() // 这里的拒绝策略可以选择其他类型
);
}
}
2.3 线程池参数合理配置
2.3.1 核心线程数配置
#CPU密集型
即该任务需要大量的运算, 没有阻塞, CPU一直高速运转
只有在真正的多核CPU上才能得到加速(通过多线程)
--> 配置尽可能少的线程数量
--> 线程池最大线程数 = CPU核数 + 1个线程的线程池
#IO密集型
方案1:
由于IO密集型线程并不是一直在执行任务, 则应配置尽可能多的线程
--> 线程池最大线程数 = CPU核数 * 2 + 1
方案2:
由于IO密集型, 大部分线程都阻塞, 故需要多配置线程数:
--> 线程池最大线程数 = CPU / (1 - 阻塞系数), 0.8 < 阻塞系数 < 0.9
#最终结论:
以压测结果为准
2.3.2阻塞队列的选择
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:
在队列为空时,获取元素的线程会等待队列变为非空。
当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
ArrayBlockingQueue与LinkedBlockingQueue区别
#1.队列大小有所不同
ArrayBlockingQueue是有界的初始化必须指定大小,
而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE, 而且不会初始化就占用一大片内存)
对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
#2.数据存储容器不同
ArrayBlockingQueue采用的是数组作为数据存储容器,
LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
#3.对GC的影响不同
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,
而LinkedBlockingQueue则会生成一个额外的Node对象。
这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
#4.两者的实现队列添加或移除的锁不一样
ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,
这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,
以此来提高整个队列的并发性能。
#5.两者的size都是强一致的, 但实现有区别
ArrayBlockingQueue使用全局锁
LinkedBlockingQueue使用原子变量实现。
PriorityBlockingQueue
PriorityBlockingQueue是线程安全的PriorityQueue,其实现原理与PriorityQueue类似,
在此基础上实现了BlockingQueue接口,能够作为阻塞队列使用,
由于PriorityBlockingQueue是无界队列,因而使用put方法并不会阻塞,offer方法不会返回false。
该队列不支持插入null元素,同时不支持插入非comparable的对象。
它的迭代器并不保证队列保持任何特定的顺序,如果想要顺序遍历,考虑使用Arrays.sort(pq.toArray())。
该类不保证同等优先级的元素顺序,如果你想要强制顺序,
就需要考虑自定义顺序或者是Comparator使用第二个比较属性。
PriorityBlockingQueue也是基于最小二叉堆实现,
对于堆数组中索引为k的节点,其父节点为(k-1)/2,其左右子节点分别为2k+1,2k+2。
PriorityBlockingQueue使用ReentrantLock来控制所有公用操作的线程同步,
使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。
只有在延迟期满时才能从队列中提取元素。不能将null元素放置到这种队列中。
DelayQueue非常有用,可以运用在以下两个应用场景:
#缓存系统的设计:
使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,
一旦能从DelayQueue中获取元素时,就表示有缓存到期了。
#定时任务调度:
使用DelayQueue保存当天要执行的任务和执行时间,
一旦从DelayQueue中获取到任务就开始执行,
比如Timer就是使用DelayQueue实现的。
SynchronousQueue
SynchronousQueue是一个没有数据缓冲的BlockingQueue,此队列不允许 null 元素。
生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。
与ArrayBlockingQueue,LinkedListBlockingQueue 不同,
SynchronousQueue内部并没有数据缓存空间,
不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,
不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。
队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。
数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。
可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,
这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。
相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue采用一种预占模式。
意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,
若队列为空,那就生成一个节点(节点元素为null)入队,
然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,
生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,
被唤醒的消费者线程取走元素,从调用的方法返回。
我们称这种节点操作为“匹配”方式。
LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue(公平模式下转交元素)、LinkedBlockingQueue(阻塞Queue的基本方法)的超集。
而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。
和SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列,
与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步。
LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。
所谓双向队列指的你可以从队列的两端插入和移出元素。
双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,
以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。
以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。
另外插入方法add等同于addLast,移除方法remove等效于removeFirst。
但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。
在初始化LinkedBlockingDeque时可以初始化队列的容量,用来防止其再扩容时过渡膨胀。
另外双向阻塞队列可以运用在“工作窃取”模式中。
2.3.3 ExecutorService接口中常用方法
/*
* 在调用shutdown()方法之后,
* ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,
* 所有在shutdown()执行之前提交的任务都会被执行
*/
void shutdown();
/*
* 立即关闭ExecutorService, 这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。
* 但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成
*/
List<Runnable> shutdownNow();
// 只有调用了shutdown方法,返回true
boolean isShutdown();
// 只有调用了shutdown方法,并且没有任务在执行了,返回true
boolean isTerminated();
// 等待所有任务执行完毕,也需要调用shutdown方法,否则只能等待超时
boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;
// 执行有返回值的线程。返回Future对象
<T> Future<T> submit(Callable<T> var1);
// 如果线程执行成功则返回预设的result
<T> Future<T> submit(Runnable var1, T var2);
// 执行没有返回值的线程并返回Future对象
Future<?> submit(Runnable var1);
############两个批量执行任务的方法,invokeAll()和invokeAny()############
// 执行一个集合的有返回值的线程,在所有任务都完成(包括成功/被中断/超时)后才会返回
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;
/*
* 在指定的时间内执行集合的方法,如果指定时间内还没有获取结果,那么终止该线程执行。
* 返回的Future对象可通过isDone()方法和isCancel()来判断是执行成功还是被终止了
* 在所有任务都完成(包括成功/被中断/超时)后才会返回
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;
/*
* 方法接收的是一个Callable的集合,执行这个方法会返回所有Callable任务中其中一个任务的执行结果。
* 在任意一个任务成功(或ExecutorService被中断/超时)后就会返回
*/
<T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;
/*
* 在指定的时间内执行集合的方法,如果指定时间内还没有获取结果,那么终止该线程执行。
* 返回的Future对象可通过isDone()方法和isCancel()来判断是执行成功还是被终止了
* 在任意一个任务成功(或ExecutorService被中断/超时)后就会返回
*/
<T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
2.3.4 线程池的参数设置
CPU参数设置(美团总结).png上图可知, 均不理想.
可选的方案是: 线程池参数动态化:
线程池最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略
#1.加线程池监控
用户基于这个功能可以了解线程池的实时状态,比如当前有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。
>> getActiveCount()
>> getCompletedTaskCount()
>> getCorePoolSize()
>> getKeepAliveTime()
>> getLargestPoolSize()
>> getMaximumPoolSize()
>> getQueue()
>> getRejectedExecutionHandler()
>> getTaskCount()
#2.参数可动态修改:根据上述线程池监控到的参数动态调整参数
>> setCorePoolSize
>> setMaximumPoolSize
>> setRejectedExecutionHandler
>> setKeepAliveTime
>> setThreadFactory
------------------分割线------------------
#Netty进阶指南给出来的方案
在Netty服务编写的过程中,也要涉及到两个线程池的参数配置,尤其是IO线程池的配置,
这里书中也给了一套经验方案来针对线程的监控情况,可以参考:
同样的先用CPU核数*2,看看是否存在瓶颈,运行时的监控则用比较土的办法了:
1.打印thread dump,同时获取当时cpu排在前面几个的线程号
2.然后在线程dump文件中去对应的线程号堆栈
3.然后在堆栈中查找是否有SelectotImpl.lookAndDoSelect处的lock信息
4.如果多次采集都发现有这堆信息的话,说明此时此刻的IO线程比较空闲,无需调整;
如果一直在read或者write的执行处,则说明IO较为繁忙,可以适当的去调大NioEventLoop线程的个数来提升网络的读写性能。
但是这边线程数的改动就不是动态化的了,服务启动后指定的线程数就不能再修改了。
2.3.5 如何处理线程池内线程执行异常
#线程池中任务执行流程
submit()/execute() -> addWorker() -> start() -> run() -> runWorker()
#java.util.concurrent.ThreadPoolExecutor#runWorker中执行到task.run处时:
>> execute 执行的是 Runnable 的 run 方法
>> submit 执行的是 FutureTask 的 run 方法, 该方法catch住了Throwable的异常, 并且没有抛出, 可以采用submit的get方法获取到该异常
#如何打印出异常的堆栈信息
1.任务内部将可能会抛出异常的步骤进行 try catch (建议使用)
2.重写线程池的 afterExecute 方法
3.ThreadFactory中设置线程 UncaughtExceptionHandler
package com.zy.research.concurrent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.*;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
/*ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(1),
new SynchronousQueue<>(),
new DefaultThreadFactory("my-threadFactory"),
new ThreadPoolExecutor.CallerRunsPolicy()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
// fixme, 这里也可以解决 submit 不打印堆栈信息的问题
t.printStackTrace();
}
};*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
// new LinkedBlockingQueue<>(1),
new SynchronousQueue<>(),
new DefaultThreadFactory("my-threadFactory"),
// customThreadFactory(), // fixme, 这里也可以解决 submit 不打印堆栈信息的问题
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i < 10; i++) {
// testExecute(threadPoolExecutor, finalI);
// testSubmit(threadPoolExecutor, finalI);
// testExecuteCatchThrowable(threadPoolExecutor, finalI);
testSubmitCatchThrowable(threadPoolExecutor, i);
}
}
/**
* 执行 execute 方法(任务内部不捕获异常)
* @param threadPoolExecutor
* @param finalI
*/
private static void testExecute(ThreadPoolExecutor threadPoolExecutor, int finalI) {
// fixme, execute 方法 ==> 执行时抛出了异常, 此处会打印堆栈信息
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ===> " + finalI);
throw new RuntimeException(Thread.currentThread().getName() + " ===> " + "error --》 " + finalI);
});
}
/**
* 执行 submit 方法(任务内部不捕获异常)
* @param threadPoolExecutor
* @param finalI
*/
private static void testSubmit(ThreadPoolExecutor threadPoolExecutor, int finalI) {
// fixme, submit 方法 ==> 执行时抛出了异常, 此处不会打印堆栈信息
threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName() + " ===> " + finalI);
throw new RuntimeException(Thread.currentThread().getName() + " ===> " + "error --》 " + finalI);
});
}
/**
* 执行 execute 方法(任务内部捕获异常)
* @param threadPoolExecutor
* @param finalI
*/
private static void testExecuteCatchThrowable(ThreadPoolExecutor threadPoolExecutor, int finalI) {
threadPoolExecutor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " ===> " + finalI);
throw new RuntimeException(Thread.currentThread().getName() + " ===> " + "error --》 " + finalI);
} catch (Throwable e) {
// fixme, 任务内部捕获所有异常, 打印堆栈信息
e.printStackTrace();
}
});
}
/**
* 执行 execute 方法(任务内部捕获异常)
* @param threadPoolExecutor
* @param finalI
*/
private static void testSubmitCatchThrowable(ThreadPoolExecutor threadPoolExecutor, int finalI) {
// fixme, execute 方法 ==> 执行时抛出了异常
threadPoolExecutor.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " ===> " + finalI);
throw new RuntimeException(Thread.currentThread().getName() + " ===> " + "error --》 " + finalI);
} catch (Throwable e) {
// fixme, 任务内部捕获所有异常, 打印堆栈信息
e.printStackTrace();
}
});
}
private static ThreadFactory customThreadFactory() {
Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {
// fixme, 这里可以打印各种堆栈信息及日志
e.printStackTrace();
};
return new ThreadFactoryBuilder()
.setNameFormat("custom-threadFactory")
.setUncaughtExceptionHandler(uncaughtExceptionHandler)
.build();
}
}
2.3.6 五种线程池状态
private static final int COUNT_BITS = Integer.SIZE - 3;
#RUNNING = -1 << COUNT_BITS;
该状态的线程池能够接受新任务,并对新添加的任务进行处理
#SHUTDOWN = 0 << COUNT_BITS;
该状态的线程池不再接受新任务,但是会把阻塞队列中的任务全部执行完
#STOP = 1 << COUNT_BITS;
该状态的线程池不再接收新任务,不处理阻塞队列的任务,并且中断正在处理的任务
#TIDYING = 2 << COUNT_BITS;
当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态
#TERMINATED = 3 << COUNT_BITS;
线程池彻底终止的状态
线程池生命周期.png
3.Executor的工具类Executors简介
executors线程池.png阿里的 Java开发手册,关于java线程池的一个建议:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
package com.zy.juc;
import org.junit.Test;
import java.util.concurrent.*;
/*
* 工具类 : Executors下的方法名及返回值
* ExecutorService newFixedThreadPool() :
创建固定大小的线程池。
每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
适合执行长期的任务, 性能好很多。
* ExecutorService newCachedThreadPool() :
创建一个可缓存的线程池。
如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,
当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
执行很多短期异步的小程序或者负载较轻的服务器。
* ExecutorService newSingleThreadExecutor() :
创建一个单线程的线程池。
这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。
如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
适合一个任务一个任务执行的场景。
*
* ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
*/
public class ExecutorsAndScheduledExecutorService {
@Test
public void fn01(){
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool(10);
// 实现Callable接口的线程池
Future<Integer> future = pool.submit(() -> {
int i = (int) (Math.random() * 100);
for (int k = 0; i < i; i ++){
i += k;
}
return i;
});
try {
Integer num = future.get();
System.out.println(num);
// 关闭线程池
pool.shutdown();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Test
public void fn02(){
// 创建线程任务调度
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
// 实现Callable接口的线程池并定时调度
ScheduledFuture<Integer> future = executor.schedule(() -> {
int i = (int) (Math.random() * 100);
for (int k = 0; i < i; i++) {
i += k;
}
System.out.println(">>>>>>>>>>>>"+ i +"<<<<<<<<<<<");
return i;
}, 3, TimeUnit.SECONDS);
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
4.自定义线程池
4.1利用锁机制, 来防止阻塞队列满时, 任务被丢弃扔给主线程
package com.zy.commons.lang.concurrent;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class BlockedAndFixedThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition;
private static final int BLOCKING_QUEUE_SIZE = 1024 * 2;
@Getter
private final int blockingQueueSize;
public BlockedAndFixedThreadPoolExecutor(int poolSize, int blockingQueueSize, String threadPrefix) {
super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(blockingQueueSize <= 0 ? BLOCKING_QUEUE_SIZE : blockingQueueSize), new ThreadFactory() {
private final LongAdder threadNo = new LongAdder();
@Override
public Thread newThread(Runnable r) {
threadNo.increment();
String threadName = threadPrefix + threadNo;
Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
});
this.blockingQueueSize = blockingQueueSize <= 0 ? BLOCKING_QUEUE_SIZE : blockingQueueSize;
this.condition = this.lock.newCondition();
}
@Override
public void execute(Runnable command) {
this.lock.lock();
try {
super.execute(command);
while (this.getPoolSize() >= this.getCorePoolSize() && this.getQueue().size() >= this.getBlockingQueueSize()) {
// 这里需要考虑下, 是否应该加上这个 3s 的超时时间
this.condition.await(3L, TimeUnit.SECONDS);
}
} catch (Throwable e) {
log.warn("failed to execute task.", e);
} finally {
this.lock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
this.lock.lock();
try {
this.condition.signalAll();
} finally {
this.lock.unlock();
}
}
}
4.2 利用 CompletableFuture 及 flatMap合并多任务结果
package com.zy.netty;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class Demo {
/**
* CPU核数
*/
private static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
/**
* IO密集型的核心线程数配置
*/
private static final int POOL_SIZE = 2 * CPU_NUM + 1;
/**
* 线程池阻塞队列数量
*/
private static final Integer BLOCKING_QUEUE_SIZE = 1024 * 10;
/**
* 线程池
*/
private static final ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(BLOCKING_QUEUE_SIZE));
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
List<CompletableFuture<List<Integer>>> completableFutures = new ArrayList<>();
try {
for (int i = 0; i < 10; i ++) {
final int j = i;
CompletableFuture<List<Integer>> future = CompletableFuture.supplyAsync(() -> cal(j), executorService)
.whenComplete((res, exception) -> {
list.addAll(res);
});
completableFutures.add(future);
}
CompletableFuture<List<List<Integer>>> result = combineResult(completableFutures);
// 为避免一直等待, 超时即退出
List<List<Integer>> lists = result.get(60L, TimeUnit.SECONDS);
if (lists != null && lists.size() > 0) {
// 采用 flatMap 进行结果合并处理
List<Integer> collect = lists.stream().flatMap(Collection::stream).collect(Collectors.toList());
System.out.println(collect);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
private static List<Integer> cal(int i) {
List<Integer> list = new ArrayList<>();
list.add(i);
list.add(i);
return list;
}
/**
* 全流式处理的融合
* @param futures
* @param <T>
* @return
*/
public static <T> CompletableFuture<List<T>> combineResult(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
}
5.ForkJoinPool
ForkJoinPool-->common.png ForkJoinPool-->static静态块初始化.png ForkJoinPool-->commonPool方法获取common实例.png6.ScheduledThreadPoolExecutor (注意捕获任务的异常)
#scheduleAtFixedRate:
是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,
如果上一个任务执行完毕,则当前任务立即执行,
如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
#scheduleWithFixedDelay:
是以上一个任务结束时开始计时,period时间过去后,立即执行。
#schedule
一次性任务, 执行完毕即终止.
参考资源
http://ifeve.com/java-blocking-queue/
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html (美团线程池)
https://blog.csdn.net/mlljava1111/article/details/81170787 (jstack性能问题定位案例分析详解)
网友评论