美文网首页
探讨阻塞队列和线程池源码

探讨阻塞队列和线程池源码

作者: Sorry2Me | 来源:发表于2020-12-27 13:16 被阅读0次

阻塞队列

非阻塞队列是一个先进先出的单向队列(Queue),而BlockingQueue阻塞队列实际是非阻塞队列的一个加强,之所以是阻塞队列最主要的是实现take和put,当阻塞队列满时,put会一直阻塞直到拿到数据,或者响应中断退出;当队列空时,take元素,队列也会阻塞直到队列可用。

阻塞的意思就是此条线程会处于等待卡死状态,解锁的条件是队列中有另一条线程存入或取出数据了,就会解锁,就相当于队列是仓库,仓库没有货了就生产,有货就能消费,锁条件是notFull和notEmpty。

Throws exception Special value Blocks Times out
插入方法 add(e) offer(e) put(e) offer(e, time, unit)
移除方法 remove() poll() take() poll(time, unit)
检查方法 element() peek()

队列(Queue和Deque)

队列是一种特殊的线性表,链表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。

进行插入操作的端称为队尾,进行删除操作的端称为队头。

单向队列(Queue):先进先出(FIFO),只能从队列尾插入数据,只能从队列头删除数据.

双向队列(Deque):可以从队列尾/头插入数据,只能从队列头/尾删除数据.操作头和尾.

单向队列Queue结构

private Node first;
private Node last;
int size = 0;

class Node{
    private Node next;
    private Object element;

    public Node(Object element) {
        this.element = element;
    }
    public Node(){}
}

单向队列操作

public void push(Object element){
    //单向队列(Queue):先进先出(FIFO),只能从队列尾插入数据
    size++;
    Node node = new Node(element);
    if (size>1){
        this.last.next = node;
        this.last = node;
    }else if (size == 1){
        this.first = node;
        this.last = node;
    }
}

public void pull(){
    //单向队列(Queue):先进先出(FIFO),只能从队列头删除数据.
    size--;
    this.first = this.first.next;
    if (size == 1){
        this.last = this.first;
    }
}

常见的阻塞队列

阻塞队列:BlockingQueue,多用于创建线程池

image

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列,只维护一个lock。

线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列,维护两个lock。

如果不设置队列容量,其初始化的默认容量大到可以认为是无界队列了,在这种情况下maximumPoolSize参数是无效的,队列中的任务太多,以至于由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

DelayedWorkQueue:ScheduledThreadPoolExecutor静态内部类的一个无界延迟阻塞队列,添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被执行

SynchronousQueue:无存储空间的阻塞队列。

1.SynchronousQueue是一个双栈双队列算法,无空间的队列,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
2.提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据设置的handler执行拒绝策略。因此这种方式提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,需要对程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

非阻塞队列的API

除了阻塞队列就是非阻塞队列,非阻塞队列,就是单纯意义上的"非"阻塞队列,其没有put和take方法,可以无限制存,且是线程安全的,N个用户同时存也能保证每次存放在队尾而不乱掉,但是其的size()方法会不定时遍历,所以很耗时

以ConcurrentLinkedQueue并发队列为例,多用于消息队列,并发异步处理,如日志等;add和poll是线程安全的,但是其他方法并没有保证线程安全,如判断isEmpty(),所以在多线程并发时还得自己加锁

ConcurrentLinkedQueue内部就是一个简单的单链表结构,每入队一个元素就是插入一个Node类型的结点。head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及Node对象的字段值

image

offer/add添加队列元素:两个方法一样,都是将指定元素插入此队列的尾部,添加成功返回true;区别在于队列满时,add会抛异常,而offer会返回false

poll获取并移除此队列的头,如果此队列为空,则返回 null

public static void main(String[] args) {
    ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    queue.add("1000");
    queue.offer("2000");
    //先进先出
    System.out.println(queue.poll());//1000
    System.out.println(queue.poll());//2000
    System.out.println(queue.poll());//null
}

peek获取但不移出队列的头,如果此队列为空,则返回 null

