美文网首页
十一、Java高级特性(阻塞队列和线程池)

十一、Java高级特性(阻塞队列和线程池)

作者: 大虾啊啊啊 | 来源:发表于2021-05-31 11:36 被阅读0次

一、概念、生产者消费者模式

  • 队列
    先进先出的一个数据结构
  • 阻塞队列
    (1)当队列为空的时候,从里面取数据的动作会被阻塞。
    (2)当队列满的时候,往里面放元素的动作会被阻塞。
  • 生产者消费者模式
    在生产者和消费者模式之间插入一个阻塞队列。生产者生产的东西放到队列容器中,而消费者直接从队列中取出东西进行消费。这样可以使得生产者和消费者之间的解耦,也可以使得生产者和消费者的性能的均衡,避免生产者生产东西过快,一直等待消费者消费,或者消费者消费过快等待生产者生产东西。

二、JDK中实现的阻塞队列

在JDK中BlockingQueue类封装了阻塞队列的接口
其中包含了几个核心的方法:

  • add和remove方法,非阻塞方法
    当队列满的时候往里面add数据,会抛出异常,当队列为空的时候从里面remove数据,会抛出异常。
    boolean add(E e);
    boolean remove(Object o);
  • offer和poll方法,非阻塞方法
    offer:往队列中里插入一个元素,当队列满的时候,返回一个false,
    poll:从队列取出一个元素,当队列为空的时候,返回一个null
    boolean offer(E e);
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

  • take和put方法,真正意义上的阻塞方法
    take:当队列为空的时候,从队列中取元素,会被阻塞
    put:当队列满的时候,往队列中插入元素,会被阻塞
E take() throws InterruptedException;
void put(E e) throws InterruptedException;

三、JDK中常用的阻塞队列

  • ArrayBlockingQueue
    是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

  • LinkedBlockingQueue
    是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

  • PriorityBlockingQueue
    PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

  • DelayQueue
    是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

  • SynchronousQueue:一个不存储元素的阻塞队列。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

在使用阻塞队列的时候,尽量使用有界的阻塞队列,有界的阻塞队列,规定了最大容量,当队列满的时候,往里面插入元素,会被阻塞。而无界的阻塞队列,可以一直往里插入元素,会占用内存,最终总会使得内存溢出。

缓存系统的设计

可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue。一旦可以从队列中查到元素,表示缓存有效期到了。

四、线程池

Java中的线程池是运用场景最多的并发框架。合理的使用线程池能够带来以下3个好处:
(1)降低资源的消耗:通过重复利用线程池中的线程,降低线程创建和销毁带来的开销。
(2)提高响应速度:当提交一个任务的时候,不需要等待线程的创建就能立即执行。
(3)提高线程的可管理性:在操作系统中,线程是非常稀缺的资源,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性。

1、JDK中线程池的实现

在JDK中使用ThreadPoolExecutor 作为线程池的核心类,用来执行被提交的任务。

ThreadPoolExecutor 的使用举例
package com.it.test.thread.consumer_product.retranlock;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LockTest {
    public static void main(String[] args) {
        LinkedBlockingQueue queue = new LinkedBlockingQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.MINUTES, queue, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        }, new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":你好");
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":世界");
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":hello");

            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":world");
            }
        });
        System.out.println("阻塞队列的长度:"+queue.size());
    }
}

pool-1-thread-1:你好
pool-1-thread-2:世界
pool-1-thread-2:world
pool-1-thread-1:hello
阻塞队列的长度:2
ThreadPoolExecutor 类的解析

(1)构造方法
构造方法中包含了7个核心参数

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • corePoolSize
    核心线程数,当提交一个任务的时候,默认情况下线程池会新建一个线程执行任务,直到线程数等于核心线程数。
    如果当前线程数为corePoolSize(核心线程数),继续提交任务的时候,会被保存到阻塞队列中等待被执行。
    如果执行了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有的核心线程。

  • maximumPoolSize
    线程池允许的最大线程数,如果当阻塞队列中的任务满的时候,继续提交任务,线程池会继续创建新的线程执行任务。前提是当前线程数要小于等于最大线程数。

  • keepAliveTime
    线程空闲的时候,存活的时间。即当没有任务执行的时候,线程继续存活的时间。默认情况下该参数只有在线程数大于核心线程数才有用

  • TimeUnit
    时间单位

  • workQueue
    存放任务的BlockingQueue阻塞队列,如果当前线程数量等于核心线程数,继续提交任务的时候,会将任务存放到阻塞队列,等待被执行。

  • threadFactory
    线程池中创建线程的工厂,一般只是对线程做一些策略,例如自定义线程名字等等

  • handler
    拒绝策略,当线程池中的线程超出最大线程数maximumPoolSize的时候,继续提交任务,会触发拒绝策略。在JDK中有四种拒绝策略的核心类:

