美文网首页Java
ThreadPoolExecutor优先级队列PriorityB

ThreadPoolExecutor优先级队列PriorityB

作者: 丶含光 | 来源:发表于2020-05-20 00:41 被阅读0次

前两天重构代码,调试的时候,发现有个使用到线程池的地方抛出java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to异常
这个代码是线上在跑的一个逻辑,不该出现问题才对,最后还是翻了下源码确定原因

原因:之前向线程池提交任务用的是execute方法,复制的时候错用成了submit方法,改回execute方法即可 =。=

既然遇到了,顺便记录下

自定义提交到线程池的任务
@Data
@AllArgsConstructor
class TestRunnable implements Runnable {

    private Integer i;

    @Override
    public void run() {
        try {
            Thread.sleep(1000 * i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i);
    }
}
  • 线程池配合优先级队列的使用execute
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(100, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.execute(new TestRunnable(5));

        threadPoolExecutor.execute(new TestRunnable(2));

        threadPoolExecutor.execute(new TestRunnable(1));

        threadPoolExecutor.execute(new TestRunnable(3));
    }

执行结果

5
1
2
3

根据执行结果可以看到任务已经被排过序

源码笔记
ThreadPoolExecutor execute
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池有正在执行的任务,其余任务会在指定的队列上排队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这里指定的是PriorityBlockingQueue,可以看下该队列增加任务的方法

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            // cmp是创建队列时指定的比较器
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

siftUpUsingComparator方法

    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            // 可以看到调用了比较器的compare方法,以确定任务的优先级
            // x e是指定的任务
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
  • 线程池配合优先级队列的使用submit
    public static void main(String[] args) {
        // 创建优先级队列,指定队列初始大小 指定队列中的任务比较器
        // 优先级队列是无界的 指定的只是初始大小
        // 可以使用lambda简化
        PriorityBlockingQueue queue = new PriorityBlockingQueue(4, new Comparator<TestRunnable>() {

            @Override
            public int compare(TestRunnable o1, TestRunnable o2) {
                return o1.getI() - o2.getI();
            }
        });
        // 创建线程池 传入优先级队列
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);

        threadPoolExecutor.submit(new TestRunnable(5));

        threadPoolExecutor.submit(new TestRunnable(2));

        threadPoolExecutor.submit(new TestRunnable(1));

        threadPoolExecutor.submit(new TestRunnable(3));
    }

执行结果

Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to com.tianwen.jdk.TestRunnable
    at com.tianwen.jdk.DemoApplication$1.compare(DemoApplication.java:17)
    at java.util.concurrent.PriorityBlockingQueue.siftUpUsingComparator(PriorityBlockingQueue.java:375)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:492)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.tianwen.jdk.DemoApplication.main(DemoApplication.java:31)
5
2

可以看到,除了一个正在被执行的任务和一个排在队列头部的任务,其余的任务添加时,在调用compare方法都会抛出异常

源码笔记
ThreadPoolExecutor submit
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 封装成一个Future
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 同样的在执行ThreadPoolExecutor的execute方法
        // 但是传入的已经不是最初的任务,而是一个封装过的Future
        execute(ftask);
        return ftask;
    }

实际是在父类AbstractExecutorService中,可以看到submit方法内先将提交给线程池的任务封装成一个Future,再同样执行ThreadPoolExecutorexecute方法。但其实这里的任务已经不是最初指定的任务,而是一个Future,所以在最后尝试将任务放进优先级队列时,调用比较器的compare方法时自然会抛出
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to

相关文章

网友评论

    本文标题:ThreadPoolExecutor优先级队列PriorityB

    本文链接:https://www.haomeiwen.com/subject/ygswohtx.html