public static void main(String[] args) {
    ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    queue.add("1000");
    queue.offer("2000");
    //peek获取但不移除队列的头,所以每次peek()都是1000
    System.out.println(queue.peek());//1000
    System.out.println(queue.peek());//1000
    System.out.println(queue.poll());//1000
}

remove从队列中移除指定元素,已存在元素,会返回true,remove不存在元素,返回false

contains判断当前队列是否包含指定元素,如果存在返回true,不存在返回false

public static void main(String[] args) {
    ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    queue.add("1000");
    queue.offer("2000");
    System.out.println(queue.remove("1000"));//true
    //1000已经被移除了
    System.out.println(queue.remove("1000"));//false
    System.out.println(queue.contains("1000"));//false
    System.out.println(queue.contains("2000"));//true
}

size队列的数量,如果此队列包含的元素数大于 Integer.MAX_VALUE,则只会返回 Integer.MAX_VALUE。由于这些队列的异步特性,确定当前的元素数需要进行一次花费 O(n) 时间的遍历。所以在需要判断队列是否为空时,使用peek()!=null,不要用 queue.size()>0

isEmpty判断当前队列是否为null

toArray(T[] a)转为指定数组,按照头->尾的顺序返回

ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
queue.offer("3000");
Object[] array = queue.toArray();
for (Object o : array){
    System.out.println(o);1000->2000->3000
}

iterator获取按照头到尾的顺序遍历的迭代器

ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("1000");
queue.offer("2000");
Iterator<Object> iterator = queue.iterator();
while (iterator.hasNext()){
    System.out.println(iterator.next());
}

队列应用的示例:

    //创建并发队列
    ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();

    //提供100000份需要消耗的食物,并放入队列
    int productionNum = 10000;
    for(int i = 1;i<10001;i++){
        queue.offer("食物编号:"+i);
    }

    ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(100);
    for (int i =0;i<10;i++){
        //submit()有返回值,而execute()没有
        Future<?> future = executorService.submit(new Runnable() {
            @Override
            public void run() {
                while (!queue.isEmpty()) {
                    System.out.println("当前线程为:" + Thread.currentThread().getName() + ":" + queue.poll());
                }
            }
        });
    }

ArrayBlockQueue/LinkedBlockQueue

ArrayBlockingQueue基于数组不会产生或销毁任何额外的对象实例,LinkedBlockingQueue基于链表会生成额外的Node对象会有会内存溢出的风险。但是常用的其实还是LinkedBlockingQueue,使用两套锁,实现生产和消费操作的并行,单个锁只能保证生产者和消费者只能每次操作一次生产或者消费,而双锁可以使得生产者在队列满时不断的向队列尾部添加node,消费者不断从head获取Node,从而吞吐效率更高

//Array可以选择公平锁和非公平锁,而Linked两把锁都是非公平锁
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100,true);
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>(100);
//阻塞的方法只有take和put
queue.take();
queue.put("");

ArrayBlockingQueue源码:

//数组 
final Object[] items;
//获取数据的索引,主要用于take,poll,peek,remove方法
int takeIndex;
//添加数据的索引,主要用于 put, offer, or add 方法
int putIndex;
//队列元素的个数
int count;
final ReentrantLock lock;
//notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作
private final Condition notEmpty;
//notFull条件对象,用于通知put方法队列未满,可执行添加操作
private final Condition notFull;

put方法

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //可中断加锁
    lock.lockInterruptibly();
    try {
        //数组满了,生产条件释放锁并阻塞线程
        while (count == items.length)
            notFull.await();
        //入队操作
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    //设置当前索引值为put添加的值,初始为0
    items[putIndex] = x;
    //length是固定的,简单理解就是当条件这个元素之后数组就满了
    //那么当消费者从0开始往后消费,生产者被唤醒从而继续从0开始继续添加
    //需要先了解消费的方式:会从头一直消费到最后一个元素之后又从0开始继续消费
    if (++putIndex == items.length)
        //消费者从0往后依次消费,生产者在从0开始继续添加
        putIndex = 0;
    count++;
    //唤醒消费者
    notEmpty.signal();
}

