一、概念、生产者消费者模式
- 队列
先进先出的一个数据结构 - 阻塞队列
(1)当队列为空的时候,从里面取数据的动作会被阻塞。
(2)当队列满的时候,往里面放元素的动作会被阻塞。 - 生产者消费者模式
在生产者和消费者模式之间插入一个阻塞队列。生产者生产的东西放到队列容器中,而消费者直接从队列中取出东西进行消费。这样可以使得生产者和消费者之间的解耦,也可以使得生产者和消费者的性能的均衡,避免生产者生产东西过快,一直等待消费者消费,或者消费者消费过快等待生产者生产东西。
二、JDK中实现的阻塞队列
在JDK中BlockingQueue类封装了阻塞队列的接口
其中包含了几个核心的方法:
- add和remove方法,非阻塞方法
当队列满的时候往里面add数据,会抛出异常,当队列为空的时候从里面remove数据,会抛出异常。
boolean add(E e);
boolean remove(Object o);
- offer和poll方法,非阻塞方法
offer:往队列中里插入一个元素,当队列满的时候,返回一个false,
poll:从队列取出一个元素,当队列为空的时候,返回一个null
boolean offer(E e);
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
- take和put方法,真正意义上的阻塞方法
take:当队列为空的时候,从队列中取元素,会被阻塞
put:当队列满的时候,往队列中插入元素,会被阻塞
E take() throws InterruptedException;
void put(E e) throws InterruptedException;
三、JDK中常用的阻塞队列
-
ArrayBlockingQueue
是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置 -
LinkedBlockingQueue
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。 -
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。 -
DelayQueue
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 -
SynchronousQueue:一个不存储元素的阻塞队列。
-
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
-
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。
在使用阻塞队列的时候,尽量使用有界的阻塞队列,有界的阻塞队列,规定了最大容量,当队列满的时候,往里面插入元素,会被阻塞。而无界的阻塞队列,可以一直往里插入元素,会占用内存,最终总会使得内存溢出。
缓存系统的设计
可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue。一旦可以从队列中查到元素,表示缓存有效期到了。
四、线程池
Java中的线程池是运用场景最多的并发框架。合理的使用线程池能够带来以下3个好处:
(1)降低资源的消耗:通过重复利用线程池中的线程,降低线程创建和销毁带来的开销。
(2)提高响应速度:当提交一个任务的时候,不需要等待线程的创建就能立即执行。
(3)提高线程的可管理性:在操作系统中,线程是非常稀缺的资源,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性。
1、JDK中线程池的实现
在JDK中使用ThreadPoolExecutor 作为线程池的核心类,用来执行被提交的任务。
ThreadPoolExecutor 的使用举例
package com.it.test.thread.consumer_product.retranlock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class LockTest {
public static void main(String[] args) {
LinkedBlockingQueue queue = new LinkedBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.MINUTES, queue, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
}, new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":你好");
}
});
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":世界");
}
});
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":hello");
}
});
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":world");
}
});
System.out.println("阻塞队列的长度:"+queue.size());
}
}
pool-1-thread-1:你好
pool-1-thread-2:世界
pool-1-thread-2:world
pool-1-thread-1:hello
阻塞队列的长度:2
ThreadPoolExecutor 类的解析
(1)构造方法
构造方法中包含了7个核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize
核心线程数,当提交一个任务的时候,默认情况下线程池会新建一个线程执行任务,直到线程数等于核心线程数。
如果当前线程数为corePoolSize(核心线程数),继续提交任务的时候,会被保存到阻塞队列中等待被执行。
如果执行了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有的核心线程。 -
maximumPoolSize
线程池允许的最大线程数,如果当阻塞队列中的任务满的时候,继续提交任务,线程池会继续创建新的线程执行任务。前提是当前线程数要小于等于最大线程数。 -
keepAliveTime
线程空闲的时候,存活的时间。即当没有任务执行的时候,线程继续存活的时间。默认情况下该参数只有在线程数大于核心线程数才有用 -
TimeUnit
时间单位 -
workQueue
存放任务的BlockingQueue阻塞队列,如果当前线程数量等于核心线程数,继续提交任务的时候,会将任务存放到阻塞队列,等待被执行。 -
threadFactory
线程池中创建线程的工厂,一般只是对线程做一些策略,例如自定义线程名字等等 -
handler
拒绝策略,当线程池中的线程超出最大线程数maximumPoolSize的时候,继续提交任务,会触发拒绝策略。在JDK中有四种拒绝策略的核心类:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
线程池的工作机制
(1)如果运行的线程池少于核心线程数,提交任务的时候将创建新的线程来执行任务。
(2)如果运行的线程数等于核心线程数,提交任务的时候将会把任务存放到阻塞队列中(BlockingQueue)。
(3)如果阻塞队列满的时候,无法再添加,则创建新的线程来执行任务。
(4)如果创建的线程超过最大线程数,将会被拒绝。调用RejectedExecutionHandler.rejectedExecution()方法
提交任务
execute提交任务,不需要任务返回值。所以无法判断任务是否被线程池执行成功
submit提交任务,有返回值,返回一个future类型的对象。可以通过future对象判断任务是否执行成功,通过future的get方法,获取任务的返回值。在调用get方法的时候,会阻塞当前线程,直到任务完成为止。也可以在get方法设置返回时间。
关闭线程池
可以通过shutdown和shutdownNow来关闭线程池。
合理的配置线程池
合理配置线程池之前首先要分析任务的特性:
(1)CPU密集型
核心线程数 = Ncpu+1,
(2)IO密集型
核心线程数 = 2*Ncpu
(3)混合型
可以根据不同的任务类型对线程池进行拆分多个。
ThreadPoolExecutor源码分析
- 构造方法
传入核心线程数,阻塞队列等参数,设置到当前成员
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- 提交任务
根据设置的核心线程数和阻塞队列判断是否要调用addWorker添加任务还是reject拒绝任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 添加任务
private boolean addWorker(Runnable firstTask, boolean core) {
.......
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
(1)w = new Worker(firstTask);
创建一个Worker对象,在Worker构造方法中创建一个线程,传入Worker当前对象,Worker实现了Runnable接口
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
(2)添加Worker到HashSet中(HashSet<Worker> workers),添加成功则启动woker中的线程
workers.add(w);
....
if (workerAdded) {
t.start();
workerStarted = true;
}
因为Worker实现了Runnable接口,并且在创建线程的时候传入了当前Worker对象,所以启动线程的时候调用了Worker中的run方法
public void run() {
runWorker(this);
}
接着调用runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法中,开启了一个while 循环,获取第一次传进的任务或者是
getTask(),阻塞队列中的任务。然后调用 task.run();这样就回调到了我们提交任务的时候实现的run方法。
源码小结
在ThreadPoolExecutor中用一个HashSet用来存放woker,而woker的构造方法中创建了线程,因此可以理解为woker存放了核心线程。当HashSet的长度等于核心线程数的时候,则将任务提交到阻塞队列。
在执行任务的时候获取当前提交的task或者从阻塞队列中取出task执行。
网友评论