美文网首页小白架构师之路
批量任务的并发框架

批量任务的并发框架

作者: 王侦 | 来源:发表于2018-08-30 17:45 被阅读0次

批量任务的并发框架代码

1.需求的产生和分析

  • 需求1.某学校需要针对学生生成批量的试卷题目


  • 需求2.该学校的题库是由不同老师录入,因此需要对题目进行排重。或者对某个知识点要替换老的题目,此时需要根据条件批量修改题目内容。


  • 两个需求的共同痛点
    1、都有批量任务要完成且速度慢
    2、都要求可以查询进度
    3、在使用上尽可能的对业务开发人员友好

2.架构设计

2.1 针对批量任务

  • 提高性能,采用多线程,屏蔽细节
    封装线程池和阻塞队列

  • 每个批量任务拥有自己的上下文环境
    需要一个并发安全的容器保存每个任务
    这里可以采用ConcurrentHashMap

  • 自动清除已完成的过期任务
    可以使用DelayQueue

2.2 针对可查询进度

  • 用户如何提交他的工作任务和查询任务进度?

  • 如何执行用户的业务方法?
    用户提供自己的业务处理方法,并作为参数进行注册。

  • 用户业务方法的结果怎么表示?
    需要表示任务执行的状态:成功、失败、异常
    成功的时候要返回结果数据,失败的时候要返回失败的原因。

2.3 框架流程图

3.详细流程的说明

3.1 业务人员使用并发框架

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        //拿到框架的实例
        PendingJobPool pool = PendingJobPool.getInstance();
        //注册job
        pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
        Random r = new Random();
        for(int i=0;i<JOB_LENGTH;i++) {
            //依次推入Task
            pool.putTask(JOB_NAME, r.nextInt(1000));
        }
        Thread t = new Thread(new QueryResult(pool));
        t.start();
    }

3.2 生成并发框架

PendingJobPool pool = PendingJobPool.getInstance();
public class PendingJobPool {

    //单例模式------
    private PendingJobPool() {}

    private static class JobPoolHolder{
        public static PendingJobPool pool = new PendingJobPool();
    }

    public static PendingJobPool getInstance() {
        return JobPoolHolder.pool;
    }
    //单例模式------
    //保守估计
    private static final int THREAD_COUNTS =
            Runtime.getRuntime().availableProcessors();
    //任务队列
    private static BlockingQueue<Runnable> taskQueue
            = new ArrayBlockingQueue<>(5000);
    //线程池,固定大小,有界队列
    private static ExecutorService taskExecutor =
            new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60,
                    TimeUnit.SECONDS, taskQueue);
    //job的存放容器
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
            = new  ConcurrentHashMap<>();

    private static CheckJobProcesser checkJob
            = CheckJobProcesser.getInstance();

3.3 注册工作,依次将任务提交给框架(这里只注册了一个批量任务)

3.3.1 注册工作

要指定工作名称、批量任务数量、处理任务的方法以及已完成任务过期时间。

pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);

这里注册,就是生成一个JobInfo<R>任务,放入jobInfoMap

    public <R> void registerJob(String jobName, int jobLength,
                                ITaskProcesser<?, ?> taskProcesser, long expireTime) {
        JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,
                taskProcesser,expireTime);
        if (jobInfoMap.putIfAbsent(jobName, jobInfo)!=null) {
            throw new RuntimeException(jobName+"已经注册了!");
        }
    }
  • MyTask模拟任务处理,随机返回三种结果类型
public class MyTask implements ITaskProcesser<Integer,Integer> {

    @Override
    public TaskResult<Integer> taskExecute(Integer data) {
        Random r = new Random();
        int flag = r.nextInt(500);
        SleepTools.ms(flag);
        if(flag<=300) {//正常处理的情况
            Integer returnValue = data.intValue()+flag;
            return new TaskResult<Integer>(TaskResultType.Success,returnValue);
        }else if(flag>301&&flag<=400) {//处理失败的情况
            return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
        }else {//发生异常的情况
            try {
                throw new RuntimeException("异常发生了!!");
            } catch (Exception e) {
                return new TaskResult<Integer>(TaskResultType.Exception,
                        -1,e.getMessage());
            }
        }
    }
}

3.3.2 依次将任务提交给框架——也即将任务提交给线程池执行

这里T是MyTask任务处理方法的参数类型;R是MyTask返回类型中的类型——TaskResult<Integer> taskExecute(Integer data)。

       Random r = new Random();
        for(int i=0;i<JOB_LENGTH;i++) {
            //依次推入Task
            pool.putTask(JOB_NAME, r.nextInt(1000));
        }
    public <T,R> void putTask(String jobName,T t) {
        JobInfo<R> jobInfo = getJob(jobName);
        PendingTask<T,R> task = new PendingTask<T,R>(jobInfo,t);
        taskExecutor.execute(task);
    }