take方法

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //数组没有资源,消费者进入休眠
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    final Object[] items = this.items;
    //takeIndex初始也为0,从索引0开始消费
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    //take索引自增,当消费完最大的索引值,又从0开始消费
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //消费之后数组未满,所以唤醒生产者继续生产
    notFull.signal();
    return x;
}

LinkedBlockingQueue源码:

//由Node单向链表组成的队列,初始化时创建一个空Node并且设置为head和last
static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}
//队列容量
private final int capacity;
//队列长度,因为多个线程可以同时生产或消费,需要保证改变值的可见性和原子性
private final AtomicInteger count = new AtomicInteger();
//队列的头和尾
transient Node<E> head;
private transient Node<E> last;
// 生产者和消费者维护了两套锁
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

put方法,可以实现入列和消费同时进行,但是生产或者消费时只能同时运行一个线程

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    //生产锁
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //如果满了,当前生产线待机
        while (count.get() == capacity) {
            notFull.await();
        }
        //入列
        enqueue(node);
        //cas设置count+1,返回值仍是原值
        c = count.getAndIncrement();
        //被消费之后队列未满则唤醒队列
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        //当队列为不为空时,消费锁释放
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    //当前node设置为last,并设置为上一个节点的next
    last = last.next = node;
}

take方法

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        //队列为空消费者等待被唤醒
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        //count值减1,返回的还是原值
        c = count.getAndDecrement();
        //当消费队列不为空消费者被唤醒
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    //队列满了,生产者进行signal
    if (c == capacity)
        signalNotFull();
    return x;
}

线程池

线程池,简单来说就是一个管理工作线程的管理器,可以复用线程从而减少频繁创建和销毁线程,控制并发数;可以创建多个线程池用于处理不同的业务

AQS队列是运行中的线程由于获取资源形成队列,更改线程的wait状态原地阻塞,直到被唤醒继续执行线程;而线程池中的任务队列虽然也是将Runnable对象加入队列,但并不存在阻塞,因为这是仅仅只是内存对象,通过工作线程去执行任务队列里Runnable对象的run方法,所有其实不一定要是Runnable对象,只需要统一将要执行的任务通过同一个方法去实现,在工作线程执行时再去统一调那个方法即可

提交任务执行,就是工作线程去直接调任务的run()或者call()方法

class SimperExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

简易线程池

static List<Runnable> runnables = Lists.newArrayList();
static Thread threadPool1 = new Thread(() -> {
    System.out.println("线程对象启动:" + Thread.currentThread().getName());
    while (true) {
        if (!runnables.isEmpty()){
            Thread runnable = (Thread) runnables.get(0);
            System.out.println("被执行任务名称:"+runnable.getName());
            runnable.run();
            runnables.remove(0);
        }
    }
}, "threadPool1");

private static void execute(Runnable runnable){
    runnables.add(runnable);
}

public static void main(String[] args) throws InterruptedException {
    threadPool1.start();
    Thread t1 = new Thread(() -> {
        System.out.println("任务对象1:"+Thread.currentThread().getName());
    }, "t1");
    Thread t2 = new Thread(() -> {
        System.out.println("任务对象2:"+Thread.currentThread().getName());
    }, "t2");
    Thread t3 = new Thread(() -> {
        System.out.println("任务对象3:"+Thread.currentThread().getName());
    }, "t3");

    execute(t1);
    execute(t2);
    execute(t3);

}

构建ThreadPoolExecutor

ThreadPoolExecutor的继承和实现结构

image

