美文网首页
11 并发框架的搭建

11 并发框架的搭建

作者: 攻城狮哦哦也 | 来源:发表于2019-08-27 14:07 被阅读0次

    1 业务逻辑

    2 框架分析

    图片.png

    3 类设计

    3.1 结果类

    装载任务的结果类型

    /**
     *
     *类说明:方法本身运行是否正确的结果类型
     */
    public enum TaskResultType {
        //方法成功执行并返回了业务人员需要的结果
        Success,
        //方法成功执行但是返回的是业务人员不需要的结果
        Failure,
        //方法执行抛出了Exception
        Exception;
    }
    

    装载任务结果

    /**
     *
     *类说明:任务处理返回结果实体类
     */
    public class TaskResult<R> {
        //方法本身运行是否正确的结果类型
        private final TaskResultType resultType;
        //方法的业务结果数据;
        private final R returnValue;
        //这里放方法失败的原因
        private final String reason;
        
        public TaskResult(TaskResultType resultType, R returnValue, String reason) {
            super();
            this.resultType = resultType;
            this.returnValue = returnValue;
            this.reason = reason;
        }
        
        //方便业务人员使用,这个构造方法表示业务方法执行成功返回的结果
        public TaskResult(TaskResultType resultType, R returnValue) {
            super();
            this.resultType = resultType;
            this.returnValue = returnValue;
            this.reason = "Success";
        }
    
        public TaskResultType getResultType() {
            return resultType;
        }
    
        public R getReturnValue() {
            return returnValue;
        }
    
        public String getReason() {
            return reason;
        }
    
        @Override
        public String toString() {
            return "TaskResult [resultType=" + resultType 
                    + ", returnValue=" + returnValue 
                    + ", reason=" + reason + "]";
        }   
        
        
        
    }
    

    3.2 工作的任务处理器

    接口定义,为调用者提供自定义实现任务逻辑的接口规范

    /**
     *
     *类说明:要求框架使用者实现的任务接口,因为任务的性质在调用时才知道,
     *所以传入的参数和方法的返回值均使用泛型
     */
    public interface ITaskProcesser<T, R> {
        /**
         * @param data 调用方法需要使用的业务数据
         * @return 方法执行后业务方法的结果
         */
        TaskResult<R> taskExecute(T data);
    }
    
    

    3.3 工作类

    同一类工作的定义,同类的工作可能多次执行
    包含任务的唯一标识、任务处理器、任务结果队列等信息

    /**
     *
     *类说明:提交给框架执行的工作实体类,工作:表示本批次需要处理的同性质任务(Task)的一个集合
     */
    public class JobInfo<R> {
        //区分唯一的工作
        private final String jobName;
        //工作的任务个数
        private final int jobLength;
        //这个工作的任务处理器
        private final ITaskProcesser<?,?> taskProcesser;
        //成功处理的任务数
        private final AtomicInteger  successCount;
        //已处理的任务数
        private final AtomicInteger  taskProcesserCount;
        //结果队列,拿结果从头拿,放结果从尾部放
        private final LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
        //工作的完成保存的时间,超过这个时间从缓存中清除
        private final long expireTime;
        
        //与课堂上有不同,修订为,阻塞队列不应该由调用者传入,应该内部生成,长度为工作的任务个数
        public JobInfo(String jobName, int jobLength, 
                ITaskProcesser<?, ?> taskProcesser,
                long expireTime) {
            super();
            this.jobName = jobName;
            this.jobLength = jobLength;
            this.taskProcesser = taskProcesser;
            this.successCount = new AtomicInteger(0);
            this.taskProcesserCount = new AtomicInteger(0);
            this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);;
            this.expireTime = expireTime;
        }
    
        public ITaskProcesser<?, ?> getTaskProcesser() {
            return taskProcesser;
        }
    
        //返回成功处理的结果数
        public int getSuccessCount() {
            return successCount.get();
        }
    
        //返回当前已处理的结果数
        public int getTaskProcesserCount() {
            return taskProcesserCount.get();
        }
        
        //提供工作中失败的次数,课堂上没有加,只是为了方便调用者使用
        public int getFailCount() {
            return taskProcesserCount.get() - successCount.get();
        }
        
        public String getTotalProcess() {
            return "Success["+successCount.get()+"]/Current["
                    +taskProcesserCount.get()+"] Total["+jobLength+"]";
        }
        
        //获得工作中每个任务的处理详情
        public List<TaskResult<R>> getTaskDetail(){
            List<TaskResult<R>> taskList = new LinkedList<>();
            TaskResult<R> taskResult;
            //从阻塞队列中拿任务的结果,反复取,一直取到null为止,说明目前队列中最新的任务结果已经取完,可以不取了
            while((taskResult=taskDetailQueue.pollFirst())!=null) {
                taskList.add(taskResult);
            }
            return taskList;
        }
        
        //放任务的结果,从业务应用角度来说,保证最终一致性即可,不需要对方法加锁.
        public void addTaskResult(TaskResult<R> result,CheckJobProcesser checkJob) {
            if (TaskResultType.Success.equals(result.getResultType())) {
                successCount.incrementAndGet();
            }
            taskDetailQueue.addLast(result);
            taskProcesserCount.incrementAndGet();
    
            //当所有任务结束后,将当前的JobInfo类放入延时队列
            if(taskProcesserCount.get()==jobLength) {
                checkJob.putJob(jobName, expireTime);
            }
            
        }
    }
    
    

    3.4 框架主体类

    • 将工作类JobInfo注册到ConcurrentHashMap<String, JobInfo<?>>中
    • 将任务放入BlockingQueue<Runnable>中
    • 将工作类JobInfo对应的任务处理封装在线程中,然后用线程池调用,每一个线程对应一个工作任务
    • 根据任务所属的JobInfo类型,在ConcurrentHashMap获取之前注册的JobInfo,然后用这个工作类处理当前任务
    • 改JobInfo类型的所有任务结果存储在JobInfo的LinkedBlockingDeque<TaskResult<R>>结果队列中
    
    /**
     * 框架的主体类,也是调用者主要使用的类
     */
    public class PendingJobPool {
        
        //保守估计
        private static final int THREAD_COUNTS = 
                Runtime.getRuntime().availableProcessors();
        //任务队列
        private static BlockingQueue<Runnable> taskQueue
         = new ArrayBlockingQueue<>(5000);
        //线程池,固定大小,有界队列
        private static ExecutorService taskExecutor = 
                new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, 
                        TimeUnit.SECONDS, taskQueue);
        //job的存放容器
        private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
           = new  ConcurrentHashMap<>();
        
        private static CheckJobProcesser checkJob
            = CheckJobProcesser.getInstance();
        
        public static Map<String, JobInfo<?>> getMap(){
            return jobInfoMap;
        }
        
        //单例模式------
        private PendingJobPool() {}
        
        private static class JobPoolHolder{
            public static PendingJobPool pool = new PendingJobPool();
        }
        
        public static PendingJobPool getInstance() {
            return JobPoolHolder.pool;
        }
        //单例模式------
        
        //对工作中的任务进行包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
        private static class PendingTask<T,R> implements Runnable{
            
            private JobInfo<R> jobInfo;
            private T processData;
    
            public PendingTask(JobInfo<R> jobInfo, T processData) {
                super();
                this.jobInfo = jobInfo;
                this.processData = processData;
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public void run() {
                R r = null;
                ITaskProcesser<T,R> taskProcesser =
                        (ITaskProcesser<T, R>) jobInfo.getTaskProcesser();
                TaskResult<R> result = null;
                
                try {
                    //调用业务人员实现的具体方法
                    result = taskProcesser.taskExecute(processData);
                    //要做检查,防止开发人员处理不当
                    if (result == null) {
                        result = new TaskResult<R>(TaskResultType.Exception, r, 
                                "result is null");
                    }
                    if (result.getResultType() == null) {
                        if (result.getReason() == null) {
                            result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
                        } else {
                            result = new TaskResult<R>(TaskResultType.Exception, r,
                                    "result is null,but reason:" + result.getReason());
                        }
                    } 
                } catch (Exception e) {
                    e.printStackTrace();
                    result = new TaskResult<R>(TaskResultType.Exception, r, 
                            e.getMessage());                
                }finally {
                    jobInfo.addTaskResult(result,checkJob);
                }
            }
        }
        
        //根据工作名称检索工作
        @SuppressWarnings("unchecked")
        private <R> JobInfo<R> getJob(String jobName){
            JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
            if(null==jobInfo) {
                throw new RuntimeException(jobName+"是个非法任务。");
            }
            return jobInfo;
        }
        
        //调用者提交工作中的任务
        public <T,R> void putTask(String jobName,T t) {
            JobInfo<R> jobInfo = getJob(jobName);
            PendingTask<T,R> task = new PendingTask<T,R>(jobInfo,t);
            taskExecutor.execute(task);
        }
        
        //调用者注册工作,如工作名,任务的处理器等等
        public <R> void registerJob(String jobName,int jobLength,
                ITaskProcesser<?, ?> taskProcesser,long expireTime) {
            JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,
                    taskProcesser,expireTime);
            if (jobInfoMap.putIfAbsent(jobName, jobInfo)!=null) {
                throw new RuntimeException(jobName+"已经注册了!");
            }
        }
        
        //获得每个任务的处理详情
        public <R> List<TaskResult<R>> getTaskDetail(String jobName){
            JobInfo<R> jobInfo = getJob(jobName);
            return jobInfo.getTaskDetail();
        }
        
        //获得工作的整体处理进度
        public <R> String getTaskProgess(String jobName) {
            JobInfo<R> jobInfo = getJob(jobName);
            return jobInfo.getTotalProcess();   
        }
        
    }
    
    

    3.5 缓存类

    将任务处理结果放入延时队列,超时则清除出缓存

    
    /**
     *
     *类说明:任务完成后,在一定的时间供查询,之后为释放资源节约内存,需要定期处理过期的任务
     */
    public class CheckJobProcesser {
        private static DelayQueue<ItemVo<String>> queue 
            = new DelayQueue<ItemVo<String>>();//存放已完成任务等待过期的队列
        
        //单例模式------
        private CheckJobProcesser() {}
        
        private static class ProcesserHolder{
            public static CheckJobProcesser processer = new CheckJobProcesser();
        }
        
        public static CheckJobProcesser getInstance() {
            return ProcesserHolder.processer;
        }
        //单例模式------    
        
        //处理队列中到期任务的实行
        private static class FetchJob implements Runnable{
    
            @Override
            public void run() {
                while(true) {
                    try {
                        //拿到已经过期的任务
                        ItemVo<String> item = queue.take();
                        String jobName =  (String)item.getDate();
                        PendingJobPool.getMap().remove(jobName);
                        System.out.println(jobName+" is out of date,remove from map!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }           
            }
        }
        
        /*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
        public void putJob(String jobName,long expireTime) {
            ItemVo<String> item = new ItemVo<String>(expireTime,jobName);
            queue.offer(item);
            System.out.println("Job["+jobName+"已经放入了过期检查缓存,过期时长:"+expireTime);
        }
        
        static {
            Thread thread = new Thread(new FetchJob());
            thread.setDaemon(true);
            thread.start();
            System.out.println("开启任务过期检查守护线程................");
        }
        
        
    }
    
    

    3.6 调用实现类

    /**
     *
     *类说明:一个实际任务类,将数值加上一个随机数,并休眠随机时间
     */
    public class MyTask implements ITaskProcesser<Integer,Integer> {
    
        @Override
        public TaskResult<Integer> taskExecute(Integer data) {
            Random r = new Random();
            int flag = r.nextInt(500);
            SleepTools.ms(flag);
            if(flag<=300) {//正常处理的情况
                Integer returnValue = data.intValue()+flag;
                return new TaskResult<Integer>(TaskResultType.Success,returnValue);
            }else if(flag>301&&flag<=400) {//处理失败的情况
                return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
            }else {//发生异常的情况
                try {
                    throw new RuntimeException("异常发生了!!");
                } catch (Exception e) {
                    return new TaskResult<Integer>(TaskResultType.Exception,
                            -1,e.getMessage());
                }
            }
        }
    
    }
    
    
    /**
     *
     *类说明:模拟一个应用程序,提交工作和任务,并查询任务进度
     */
    public class AppTest {
        
        private final static String JOB_NAME = "计算数值";
        private final static int JOB_LENGTH = 1000;
        
        //查询任务进度的线程
        private static class QueryResult implements Runnable{
            
            private PendingJobPool pool;
    
            public QueryResult(PendingJobPool pool) {
                super();
                this.pool = pool;
            }
    
            @Override
            public void run() {
                int i=0;//查询次数
                while(i<350) {
                    List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
                    if(!taskDetail.isEmpty()) {
                        System.out.println(pool.getTaskProgess(JOB_NAME));
                        System.out.println(taskDetail);                 
                    }
                    SleepTools.ms(100);
                    i++;
                }
            }
            
        }
    
        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            //拿到框架的实例
            PendingJobPool pool = PendingJobPool.getInstance();
            //注册job
            pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
            Random r = new Random();
            for(int i=0;i<JOB_LENGTH;i++) {
                //依次推入Task
                pool.putTask(JOB_NAME, r.nextInt(1000));
            }
            Thread t = new Thread(new QueryResult(pool));
            t.start();
        }
    }
    

    相关文章

      网友评论

          本文标题:11 并发框架的搭建

          本文链接:https://www.haomeiwen.com/subject/oucdectx.html