美文网首页
十一、Java高级特性(阻塞队列和线程池)

十一、Java高级特性(阻塞队列和线程池)

作者: 大虾啊啊啊 | 来源:发表于2021-05-31 11:36 被阅读0次

    一、概念、生产者消费者模式

    • 队列
      先进先出的一个数据结构
    • 阻塞队列
      (1)当队列为空的时候,从里面取数据的动作会被阻塞。
      (2)当队列满的时候,往里面放元素的动作会被阻塞。
    • 生产者消费者模式
      在生产者和消费者模式之间插入一个阻塞队列。生产者生产的东西放到队列容器中,而消费者直接从队列中取出东西进行消费。这样可以使得生产者和消费者之间的解耦,也可以使得生产者和消费者的性能的均衡,避免生产者生产东西过快,一直等待消费者消费,或者消费者消费过快等待生产者生产东西。

    二、JDK中实现的阻塞队列

    在JDK中BlockingQueue类封装了阻塞队列的接口
    其中包含了几个核心的方法:

    • add和remove方法,非阻塞方法
      当队列满的时候往里面add数据,会抛出异常,当队列为空的时候从里面remove数据,会抛出异常。
        boolean add(E e);
        boolean remove(Object o);
    
    • offer和poll方法,非阻塞方法
      offer:往队列中里插入一个元素,当队列满的时候,返回一个false,
      poll:从队列取出一个元素,当队列为空的时候,返回一个null
        boolean offer(E e);
        E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    
    
    • take和put方法,真正意义上的阻塞方法
      take:当队列为空的时候,从队列中取元素,会被阻塞
      put:当队列满的时候,往队列中插入元素,会被阻塞
    E take() throws InterruptedException;
    void put(E e) throws InterruptedException;
    

    三、JDK中常用的阻塞队列

    • ArrayBlockingQueue
      是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

    • LinkedBlockingQueue
      是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

    • PriorityBlockingQueue
      PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

    • DelayQueue
      是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

    • SynchronousQueue:一个不存储元素的阻塞队列。

    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

    在使用阻塞队列的时候,尽量使用有界的阻塞队列,有界的阻塞队列,规定了最大容量,当队列满的时候,往里面插入元素,会被阻塞。而无界的阻塞队列,可以一直往里插入元素,会占用内存,最终总会使得内存溢出。

    缓存系统的设计

    可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue。一旦可以从队列中查到元素,表示缓存有效期到了。

    四、线程池

    Java中的线程池是运用场景最多的并发框架。合理的使用线程池能够带来以下3个好处:
    (1)降低资源的消耗:通过重复利用线程池中的线程,降低线程创建和销毁带来的开销。
    (2)提高响应速度:当提交一个任务的时候,不需要等待线程的创建就能立即执行。
    (3)提高线程的可管理性:在操作系统中,线程是非常稀缺的资源,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性。

    1、JDK中线程池的实现

    在JDK中使用ThreadPoolExecutor 作为线程池的核心类,用来执行被提交的任务。

    ThreadPoolExecutor 的使用举例
    package com.it.test.thread.consumer_product.retranlock;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class LockTest {
        public static void main(String[] args) {
            LinkedBlockingQueue queue = new LinkedBlockingQueue();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.MINUTES, queue, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r);
                }
            }, new ThreadPoolExecutor.DiscardOldestPolicy());
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+":你好");
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+":世界");
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+":hello");
    
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+":world");
                }
            });
            System.out.println("阻塞队列的长度:"+queue.size());
        }
    }
    
    
    pool-1-thread-1:你好
    pool-1-thread-2:世界
    pool-1-thread-2:world
    pool-1-thread-1:hello
    阻塞队列的长度:2
    
    ThreadPoolExecutor 类的解析

    (1)构造方法
    构造方法中包含了7个核心参数

      public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    
    • corePoolSize
      核心线程数,当提交一个任务的时候,默认情况下线程池会新建一个线程执行任务,直到线程数等于核心线程数。
      如果当前线程数为corePoolSize(核心线程数),继续提交任务的时候,会被保存到阻塞队列中等待被执行。
      如果执行了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有的核心线程。

    • maximumPoolSize
      线程池允许的最大线程数,如果当阻塞队列中的任务满的时候,继续提交任务,线程池会继续创建新的线程执行任务。前提是当前线程数要小于等于最大线程数。

    • keepAliveTime
      线程空闲的时候,存活的时间。即当没有任务执行的时候,线程继续存活的时间。默认情况下该参数只有在线程数大于核心线程数才有用

    • TimeUnit
      时间单位

    • workQueue
      存放任务的BlockingQueue阻塞队列,如果当前线程数量等于核心线程数,继续提交任务的时候,会将任务存放到阻塞队列,等待被执行。

    • threadFactory
      线程池中创建线程的工厂,一般只是对线程做一些策略,例如自定义线程名字等等

    • handler
      拒绝策略,当线程池中的线程超出最大线程数maximumPoolSize的时候,继续提交任务,会触发拒绝策略。在JDK中有四种拒绝策略的核心类:

    (1)AbortPolicy:直接抛出异常,默认策略;
    (2)CallerRunsPolicy:用调用者所在的线程来执行任务;
    (3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    (4)DiscardPolicy:直接丢弃任务;

    线程池的工作机制

    (1)如果运行的线程池少于核心线程数,提交任务的时候将创建新的线程来执行任务。
    (2)如果运行的线程数等于核心线程数,提交任务的时候将会把任务存放到阻塞队列中(BlockingQueue)。
    (3)如果阻塞队列满的时候,无法再添加,则创建新的线程来执行任务。
    (4)如果创建的线程超过最大线程数,将会被拒绝。调用RejectedExecutionHandler.rejectedExecution()方法

    提交任务

    execute提交任务,不需要任务返回值。所以无法判断任务是否被线程池执行成功
    submit提交任务,有返回值,返回一个future类型的对象。可以通过future对象判断任务是否执行成功,通过future的get方法,获取任务的返回值。在调用get方法的时候,会阻塞当前线程,直到任务完成为止。也可以在get方法设置返回时间。

    关闭线程池

    可以通过shutdown和shutdownNow来关闭线程池。

    合理的配置线程池

    合理配置线程池之前首先要分析任务的特性:
    (1)CPU密集型
    核心线程数 = Ncpu+1,
    (2)IO密集型
    核心线程数 = 2*Ncpu
    (3)混合型
    可以根据不同的任务类型对线程池进行拆分多个。

    ThreadPoolExecutor源码分析
    • 构造方法
      传入核心线程数,阻塞队列等参数,设置到当前成员
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    • 提交任务
      根据设置的核心线程数和阻塞队列判断是否要调用addWorker添加任务还是reject拒绝任务
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 添加任务
    private boolean addWorker(Runnable firstTask, boolean core) {
          .......
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
    
                        if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                            if (t.getState() != Thread.State.NEW)
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            workerAdded = true;
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    (1)w = new Worker(firstTask);
    创建一个Worker对象,在Worker构造方法中创建一个线程,传入Worker当前对象,Worker实现了Runnable接口

       Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    

    (2)添加Worker到HashSet中(HashSet<Worker> workers),添加成功则启动woker中的线程

     workers.add(w);
    ....
      if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
    

    因为Worker实现了Runnable接口,并且在创建线程的时候传入了当前Worker对象,所以启动线程的时候调用了Worker中的run方法

      public void run() {
                runWorker(this);
            }
    

    接着调用runWorker方法

       final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        try {
                            task.run();
                            afterExecute(task, null);
                        } catch (Throwable ex) {
                            afterExecute(task, ex);
                            throw ex;
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    runWorker方法中,开启了一个while 循环,获取第一次传进的任务或者是
    getTask(),阻塞队列中的任务。然后调用 task.run();这样就回调到了我们提交任务的时候实现的run方法。

    源码小结

    在ThreadPoolExecutor中用一个HashSet用来存放woker,而woker的构造方法中创建了线程,因此可以理解为woker存放了核心线程。当HashSet的长度等于核心线程数的时候,则将任务提交到阻塞队列。
    在执行任务的时候获取当前提交的task或者从阻塞队列中取出task执行。

    相关文章

      网友评论

          本文标题:十一、Java高级特性(阻塞队列和线程池)

          本文链接:https://www.haomeiwen.com/subject/bebvsltx.html