Executor只提供了executor方法不需要获取结果,ExecutorService提供的submit需要获取结果(FutureTask),可以进行Exception处理

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue){
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
        }
  • corePoolSize 核心线程数,初始化ThreadPoolExecutor的时候,线程默认不会立即创建核心线程创建,等到有任务提交时才会创建

    //初始化线程池对象之后并没有立即创建核心线程
    ThreadPoolExecutor executorService = (ThreadPoolExecutor)Executors.newFixedThreadPool(3);
    System.out.println(executorService.getActiveCount());//0
    //当提交任务之后才会先去创建核心线程
    executorService.submit(new Thread(()->{
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(executorService.getActiveCount());//1
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }));
    
  • maximumPoolSize 最大线程数,线程池允许创建的最大线程数,如果队列中任务已满,也就是核心线程无法及时处理这些任务,那么会创建新的线程来执行任务。

  • workQueue 任务队列,BlockingQueue

  • keepAliveTime 除了核心线程,空闲线程存活时间; 设置allowCoreThreadTimeOut(true)可以回收核心线程

  • threadFactory 用于创建线程池的工作线程,可自定义线程创建,线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等

    //默认的DefaultThreadFactory的简单实现
    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            //线程命名
            Thread th = new Thread(r,"pool-1"+"-thread");
            return th;
        }
    };
    
  • rejectedHandler 拒绝策略,用来处理线程池"超载"的情况,当线程池已满,但是又有新的任务添加到队列时需要采取的策略,比如什么都不处理,抛异常,返回false等,也可以自定义实现,默认提供了4种拒绝策略:AbortPolicy,直接抛出异常,阻止系统正常工作; CallerRunsPolicy,如果线程池的线程数量达到上限,会把任务队列中的任务放在调用者线程当中运行; DiscardOledestPolicy,会丢弃任务队列中最早的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交;DiscardPolicy,没有任何处理,也就是丢弃任务,使用此策略,业务场景中需允许任务的丢失;

    实现RejectedExecutionHandler及rejectedExecution方法

    ---- AbortPolicy 抛异常
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
    
    ---- CallerRunsPolicy 溢出的任务直接执行
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
           if (!e.isShutdown()) {
               r.run();
           }
    }
    
    ---- DiscardOldestPolicy 取出workQueue中的firstQueue,再重新提交取出的任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
    
    ---- DiscardPolicy 什么都不做,等于直接抛弃了这个任务
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    

线程池的状态

线程状态和工作线程数

//ctl初始为 RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//高3位表示线程状态,低29位表示线程数
private static final int COUNT_BITS = 32 - 3;
//000 111....
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState 线程状态
private static final int RUNNING    = -1 << COUNT_BITS;//101
private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
private static final int STOP       =  1 << COUNT_BITS;//001
private static final int TIDYING    =  2 << COUNT_BITS;//010
private static final int TERMINATED =  3 << COUNT_BITS;//011

// ~1=0,0&1=1 低29位全部为0,用于获取高3位的线程状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//低29位全部为1,用于获取低29位的线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//new AtomicInteger(ctlOf(RUNNING, 0))
private static int ctlOf(int rs, int wc) { return rs | wc; }

RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。

  • RUNNING:最正常的状态:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

状态变化的转换过程:

  • RUNNING -> SHUTDOWN:调用 shutdown()
  • (RUNNING or SHUTDOWN) -> STOP:调用 shutdownNow()
  • SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING
  • STOP -> TIDYING:当任务队列清空后,发生这个转换
  • TIDYING -> TERMINATED:当 terminated() 方法结束后

submit提交任务

executor只能提交没有返回值得Runnable任务,submit可以提交Runnable和Callable任务,将提交的任务封装成RunnableFuture

Future接口

对线程池提交一个Callable任务,可以获得一个Future对象

ExecutorService service = new ThreadPoolExecutor(2,2,100,TimeUnit.SECONDS,new LinkedBlockingQueue<>());

List<Future> list = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
    Callable<String> c1 = new Callable<String>() {
        @Override
        public String call() throws Exception {
            TimeUnit.SECONDS.sleep(10);
            return UUID.randomUUID().toString().replace("-", "");
        }
    };
    //提交callable任务
    Future<String> future = service.submit(c1);
    list.add(future);
}