这里主要是调用业务人员开发的taskProcesser进行处理!

    //线程池,固定大小,有界队列
    private static ExecutorService taskExecutor =
            new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60,
                    TimeUnit.SECONDS, taskQueue);

    private static class PendingTask<T,R> implements Runnable{

        private JobInfo<R> jobInfo;
        private T processData;

        public PendingTask(JobInfo<R> jobInfo, T processData) {
            super();
            this.jobInfo = jobInfo;
            this.processData = processData;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            R r = null;
            ITaskProcesser<T,R> taskProcesser =
                    (ITaskProcesser<T, R>) jobInfo.getTaskProcesser();
            TaskResult<R> result = null;

            try {
                //调用业务人员实现的具体方法
                result = taskProcesser.taskExecute(processData);
                //要做检查,防止开发人员处理不当
                if (result == null) {
                    result = new TaskResult<R>(TaskResultType.Exception, r,
                            "result is null");
                }
                if (result.getResultType() == null) {
                    if (result.getReason() == null) {
                        result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
                    } else {
                        result = new TaskResult<R>(TaskResultType.Exception, r,
                                "result is null,but reason:" + result.getReason());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                result = new TaskResult<R>(TaskResultType.Exception, r,
                        e.getMessage());
            }finally {
                jobInfo.addTaskResult(result,checkJob);
            }
        }
    }

3.3.3 任务的结果类型设计

public enum TaskResultType {
    //方法成功执行并返回了业务人员需要的结果
    Success,
    //方法成功执行但是返回的是业务人员不需要的结果
    Failure,
    //方法执行抛出了Exception
    Exception;
}
public class TaskResult<R> {
    //方法本身运行是否正确的结果类型
    private final TaskResultType resultType;
    //方法的业务结果数据;
    private final R returnValue;
    //这里放方法失败的原因
    private final String reason;

    public TaskResult(TaskResultType resultType, R returnValue, String reason) {
        super();
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = reason;
    }

    //方便业务人员使用,这个构造方法表示业务方法执行成功返回的结果
    public TaskResult(TaskResultType resultType, R returnValue) {
        super();
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = "Success";
    }

3.3.4 执行完一个任务后的处理——更新任务的进度

jobInfo.addTaskResult(result,checkJob);
    public void addTaskResult(TaskResult<R> result, CheckJobProcesser checkJob) {
        if (TaskResultType.Success.equals(result.getResultType())) {
            successCount.incrementAndGet();
        }
        taskDetailQueue.addLast(result);
        taskProcesserCount.incrementAndGet();

        if (taskProcesserCount.get() == jobLength) {
            checkJob.putJob(jobName, expireTime);
        }

    }
  • 任务结构的设计
public class JobInfo<R> {
    //区分唯一的工作
    private final String jobName;
    //工作的任务个数
    private final int jobLength;
    //这个工作的任务处理器
    private final ITaskProcesser<?, ?> taskProcesser;
    //成功处理的任务数
    private final AtomicInteger successCount;
    //已处理的任务数
    private final AtomicInteger taskProcesserCount;
    //结果队列,拿结果从头拿,放结果从尾部放
    private final LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
    //工作的完成保存的时间,超过这个时间从缓存中清除
    private final long expireTime;

    //与课堂上有不同,修订为,阻塞队列不应该由调用者传入,应该内部生成,长度为工作的任务个数
    public JobInfo(String jobName, int jobLength,
                   ITaskProcesser<?, ?> taskProcesser,
                   long expireTime) {
        super();
        this.jobName = jobName;
        this.jobLength = jobLength;
        this.taskProcesser = taskProcesser;
        this.successCount = new AtomicInteger(0);
        this.taskProcesserCount = new AtomicInteger(0);
        this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
        ;
        this.expireTime = expireTime;
    }

    public ITaskProcesser<?, ?> getTaskProcesser() {
        return taskProcesser;
    }

    //返回成功处理的结果数
    public int getSuccessCount() {
        return successCount.get();
    }

    //返回当前已处理的结果数
    public int getTaskProcesserCount() {
        return taskProcesserCount.get();
    }

    //提供工作中失败的次数,课堂上没有加,只是为了方便调用者使用
    public int getFailCount() {
        return taskProcesserCount.get() - successCount.get();
    }

    public String getTotalProcess() {
        return "Success[" + successCount.get() + "]/Current["
                + taskProcesserCount.get() + "] Total[" + jobLength + "]";
    }

    //获得工作中每个任务的处理详情
    public List<TaskResult<R>> getTaskDetail() {
        List<TaskResult<R>> taskList = new LinkedList<>();
        TaskResult<R> taskResult;
        //从阻塞队列中拿任务的结果,反复取,一直取到null为止,说明目前队列中最新的任务结果已经取完,可以不取了
        while ((taskResult = taskDetailQueue.pollFirst()) != null) {
            taskList.add(taskResult);
        }
        return taskList;
    }

    //放任务的结果,从业务应用角度来说,保证最终一致性即可,不需要对方法加锁.
    public void addTaskResult(TaskResult<R> result, CheckJobProcesser checkJob) {
        if (TaskResultType.Success.equals(result.getResultType())) {
            successCount.incrementAndGet();
        }
        taskDetailQueue.addLast(result);
        taskProcesserCount.incrementAndGet();

        if (taskProcesserCount.get() == jobLength) {
            checkJob.putJob(jobName, expireTime);
        }

    }
}

3.4 模拟查询的线程

        Thread t = new Thread(new QueryResult(pool));
        t.start();
    private static class QueryResult implements Runnable{

        private PendingJobPool pool;

        public QueryResult(PendingJobPool pool) {
            super();
            this.pool = pool;
        }

        @Override
        public void run() {
            int i=0;//查询次数
            while(i<350) {
                List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
                if(!taskDetail.isEmpty()) {
                    System.out.println(pool.getTaskProgess(JOB_NAME));
                    System.out.println(taskDetail);
                }
                SleepTools.ms(100);
                i++;
            }
        }

    }

3.5 对已完成工作的过期处理

public class CheckJobProcesser {
    private static DelayQueue<ItemVo<String>> queue 
        = new DelayQueue<ItemVo<String>>();//存放已完成任务等待过期的队列
    
    //单例模式------
    private CheckJobProcesser() {}
    
    private static class ProcesserHolder{
        public static CheckJobProcesser processer = new CheckJobProcesser();
    }
    
    public static CheckJobProcesser getInstance() {
        return ProcesserHolder.processer;
    }
    //单例模式------    
    
    //处理队列中到期任务的实行
    private static class FetchJob implements Runnable{

        @Override
        public void run() {
            while(true) {
                try {
                    //拿到已经过期的任务
                    ItemVo<String> item = queue.take();
                    String jobName =  (String)item.getDate();
                    PendingJobPool.getMap().remove(jobName);
                    System.out.println(jobName+" is out of date,remove from map!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }           
        }
    }
    
    /*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
    public void putJob(String jobName,long expireTime) {
        ItemVo<String> item = new ItemVo<String>(expireTime,jobName);
        queue.offer(item);
        System.out.println("Job["+jobName+"已经放入了过期检查缓存,过期时长:"+expireTime);
    }
    
    static {
        Thread thread = new Thread(new FetchJob());
        thread.setDaemon(true);
        thread.start();
        System.out.println("开启任务过期检查守护线程................");
    }
    
    
}

参考

  • 1)享学课堂Mark老师笔记

相关文章

  • 批量任务的并发框架

    批量任务的并发框架代码 1.需求的产生和分析 需求1.某学校需要针对学生生成批量的试卷题目 需求2.该学校的题库是...

  • Go执行任务,这一文就说明了

    大家在开发过程中肯定遇见过批量任务处理的,一般都是串行执行,今天我们来个并发版本的 创建一个批量执行任务的结构体(...

  • 一次性能调优总结

    批量任务和异步任务,批量任务采用分发方式,批量任务生产异步任务。总的情况是批量任务的生产速度大于异步任务的消耗速度...

  • Swift 中的 Task

    Swift 中的 Task 是 WWDC 2021 引入的并发框架的一部分。任务允许我们从非并发方法创建并发环境,...

  • Mac 下使用 Python+Selenium 实现西瓜视频自动

    背景 研究下 Python+Selenium 自动化测试框架,简单实现 Mac 下自动化批量上传视频西瓜视频并发布...

  • TBSchedule使用简介

    TBSchedule是什么 TBSchedule是一个支持分布式的调度框架,让批量任务或者不断变化的任务能够被动态...

  • java fork/join框架下的归并排序和快排

    前言 Fork/Join框架是Java 7提供的一个用于并发执行任务的框架,其主要思想就是把大任务分割成若干的小任...

  • JDK线程池源码分析之ThreadPoolExecutor

    前言 JDK中为我们提供了一个并发线程框架,它是的我们可以在有异步任务或大量并发任务需要执行时可以使用它提供的线程...

  • Scala和Golang并发实现对比

    系统中有多个任务同时存在称之为“并发”,并发设计已然成为大规模集群框架的必要特征,本文简单的介绍Scala和gol...

  • Netty源码分析-03 Netty线程池

    线程池是一个在多线程场景中运用很广泛的并发框架,需要异步执行或并发执行任务的程序都可以使用线程池。有任务到来时,如...

网友评论

    本文标题:批量任务的并发框架

    本文链接:https://www.haomeiwen.com/subject/zmnlwftx.html