java.util.concurrent.Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
- 核心线程数== 最大线程数(没有救急线程)
- 队列无界的,可以放任意多数量任务
适合:任务量已知,相对耗时的任务
简要说明,FixedThreadPool,也就是可重用固定线程数的线程池。 它corePoolSize和 maximumPoolSize是一样的。并且他的keepAliveTime=0, 也就是当线程池中的线程数大于corePoolSize, 多余的空闲线程会被立即终止。
它的基本执行过程如下
1, 如果当前运行的线程数少于corePoolSize, 会立刻创建新线程执行任务。
2,当线程数到达corePoolSize后,将任务加入到LinkedBlockingQueue中。
3,当线程执行完任务后,会循环从LinkedBlockingQueue中获取任务来执行。
image.png
FixedThreadPool使用了LinkedBlockingQueue, 也就是无界队列(队列最大可容纳Integer.MAX_VALUE), 因此会造成以下影响:
a, 线程池线程数到达corePoolSize后,任务会被存放在LinkedBlockingQueue中
b, 因为无界队列,运行中(未调用shutdown()或者shutdownNow()方法)的不会拒绝任务(队列无界,可以放入"无限"任务)
LinkedBlockingQueue
很多人问为啥不用ConcurrentLinkedQueue,这里用了LinkedBlockingQueue。主要是因为LinkedBlockingQueue 是阻塞算法, 实现起来比较简单。
有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲,类似 ConcurrentLinkedQueue 这种“Concurrent”容器,才是真正代表并发。不知道你有没有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent、CopyOnWrite和 Blocking等三类,同样是线程安全容器,可以简单认为:
- Concurrent 类型基于 lock-free,在常见的多线程访问场景,一般可以提供较高吞吐量。
- Blocking 内部则是基于锁,并提供了 BlockingQueue 的等待性方法。
- CopyonWrite 是基于复制。
关于ConcurrentLinkedQueue和LinkedBlockingQueue:
- LinkedBlockingQueue是使用锁机制,ConcurrentLinkedQueue是使用CAS算法,虽然LinkedBlockingQueue的底层获取锁也是使用的CAS算法
- 关于取元素,ConcurrentLinkedQueue不支持阻塞去取元素,LinkedBlockingQueue支持阻塞的take()方法,如若大家需要ConcurrentLinkedQueue的消费者产生阻塞效果,需要自行实现
- 关于插入元素的性能,从字面上和代码简单的分析来看ConcurrentLinkedQueue肯定是最快的,但是这个也要看具体的测试场景,我做了两个简单的demo做测试,测试的结果如下,两个的性能差不多,但在实际的使用过程中,尤其在多cpu的服务器上,有锁和无锁的差距便体现出来了,ConcurrentLinkedQueue会比LinkedBlockingQueue快很多:
package com.dxz.queue.linked;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
public class ProducerConcurrnetLinkedQueue implements Runnable{
//容器
private final ConcurrentLinkedQueue<Bread> queue;
private final CountDownLatch cdl;
public ProducerConcurrnetLinkedQueue(ConcurrentLinkedQueue<Bread> queue, CountDownLatch cdl){
this.queue = queue;
this.cdl = cdl;
}
@Override
public void run() {
for(int i=0;i<100000; i++){
produce(i);
}
cdl.countDown();
}
public void produce(int i){
/**
* put()方法是如果容器满了的话就会把当前线程挂起
* offer()方法是容器如果满的话就会返回false。
*/
try {
Bread bread = new Bread();
bread.setName(""+i);
queue.offer(bread);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.dxz.queue.linked;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class ClientPut {
public static void main(String[] args) throws InterruptedException {
int capacity = 9000000;
//testArray(capacity); //put in ArrayBlockingQueue size:=1000000,use time:=624
//testLinked(capacity); //put in LinkedBlockingQueue size:=1000000,use time:=289
testConcurrentLinked(); //put in ConcurrentLinkedQueue size:=1000000,use time:=287
}
private static void testArray(int capacity) throws InterruptedException {
ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);
CountDownLatch cdl = new CountDownLatch(10);
ExecutorService es = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for(int i = 0; i < 10;i++) {
es.submit(new ProducerArray(queue, cdl));
}
cdl.await();
long end = System.currentTimeMillis();
es.shutdown();
System.out.println("put in ArrayBlockingQueue size:="+queue.size() +",use time:="+(end-start));
}
private static void testLinked(int capacity) throws InterruptedException {
LinkedBlockingQueue<Bread> queue = new LinkedBlockingQueue<Bread>(capacity);
CountDownLatch cdl = new CountDownLatch(10);
ExecutorService es = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for(int i = 0; i < 10;i++) {
es.submit(new ProducerLinked(queue, cdl));
}
cdl.await();
long end = System.currentTimeMillis();
es.shutdown();
System.out.println("put in LinkedBlockingQueue size:="+queue.size() +",use time:="+(end-start));
}
private static void testConcurrentLinked() throws InterruptedException {
ConcurrentLinkedQueue<Bread> queue = new ConcurrentLinkedQueue<Bread>();
CountDownLatch cdl = new CountDownLatch(10);
ExecutorService es = Executors.newFixedThreadPool(10);
long start = System.currentTimeMillis();
for(int i = 0; i < 10;i++) {
es.submit(new ProducerConcurrnetLinkedQueue(queue, cdl));
}
cdl.await();
long end = System.currentTimeMillis();
es.shutdown();
System.out.println("put in ConcurrentLinkedQueue size:="+queue.size() +",use time:="+(end-start));
}
}
可能的问题
因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越多,但是永远不会触及到reject handler。造成OOM。
比如下面一段代码
private static ExecutorService executor = Executors.newFixedThreadPool(10);
public void push2Kafka(String message){
executor.execute(new KafkaMessageWriteTask(message));
}
这段代码的功能是:每次线上调用,都会把计算结果的日志打到 Kafka,Kafka消费方再继续后续的逻辑。从表面上看,这个代码是没有任何问题,但是如果kafka 出现连接问题,Kafka发消息被阻塞或者变慢,那么显然[队列]里面的内容会越来越多,也就会导致线程的积压问题。
网友评论