//阻塞,直到获取所有future值
list.forEach(future -> {
    try {
        System.out.println(future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
});
TimeUnit.SECONDS.sleep(3);

实现有返回值的callable接口

//方式一:提交一个Task任务,submit(Callable<T> task)获取future返回值
ExecutorService service = Executors.newFixedThreadPool(1);
Callable<String> c1 = new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println(111);
        TimeUnit.SECONDS.sleep(10);
        return "a1111";
    }
};
Future<String> future = service.submit(c1);
System.out.println(futureTask.get());

//方式二:封装为一个FutureTask并启动线程执行,通过FutureTask获取返回值
Callable<String> c1 = new Callable<String>() {
    @Override
    public String call() throws Exception {
        System.out.println(111);
        TimeUnit.SECONDS.sleep(10);
        return "a1111";
    }
};
FutureTask<String> futureTask = new FutureTask<>(c1);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println(futureTask.get());

FutureTask

FutureTask 间接实现了 Runnable 和Future接口,所以FutureTask即可以作为一个Runnable任务,也可以作为一个Callable实例实现获取返回值

img

Runnable 的 run() 方法没有返回值,第二个参数将会放到 Future 中,作为返回值,通过这两个参数,将其包装成 Callable

---- ExecutorService
//提交一个Runnable任务肯定不会有返回值
Future<?> submit(Runnable task);
//两个参数封装为Callable
<T> Future<T> submit(Runnable task, T result);
// 提交一个 Callable 任务
<T> Future<T> submit(Callable<T> task);

---- 演示
Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "111";
    }
};

Runnable runnable = new Runnable() {
    @Override
    public void run() {
    }
};

ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(callable);
Future<?> future1 = service.submit(runnable);
Future<String> run = service.submit(runnable, "run");
System.out.println("future:"+future.get());//111
System.out.println("future1:"+future1.get());//null
System.out.println("run:"+run.get());//run

newTaskFor

将Callable封装成 FutureTask 提交到线程池中执行,Runnable通过RunnableAdapter适配器封装成一个Callable

---- 提交任务
public <T> Future<T> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, null);
    //交给执行器执行,execute 方法由具体的子类来实现
    execute(ftask);
    return ftask;
}

//newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; 
}

public static <T> Callable<T> callable(Runnable task, T result) {
    return new RunnableAdapter<T>(task, result);
}
//Executors,最终发现是通过RunnableAdapter适配器将Runnable封装为一个Callable
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        //调run方法,返回result
        task.run();
        return result;
    }
}

run/get

考虑回调callable时,FutureTask 封装的任务对象最终是通过工作线程执行run()方法处理任务,而FutureTask 的run方法则调用了callable的call方法,并将result设值到outcome

//封装的callable
private Callable<V> callable;
//callable的回调值,通过get方法获取
private Object outcome;
//CAS防止callable被执行多次
private volatile Thread runner;
//维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点
private volatile WaitNode waiters;

//run方法,线程池工作线程最终通过runWork()调run方法
public void run() {
    // 1. 状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
    // 2. 状态如果是,则把当前执行线程保存在runner字段中
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 执行call()方法
                result = c.call();
                ran = true;
            } 
            if (ran)
                //执行成功,outcome = result
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        // 如果任务被中断,执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //COMPLETING状态是任务是否执行完成的临界状态,进行阻塞直到任务执行结束
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    //当任务被执行完返回outcome
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

awaitDone中,get()形成一个阻塞队列

image

future可能会有多个线程去获取

Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(2);
        return "111";
    }
};

ExecutorService service = Executors.newFixedThreadPool(1);
Future<String> future = service.submit(callable);

