美文网首页
FixedThreadPool

FixedThreadPool

作者: 程序员札记 | 来源:发表于2022-03-08 08:28 被阅读0次

    java.util.concurrent.Executors

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }
    
    • 核心线程数== 最大线程数(没有救急线程)
    • 队列无界的,可以放任意多数量任务

    适合:任务量已知,相对耗时的任务

    简要说明,FixedThreadPool,也就是可重用固定线程数的线程池。 它corePoolSize和 maximumPoolSize是一样的。并且他的keepAliveTime=0, 也就是当线程池中的线程数大于corePoolSize, 多余的空闲线程会被立即终止。

    它的基本执行过程如下
    1, 如果当前运行的线程数少于corePoolSize, 会立刻创建新线程执行任务。
    2,当线程数到达corePoolSize后,将任务加入到LinkedBlockingQueue中。
    3,当线程执行完任务后,会循环从LinkedBlockingQueue中获取任务来执行。


    image.png

    FixedThreadPool使用了LinkedBlockingQueue, 也就是无界队列(队列最大可容纳Integer.MAX_VALUE), 因此会造成以下影响:
    a, 线程池线程数到达corePoolSize后,任务会被存放在LinkedBlockingQueue中
    b, 因为无界队列,运行中(未调用shutdown()或者shutdownNow()方法)的不会拒绝任务(队列无界,可以放入"无限"任务)

    LinkedBlockingQueue

    很多人问为啥不用ConcurrentLinkedQueue,这里用了LinkedBlockingQueue。主要是因为LinkedBlockingQueue 是阻塞算法, 实现起来比较简单。

    有时候我们把并发包下面的所有容器都习惯叫作并发容器,但是严格来讲,类似 ConcurrentLinkedQueue 这种“Concurrent”容器,才是真正代表并发。不知道你有没有注意到,java.util.concurrent 包提供的容器(Queue、List、Set)、Map,从命名上可以大概区分为 Concurrent、CopyOnWrite和 Blocking等三类,同样是线程安全容器,可以简单认为:

    • Concurrent 类型基于 lock-free,在常见的多线程访问场景,一般可以提供较高吞吐量。
    • Blocking 内部则是基于锁,并提供了 BlockingQueue 的等待性方法。
    • CopyonWrite 是基于复制。

    关于ConcurrentLinkedQueue和LinkedBlockingQueue:

    • LinkedBlockingQueue是使用锁机制,ConcurrentLinkedQueue是使用CAS算法,虽然LinkedBlockingQueue的底层获取锁也是使用的CAS算法
    • 关于取元素,ConcurrentLinkedQueue不支持阻塞去取元素,LinkedBlockingQueue支持阻塞的take()方法,如若大家需要ConcurrentLinkedQueue的消费者产生阻塞效果,需要自行实现
    • 关于插入元素的性能,从字面上和代码简单的分析来看ConcurrentLinkedQueue肯定是最快的,但是这个也要看具体的测试场景,我做了两个简单的demo做测试,测试的结果如下,两个的性能差不多,但在实际的使用过程中,尤其在多cpu的服务器上,有锁和无锁的差距便体现出来了,ConcurrentLinkedQueue会比LinkedBlockingQueue快很多:
    package com.dxz.queue.linked;
    
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ProducerConcurrnetLinkedQueue implements Runnable{  
          
        //容器  
        private final ConcurrentLinkedQueue<Bread> queue;
        private final CountDownLatch cdl;
          
        public ProducerConcurrnetLinkedQueue(ConcurrentLinkedQueue<Bread> queue, CountDownLatch cdl){  
            this.queue = queue;  
            this.cdl = cdl;
        }  
      
        @Override  
        public void run() {  
            for(int i=0;i<100000; i++){  
                produce(i);  
            }
            cdl.countDown();
        }  
          
        public void produce(int i){  
            /** 
             * put()方法是如果容器满了的话就会把当前线程挂起 
             * offer()方法是容器如果满的话就会返回false。 
             */  
            try {  
                Bread bread = new Bread();
                bread.setName(""+i);
                queue.offer(bread);
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    
    package com.dxz.queue.linked;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class ClientPut {  
      
        public static void main(String[] args) throws InterruptedException {  
            int capacity = 9000000;
            //testArray(capacity);    //put in ArrayBlockingQueue size:=1000000,use time:=624
            //testLinked(capacity);    //put in LinkedBlockingQueue size:=1000000,use time:=289
            testConcurrentLinked(); //put in ConcurrentLinkedQueue size:=1000000,use time:=287
            
        }
        
        private static void testArray(int capacity) throws InterruptedException {
            ArrayBlockingQueue<Bread> queue = new ArrayBlockingQueue<Bread>(capacity);  
            CountDownLatch cdl = new CountDownLatch(10);
            ExecutorService es = Executors.newFixedThreadPool(10);
            long start = System.currentTimeMillis();
            for(int i = 0; i < 10;i++) {
                es.submit(new ProducerArray(queue, cdl));
            }
            cdl.await();
            long end = System.currentTimeMillis();
            es.shutdown();
            System.out.println("put in ArrayBlockingQueue size:="+queue.size() +",use time:="+(end-start));
        }
    
        private static void testLinked(int capacity) throws InterruptedException {
            LinkedBlockingQueue<Bread> queue = new LinkedBlockingQueue<Bread>(capacity);
            CountDownLatch cdl = new CountDownLatch(10);
            ExecutorService es = Executors.newFixedThreadPool(10);
            long start = System.currentTimeMillis();
            for(int i = 0; i < 10;i++) {
                es.submit(new ProducerLinked(queue, cdl));
            }
            cdl.await();
            long end = System.currentTimeMillis();
            es.shutdown();
            System.out.println("put in LinkedBlockingQueue size:="+queue.size() +",use time:="+(end-start));
        }
        
        private static void testConcurrentLinked() throws InterruptedException {
            ConcurrentLinkedQueue<Bread> queue = new ConcurrentLinkedQueue<Bread>();
            CountDownLatch cdl = new CountDownLatch(10);
            ExecutorService es = Executors.newFixedThreadPool(10);
            long start = System.currentTimeMillis();
            for(int i = 0; i < 10;i++) {
                es.submit(new ProducerConcurrnetLinkedQueue(queue, cdl));
            }
            cdl.await();
            long end = System.currentTimeMillis();
            es.shutdown();
            System.out.println("put in ConcurrentLinkedQueue size:="+queue.size() +",use time:="+(end-start));
        }
      
    }
    
    

    可能的问题

    因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越多,但是永远不会触及到reject handler。造成OOM。

    比如下面一段代码

    private static ExecutorService executor = Executors.newFixedThreadPool(10);
    
    public void push2Kafka(String message){
      executor.execute(new KafkaMessageWriteTask(message));
    
    }
    
    

    这段代码的功能是:每次线上调用,都会把计算结果的日志打到 Kafka,Kafka消费方再继续后续的逻辑。从表面上看,这个代码是没有任何问题,但是如果kafka 出现连接问题,Kafka发消息被阻塞或者变慢,那么显然[队列]里面的内容会越来越多,也就会导致线程的积压问题。

    相关文章

      网友评论

          本文标题:FixedThreadPool

          本文链接:https://www.haomeiwen.com/subject/svoorrtx.html