美文网首页线程池
ReentrantLock限制线程池队列大小

ReentrantLock限制线程池队列大小

作者: 爱吃鱼aichiyu | 来源:发表于2018-01-25 13:56 被阅读6次

关于线程池介绍,我不在此赘叙,请参考https://www.jianshu.com/p/ade771d2c9c0
线程池中queue一般设置大小默认是Integer.MAX_VALUE,如果设置了大小,就必须实现一个丢弃策略,而默认的丢弃策略居然是抛异常。

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

当任务量超大,内存被撑满造成宕机,会导致所有的任务都丢失了。当然,可以使用MQ来解决类似的问题。在此我们只讨论使用线程池本身来解决。
那能不能人为控制队列大小,当队列达到该值,就不再往线程池队列里提交任务呢?以下采用ReentrantLock可重入锁机制来实现

/**
 * Created on 2018/1/22 16:29
 * <p>
 * Description: [测试控制线程池队列大小]
 * <p>
 * Company: [xxxx]
 *
 * @author [aichiyu]
 */
public class TestLockPool {

    private int maxSize = 100 ;

    private final ReentrantLock lock = new ReentrantLock();
    private List<Condition> list = new LinkedList<>();

    private ThreadPoolExecutor executor =new ThreadPoolExecutor(20, 100,
                                      60L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);


    public void init(){
        scheduledExecutorService.scheduleAtFixedRate(()->{
            int queueSize = executor.getQueue().size();
            //每秒检查一次,当队列中任务被执行完就解锁一批任务,继续往队列中加
            if( queueSize < maxSize * 0.8 && list.size() > 0  ){
                System.out.println("unlock !!~~");
                lock.lock();
                int i = 0 ;
                Iterator<Condition> iterator = list.iterator();
                while (i < maxSize-queueSize && iterator.hasNext()){
                    iterator.next().signal();
                    iterator.remove();
                    i++;
                }
                System.out.println("signal over!!~~,num="+(i));
                lock.unlock();
            }
        },1,1, TimeUnit.SECONDS);
    }

    private void consume(){

        try {
            //当队列大小超过限制,阻塞当前线程,等待队列空闲
            if(executor.getQueue().size() >= maxSize ){
                System.out.println(Thread.currentThread()+" wait !!~"+"pool queue size = "+executor.getQueue().size());
                lock.lock();
                Condition condition = lock.newCondition();
                list.add(condition);
                condition.await();
                System.out.println(Thread.currentThread()+"wait over!~~");
                lock.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.submit(()->{
            System.out.println(Thread.currentThread()+" execute !!~~"+"pool queue size = "+executor.getQueue().size());


            try {
                //模拟任务阻塞
                Thread.sleep(2500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args)  {
        TestLockPool testLock = new TestLockPool();
        testLock.init();
         ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 200; i++) {
            service.submit(()->testLock.consume());
        }

        System.out.println("main over!~");
    }

}

相关文章

  • ReentrantLock限制线程池队列大小

    关于线程池介绍,我不在此赘叙,请参考https://www.jianshu.com/p/ade771d2c9c0线...

  • 线程池工具

    功能简介 固定线程、限制最大队列长度的自定义线程池; 定制线程池加载任务、子线程各种参数,如分页大小、是否子线程出...

  • Java 线程池

    组成: 核心池 队列 主要参数: 核心池大小; 最大线程数; 队列大小; 活动保持时间;(...

  • java中的线程池

    线程池创建的参数 线程池的基本大小(核心线程数) 任务队列(ArrayBlockingQueue/LinkedBl...

  • 简易线程池的实现

    构成线程池的基本元素 线程池中的线程 任务队列 生产者 消费者 线程池 消费者 生产者 问题 任务队列的大小:如果...

  • Android线程池使用

    一:无大小限制的线程池执行效果如下 二:限制按顺序来执行任务的线程池效果如下 四:按指定个数来执行任务的线程池效果...

  • Thread

    队列 线程锁 多线程,线程池 队列 多线程爬虫示例 多线程 自定义线程 线程池

  • 线程池工作机制与原理

    书接上文, Java线程池 。接下来记录一下线程池的工作机制和原理 线程池的两个核心队列: 线程等待池,即线程队列...

  • spring异步任务

    注意:此异步的默认配置线程池的大小和队列的大小是Integer.MaxSize,建议重新设置。 TaskExecu...

  • C++11 ThreadPool的应用

    线程池的应用 代码结构 任务队列 线程池 工作线程 代码如下

网友评论

  • 小鱼嘻嘻:干嘛要用LinkedBlockingQueue 还有觉得你这么实现意义不是很大。
    爱吃鱼aichiyu:@小鱼嘻嘻 是的,所以我才想用锁去控制队列的大小。这段代码仅供测试探讨,并未上生产实践
    小鱼嘻嘻:@刘辉_wh 无界队列最严重的就是无界,很容易CPU打满
    爱吃鱼aichiyu:谢评论。我看了下其他几种队列,使用ArrayBlockingQueue一样面临超过队列大小,使用何种丢弃策略的问题。而SynchronousQueue基本没有大小同步执行,PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序,且他的大小如果不够是会自动扩大。

本文标题:ReentrantLock限制线程池队列大小

本文链接:https://www.haomeiwen.com/subject/curvaxtx.html