new Thread(()->{
    try {
        System.out.println(Thread.currentThread().getName()+":"+future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();
new Thread(()->{
    try {
        System.out.println(Thread.currentThread().getName()+":"+future.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();

当多个线程访问get方法时,会阻塞形成waiters队列,由Node维护

image

FutureTask 执行活动图

image

execute 方法

submit方法只是在executor方法中增加一个get获取Callable返回值功能

真正的开启线程执行任务

public void execute(Runnable command) {
    // 表示 “线程池状态” 和 “线程数” 的整数
    int c = ctl.get();
    // 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
    if (workerCountOf(c) < corePoolSize) {
        // addWorker(command, true)返回 false 代表线程池不允许提交任务
        if (addWorker(command, true))
            return;
    }

    //判断c < SHUTDOWN,只有RUNNING,offer添加任务到任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        //防止并发更改了线程池状态
        int recheck = ctl.get();
        //防止并发更改了线程池状态
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果 workQueue 队列满了,以 maximumPoolSize 为界创建新的 worker
    else if (!addWorker(command, false))
        //执行初始化线程池中配置的reject策略
        reject(command);
}

addWorker

addWorker()创建线程并执行工作线程处理任务,core:是否为核心线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //基本的校验,线程池状态不能大于SHUTDOWN
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()))
            return false;
        for (;;) {
            //工作线程数
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                //execute方法已经作了判断,这里主要进行check
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                //工作线程+1,退出retry循环继续往下执行
                break retry;
            //CAS失败了,重新获取c判断线程池状态
            c = ctl.get();
            if (runStateOf(c) != rs)
                //状态一致则重新执行内循环,否则执行retry循环
                continue retry;
        }
    }
    //创建的worker工作线程是否已经启动
    boolean workerStarted = false;
    //创建的worker是否添加到工作队列中 HashSet<Worker> workers
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        //ThreadFactory封装之后的工作线程
        final Thread t = w.thread;
        if (t != null) {
            // 这个是整个线程池的全局锁,关闭线程池需要这个锁,持有锁线程池不会被关闭
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //线程池状态条件判断,小于SHUTDOWN只有RUNNING
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    //SHUTDOWN状态时,无法添加任务
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //记录线程池最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//启动工作线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker

Worker则是线程工作实例,封装了工作线程thread,即将被执行的任务firstTask

//封装worker后的工作线程
final Thread thread;
//即将被执行的任务
Runnable firstTask;
//已成功执行完成的任务计数
volatile long completedTasks;

Worker(Runnable firstTask) {
    //Worker继承AQS,设置的-1就是AQS的SIGNAL状态
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    //Worker实现Runnable,通过初始化传参的ThreadFactory封装当前的Worker
    this.thread = getThreadFactory().newThread(this);
}
//开启工作线程的run方法
public void run() {
    runWorker(this);
}

runWorker

开启工作线程,runWorker处理任务

final void runWorker(Worker w) {
    //执行任务的当前线程
    Thread wt = Thread.currentThread();
    //即将处理的任务
    Runnable task = w.firstTask;
    //worker中firstTask置空
    w.firstTask = null;
    //new Worker()是state==-1,调用tryRelease()将state置为0, 而interruptIfStarted()中只有state>=0才允许调用中断
    w.unlock(); // allow interrupts
    //异常导致的进入finally,那么completedAbruptly==true就是突然完成的
    boolean completedAbruptly = true;
    try {
        //工作线程只会从firstTask或workQueue中获取任务
        while (task != null || (task = getTask()) != null) {
            //不是为了防止并发执行任务,在shutdown()时不终止正在运行的worker
            w.lock();
            //确保只有在线程stoping时,才会被设置中断标示,否则清除中断标示
            if (...)
                wt.interrupt();
            try {
                //钩子方法,由子类实现处理任务之前的操作
                beforeExecute(wt, task);
                try {
                    //执行任务的run方法
                    task.run();
                } finally {
                    //同样是钩子方法,catch到的thrown异常
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        //没有突然的结束
        completedAbruptly = false;
    } finally {
        //处理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

通过覆写ThreadPoolExecutor实例的方法

pool = new ThreadPoolExecutor(....) {
    protected void beforeExecute(Thread t,Runnable r) {
        System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName());
    } 
    protected void afterExecute(Runnable r,Throwable t) {
        System.out.println("执行完毕:"+((ThreadTask)r).getTaskName());
    }    
    protected void terminated() {
        System.out.println("线程池退出");
    }
};

getTask

getTask()获取任务,当前工作线程处理完firstTask开始从任务队列处理

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 队列为空时,工作线程-1退出循环
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut为true则会关闭核心线程
        //如果>corePoolSize数则一直会在超时之后关闭线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //是否获取workQueue合理性校验
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        
        try {
            //timed为true两种情况:allowCoreThreadTimeOut为ture
            //或者工作线程>corePoolSize
            Runnable r = timed ?
                //如果超时会抛异常,获取不到也表明任务少,不需要这么多工作线程
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                //获取成功
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            //不抛异常,继续循环
            timedOut = false;
        }
    }
}

Executors工具生成的线程池

//获取可用cpu核心数
public static final int maxThreads = Runtime.getRuntime().availableProcessors();
//Executors工具初始化线程池
public static ExecutorService executor =  Executors.newFixedThreadPool(maxThreads);
//分配任务
Future<String> future = executor.submit(callable);
//关闭线程池
executorService.shutdown();

生成一个固定大小的线程池

最大线程数设置为与核心线程数相等,此时 keepAliveTime 设置为 0(因为这里它是没用的,即使不为 0,线程池默认也不会回收 corePoolSize 内的线程),任务队列采用 LinkedBlockingQueue,无界队列。

每提交一个任务都创建一个 worker,当 worker 的数量达到 nThreads 后,不再创建新的线程,而是把任务提交到 LinkedBlockingQueue 中,而且之后线程数始终为 nThreads。

//生成一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

生成只有一个线程的固定线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

生成一个需要的时候就创建新的线程,同时可以复用之前创建的线程的线程池

//核心线程数为 0,最大线程数为 Integer.MAX_VALUE,keepAliveTime 为 60 秒,任务队列采用 SynchronousQueue
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

创建一个定长线程池,支持定时及周期性任务执行,定时线程池,schedule定时开启线程池

//最大线程数为 Integer.MAX_VALUE,任务队列采用 DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

static ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(2);

public static void main(String[] args) {
    long startNow = System.currentTimeMillis();
    //LocalDateTime处理时间
    LocalDateTime dateTime = LocalDateTime.of(2020, 8, 19, 17, 28);
    long time = Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant()).getTime();
    System.out.println("time:"+(time-startNow));

```
scheduledService.schedule(new Thread(()->{
    System.out.println("进入schedule处理的时间为:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}),(time-startNow),TimeUnit.MILLISECONDS);

相关文章

  • 探讨阻塞队列和线程池源码

    阻塞队列 非阻塞队列是一个先进先出的单向队列(Queue),而BlockingQueue阻塞队列实际是非阻塞队列的...

  • 线程池

    [TOC] 线程池 1. 并发队列:阻塞队列和非阻塞队列 区别如下: 入队: 非阻塞队列:当队列中满了的时候,放入...

  • 线程池

    线程池执行过程 线程池生命周期 线程池分类 阻塞队列 拒绝策略 - ThreadPoolExecutor.Abor...

  • 阻塞队列和线程池

    1.阻塞队列 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。2)支持阻塞的移除...

  • 阻塞队列和线程池

    队列 队列,又称为伫列(queue),是先进先出(FIFO, First-In-First-Out)的线性表。在具...

  • Executors线程池

    newCacheThreadPool(缓存线程池):阻塞队列为SynchronousQueue,核心线程数0,最大...

  • 阻塞队列

    BlockingQueue线程池的数据结构是阻塞队列BlockingQueue。(在多线程领域:所谓阻塞,在某些情...

  • 阻塞队列和线程池原理

    阻塞队列和线程池原理 阻塞队列 队列 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除...

  • 4、阻塞队列和线程池原理

    阻塞队列和线程池原理 阻塞队列 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,...

  • 线程池

    1、为什么要使用线程池2、线程池的工作原理3、线程池参数4、阻塞队列5、饱和策略6、向线程池提交任务7、线程池的状...

网友评论

      本文标题:探讨阻塞队列和线程池源码

      本文链接:https://www.haomeiwen.com/subject/ikjonktx.html