美文网首页
具有优先级的线程池

具有优先级的线程池

作者: 不1见2不3散4 | 来源:发表于2018-10-20 23:36 被阅读0次

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/russle/article/details/83218137

问题由来:
多线程接收kafka的消息,有时消息几乎同时达到,先简单处理后提交给线程池再次处理,结果出现当先到达的消息msgA和后到达的消息msgB到达时间相差很小时,例如10毫秒,几乎同时提交到线程池,因为提交的线程池的时间相差更小有时几乎是完全相同的时间,导致偶然消息处理乱序,消息msgB被先处理了。

解决思路:
所有提交给线程池的Runnable带有优先级,虽然几乎同时提交到线程池,但是线程执行任务时从自己的任务队列中找当前优先级最高的先处理。基于上面的情况,可以根据消息的到达时间进行优先级比较,先到达的优先级高。

具体实现
分为两个类,第一个可比较优先级的Runnable,第二个线程池的创建时使用PriorityBlockingQueue而不是LinkedBlockingQueue。

Runnable代码

package com.yq;

import lombok.extern.slf4j.Slf4j;
import java.util.Random;

@Slf4j
public class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
    private long rts;
    private String name;
    PrioritizedRunnable(long rts, String name) {
        this.rts = rts;
        this.name = name;
    }
    public long getRts() {
        return rts;
    }
    public String getName() {
        return name;
    }
    @Override
    public int compareTo(PrioritizedRunnable secondOne) {
        // 时间越小越优先
       log.info("compareTo. this.name={}, secondOne.name={}", this.getName(), secondOne.getName());
        if (this.getRts() < secondOne.getRts()) {
            return -1;
        }else if(this.getRts()> secondOne.getRts()){
            return 1;
        } else {
            return 0;
        }
    }
    @Override
    public void run() {
        Random random = new Random();
        log.info("rts={}, name={}", rts, name);
        try {
            int sleepRandom = random.nextInt(200);
            Thread.sleep(sleepRandom);
        } catch (Exception ex) {
            log.info("sleep exception", ex);
        }
    }
}

具体调用的示例代码

package com.yq;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PriorityDemo {

    public static void main(String[] args) throws Exception {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,
                2,
                Long.MAX_VALUE, /* timeout */
                TimeUnit.NANOSECONDS,
                new PriorityBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        PrioritizedRunnable p1 = new PrioritizedRunnable(1234, "name1-1");
        PrioritizedRunnable p2 = new PrioritizedRunnable(1500, "name4-2");
        PrioritizedRunnable p3 = new PrioritizedRunnable(1590, "name5-3");
        PrioritizedRunnable p4 = new PrioritizedRunnable(1490, "name3-4");
        PrioritizedRunnable p5 = new PrioritizedRunnable(1290, "name2-5");

        executor.execute(p4);
        executor.execute(p1);
        executor.execute(p2);
        executor.execute(p3);
        executor.execute(p5);
        log.info("submit 5 Runnable");
        Thread.sleep(30*1000);
        ruleExecutor.shutdown();
        log.info("done!");
    }
}

备注:提交到线程池使用execute方法,不要使用submit要不然回报

java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357) ~[na:1.8.0_161]
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489) ~[na:1.8.0_161]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) ~[na:1.8.0_161]
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_161]

运行结果
可以看到当线程池执行任务,虽然p4先提交,但是因为p1优先级更高,所以先执行p1 (23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1234, name=name1-1), 然后是p4.

23:22:33.283 [main] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name5-3, o.name=name4-2
23:22:33.283 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1234, name=name1-1
23:22:33.283 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - rts=1490, name=name3-4
23:22:33.288 [main] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name2-5, o.name=name4-2
23:22:33.288 [main] INFO com.yq.PriorityDemo - submit 5 Runnable
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - compareTo. this.name=name4-2, o.name=name5-3
23:22:33.440 [pool-1-thread-1] INFO com.yq.PrioritizedRunnable - rts=1290, name=name2-5
23:22:33.451 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1500, name=name4-2
23:22:33.497 [pool-1-thread-2] INFO com.yq.PrioritizedRunnable - rts=1590, name=name5-3
23:23:03.289 [main] INFO com.yq.PriorityDemo - done!

Process finished with exit code 0

作者:russle
来源:CSDN
原文:https://blog.csdn.net/russle/article/details/83218137
版权声明:本文为博主原创文章,转载请附上博文链接!

相关文章

  • 具有优先级的线程池

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/russle/a...

  • [笔记]Java多线程概略

    线程的定义和状态 创建、就绪、运行、阻塞、停止 线程优先级 线程优先级的系统规则线程是具有优先级的,高优先级的线程...

  • 7-线程的优先级

    线程的优先级 在操作系统中,线程存在优先级关系,具有高优先级的线程会获得更多CPU的资源,优先级低的线程会获得少的...

  • Android实现具有优先级的线程池

    在安卓开发中为了复用线程以及节约线程开销,线程池是一种比较多的方法,有时也会有这样的需求,不同的线程执行任务的紧急...

  • 线程相关

    线程池:1.共有四种:Executors 2.改线程设置优先级: 3.AsyncTask的使用:线程优先级为bac...

  • HiExecutor

    全局通用的线程池组件-HiExecutor 支持任务优先级 支持线程池暂停、恢复、关闭 支持异步任务结果回调 Co...

  • 6. ThreadPool.h

    线程池,这个线程池暂时不具有自动扩容缩小的动能。 逻辑 正常的线程池的线是这个样子的。 创建一个线程池对象, 执行...

  • 一些小的Java知识点

    线程池参数优先级。 coreThread_size > queue_size > maxThread_size ...

  • 【代码】Java多线程实现

    一、什么情况下创建线程不使用线程池? 1、需要定义线程的优先级; 2、需要创建前台线程; 3、需要手动终止线程; ...

  • 异步任务的正确使用

    1、线程 注意线程是可以设置优先级的 特定场景下(例如App启动阶段为避免在主线程创建线程池的资源消耗)使用的话务...

网友评论

      本文标题:具有优先级的线程池

      本文链接:https://www.haomeiwen.com/subject/whsvzftx.html