(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;

线程池的工作机制

(1)如果运行的线程池少于核心线程数,提交任务的时候将创建新的线程来执行任务。
(2)如果运行的线程数等于核心线程数,提交任务的时候将会把任务存放到阻塞队列中(BlockingQueue)。
(3)如果阻塞队列满的时候,无法再添加,则创建新的线程来执行任务。
(4)如果创建的线程超过最大线程数,将会被拒绝。调用RejectedExecutionHandler.rejectedExecution()方法

提交任务

execute提交任务,不需要任务返回值。所以无法判断任务是否被线程池执行成功
submit提交任务,有返回值,返回一个future类型的对象。可以通过future对象判断任务是否执行成功,通过future的get方法,获取任务的返回值。在调用get方法的时候,会阻塞当前线程,直到任务完成为止。也可以在get方法设置返回时间。

关闭线程池

可以通过shutdown和shutdownNow来关闭线程池。

合理的配置线程池

合理配置线程池之前首先要分析任务的特性:
(1)CPU密集型
核心线程数 = Ncpu+1,
(2)IO密集型
核心线程数 = 2*Ncpu
(3)混合型
可以根据不同的任务类型对线程池进行拆分多个。

ThreadPoolExecutor源码分析
  • 构造方法
    传入核心线程数,阻塞队列等参数,设置到当前成员
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 提交任务
    根据设置的核心线程数和阻塞队列判断是否要调用addWorker添加任务还是reject拒绝任务
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
  • 添加任务
private boolean addWorker(Runnable firstTask, boolean core) {
      .......
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

(1)w = new Worker(firstTask);
创建一个Worker对象,在Worker构造方法中创建一个线程,传入Worker当前对象,Worker实现了Runnable接口

   Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

(2)添加Worker到HashSet中(HashSet<Worker> workers),添加成功则启动woker中的线程

 workers.add(w);
....
  if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }

因为Worker实现了Runnable接口,并且在创建线程的时候传入了当前Worker对象,所以启动线程的时候调用了Worker中的run方法

  public void run() {
            runWorker(this);
        }

接着调用runWorker方法

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法中,开启了一个while 循环,获取第一次传进的任务或者是
getTask(),阻塞队列中的任务。然后调用 task.run();这样就回调到了我们提交任务的时候实现的run方法。

源码小结

在ThreadPoolExecutor中用一个HashSet用来存放woker,而woker的构造方法中创建了线程,因此可以理解为woker存放了核心线程。当HashSet的长度等于核心线程数的时候,则将任务提交到阻塞队列。
在执行任务的时候获取当前提交的task或者从阻塞队列中取出task执行。

相关文章

  • 十一、Java高级特性(阻塞队列和线程池)

    一、概念、生产者消费者模式 队列先进先出的一个数据结构 阻塞队列(1)当队列为空的时候,从里面取数据的动作会被阻塞...

  • 线程池

    [TOC] 线程池 1. 并发队列:阻塞队列和非阻塞队列 区别如下: 入队: 非阻塞队列:当队列中满了的时候,放入...

  • 线程池

    线程池执行过程 线程池生命周期 线程池分类 阻塞队列 拒绝策略 - ThreadPoolExecutor.Abor...

  • 线程池

    说线程池前 先来了解一下 阻塞队列是什么;大致上说下自己的理解阻塞队列 java中 有这么几种 ArrayBloc...

  • ReentranLock底层原理分析

    1 J.U.C简介 java.util.concurrent 简称,java并发工具包 线程池 阻塞队列 计时器/...

  • Java线程池实现原理详解

    原理概述 其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQue...

  • JAVA线程池(转)

    原理概述 其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQue...

  • Java线程池的实现原理,你清楚么?

    原理概述 其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQue...

  • 队列

    队列特性 对比队列和栈 基于数组的队列 对比队列学习循环队列 循环队列难点 阻塞队列 并发队列 应用:线程池中拒绝...

  • 阻塞队列和线程池

    1.阻塞队列 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。2)支持阻塞的移除...

网友评论

      本文标题:十一、Java高级特性(阻塞队列和线程池)

      本文链接:https://www.haomeiwen.com/subject/bebvsltx.html