美文网首页
手写并发框架(一)

手写并发框架(一)

作者: Theodore的技术站 | 来源:发表于2019-01-26 19:45 被阅读13次

    并发框架是为了将业务代码和多线程代码隔离开来,为了可以让不懂并发的人员也可以开发使用这个框架。

    框架中提供线程池、任务储存容器、使用者需要实现的任务接口、提交给框架执行的工作实体类、任务返回结果实体类、有可能还需要缓存定期清理已完成的任务。

    框架业务示意图:

    不论那种类型的任务都需要兼容,支持用户注册提交任务,查询任务进度和结果。


    框架流程图:

    框架内部跟业务代码是松耦合的,业务人员不需要了解内部代码,用线程池支持并发,提供给业务人员注册任务,并提供查询任务结果方法。内部需要并发容器储存任务,已经完成的任务放入队列中过期清除。


    废话不多说,直接上代码:

    这个接口需要业务人员实现,实现想要执行的任务

    /*
    *要求框架使用者实现的任务接口。
    * data 是方法使用的业务数据类型
    * return 方法执行后业务返回的结果
    * */
    public interface ITaskProcesser<T,R> {
        TaskResult<R> taskExcutor(T data);
    }
    

    工作实体类:

    用泛型实现,提供了任务计数器,任务计数器用到了原子类型,避免内存重排序导致的错误。
    提供了放回任务结果的方法供开发人员查看。
    提供了将完成的任务放入队列的方法。
    其中任务计数器是不能让业务人员随意操作的,所以构造的时候没有传入参数。

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /*
    * 提交给框架执行的工作实体类
    * */
    public class JobInfo<R> {
        //工作名字
        private final String jobName;
        //工作的任务个数
        private final int jobLength;
        //工作任务处理器
        private final ITaskProcesser<?,?> taskProcesser;
        //任务计数器
        private AtomicInteger successCount;//原子类型
        private AtomicInteger taskProcesserCount;
        //放入已完成的队列
        private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
        //完成任务超时 定时器 超时清除
        private final long expireTime;
    
        //构造方法
        public JobInfo(String jobName, int jobLength, ITaskProcesser<?, ?> taskProcesser, long expireTime) {
            this.jobName = jobName;
            this.jobLength = jobLength;
            this.taskProcesser = taskProcesser;
            //开发人员不应该修改任务的计数 由框架来控制
            this.successCount = new AtomicInteger();
            this.taskProcesserCount = new AtomicInteger();
            this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
            this.expireTime = expireTime;
        }
    
        public ITaskProcesser<?, ?> getTaskProcesser() {
            return taskProcesser;
        }
    
        //返回成功处理的结果数
        public int getSuccessCount() {
            return successCount.get();
        }
    
        //返回当前已处理的结果数
        public int getTaskProcesserCount() {
            return taskProcesserCount.get();
        }
    
        //提供工作中失败的次数
        public int getFailCount() {
            return taskProcesserCount.get() - successCount.get();
        }
    
    
        public String getTotalProcess() {
            return "Success["+successCount.get()+"]/Current["
                    +taskProcesserCount.get()+"] Total["+jobLength+"]";
        }
    
        //获取每个任务结果,从头部获取
        public List<TaskResult<R>> getTaskDetail(){
            List<TaskResult<R>> taskList = new LinkedList<>();
            TaskResult<R> taskResult;
            while((taskResult = taskDetailQueue.pollFirst()) != null){
                taskList.add(taskResult);
            }
            return  taskList;
        }
    
        //从业务角度来讲,保证最终一致性就行,不需要加锁,影响性能。已经用了源自操作和并发安全队列
        public void addTaskResult(TaskResult<R> result, CheckJobProcesser checkJob){
            if (TaskResultType.Success.equals(result.getResultType())){
                successCount.incrementAndGet();
            }
            taskDetailQueue.addLast(result);//结果从尾部添加
            taskProcesserCount.incrementAndGet();
            if (taskProcesserCount.get() == jobLength){
                checkJob.putJob(jobName,expireTime);
            }
    
        }
    }
    

    任务结果类

    提供返回任务结果和原因的方法。

    /*
    * 任务返回结果实体类
    * */
    public class TaskResult<R> {
        private final TaskResultType resultType;
        private final R returnValue;//业务结果数据
        private final String reason;//方法失败原因
    
        public TaskResult(TaskResultType resultType, R returnValue, String reason) {
            this.resultType = resultType;
            this.returnValue = returnValue;
            this.reason = reason;
        }
    
        public TaskResult(TaskResultType resultType, R returnValue) {
            this.resultType = resultType;
            this.returnValue = returnValue;
            this.reason = "Success";
        }
    
        public TaskResultType getResultType() {
            return resultType;
        }
    
        public R getReturnValue() {
            return returnValue;
        }
    
        public String getReason() {
            return reason;
        }
    }
    

    任务执行的结果类:

    /*
    *
    * 任务执行的结果类
    * */
    public enum TaskResultType {
        Success,Failure,Exception;
        //返回业务人员需要的结果
        //返回业务人员不需要的结果
        //返回异常
    }
    

    查询结果类:

    主要实现将完成的任务放入队列,供查询,如果一点时间过了就将任务清除,防止占用大量内存。
    使用了单例模式,也是为了节省内存。
    启用线程清理任务,设置为守护线程。

    /*
    * 任务完成后,在一定的时间供查询,之后为释放资源节约内存,需要定期处理过期的任务
    * */
    public class CheckJobProcesser {
    
        private static DelayQueue<ItemVo<String>> queue = new DelayQueue<>();//存放已完成任务,超时过期
    
        //单例模式------
        private CheckJobProcesser(){}
    
        private static class ProcesserHolder{
            public static CheckJobProcesser processer = new CheckJobProcesser();
        }
    
        public static CheckJobProcesser getInstance(){
            return ProcesserHolder.processer;
        }
        //单例模式------
    
        //处理队列中到期任务的实行
        private static class FetchJob implements Runnable{
    
            @Override
            public void run() {
                while (true){
                    try{
                        ItemVo<String> item = queue.take();
                        String jobName = (String)item.getDate();
                        PendingJobPool.getMap().remove(jobName);
                        System.out.println(jobName + "is out of date , remove from map");
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        }
        /*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
        public void putJob(String jobName,long expireTime){
            ItemVo<String> item = new ItemVo<>(expireTime,jobName);
            queue.offer(item);
            System.out.println("job[" + jobName + "已经放入过期检查缓存,过期时长:" + expireTime);
        }
    
        //类初始化的时候就运行线程
        static {
            Thread thread = new Thread(new FetchJob());
            thread.setDaemon(true);
            thread.start();
            System.out.println("开启守护线程");
        }
    }
    

    框架主体:

    这里使用了线程池,保守估计线程个数,使用的个数和cpu数量相同,这个可以根据业务需求修改。

    线程池没有用 JDK 提供的那几个,主要考虑到想要用有界队列,就自己定义线程池。
    用 ConcurrentHashMap 存放任务。

    提供了注册任务方法、提交任务方法、获得任务结果等。

    对工作中的任务进行了包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
    同样使用了单例模式。

    import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
    import com.enjoy.MultiThread.ch8a.vo.JobInfo;
    import com.enjoy.MultiThread.ch8a.vo.TaskResult;
    import com.enjoy.MultiThread.ch8a.vo.TaskResultType;
    
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    /**
     * 框架的主体类,也是调用者主要使用的类
     */
    public class PendingJobPool {
    
        //保守估计
        private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
    
        //有界队列
        private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);
    
        //创建固定大小有界队列线程池
        private static ExecutorService taskExecutor
                = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60, TimeUnit.SECONDS,taskQueue);
    
        //job 存放容器
        private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap = new ConcurrentHashMap<>();
    
        public static Map<String, JobInfo<?>> getMap(){
            return jobInfoMap;
        }
    
        private static CheckJobProcesser checkJob
                = CheckJobProcesser.getInstance();
    
        //单例模式---
        private PendingJobPool(){}
    
        private static class JobPoolHolder{
            public static PendingJobPool pool = new PendingJobPool();
        }
    
        public static PendingJobPool getInstance(){
            return JobPoolHolder.pool;
        }
        //单例模式---
    
        //调用者注册工作,如工作名,任务的处理器等等
        public<R> void registerJob(String jobName, int jobLength, ITaskProcesser<?,?> taskProcesser,long expireTime){
            JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,taskProcesser,expireTime);
            if (jobInfoMap.putIfAbsent(jobName,jobInfo) != null){
                throw new RuntimeException("当前任务已经注册");
            }
    
        }
    
        //调用者提交任务
        public <T,R> void putTask(String jobName,T t){
            JobInfo<R> jobInfo = getJob(jobName);
            PendingTask<T,R> task = new PendingTask<>(jobInfo,t);
            taskExecutor.execute(task);
        }
    
        //根据工作名称检索工作
        private <R> JobInfo<R> getJob(String jobName){
            JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
            if (null == jobInfo){
                throw new RuntimeException(jobName + "是个非法任务");
            }
            return jobInfo;
        }
    
        //对工作中的任务进行包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
        private static class PendingTask<T,R> implements Runnable{
            private JobInfo<R> jobInfo;
            private T processData;
    
            public PendingTask(JobInfo<R> jobInfo,T processData){
                this.jobInfo = jobInfo;
                this.processData = processData;
            }
    
            @Override
            public void run(){
                R r = null;
                ITaskProcesser<T,R> taskProcesser = (ITaskProcesser<T,R>) jobInfo.getTaskProcesser();
                TaskResult<R> result = null;
                //调用业务员人员实现的方法
                result = taskProcesser.taskExcutor(processData);
                //要做检查,防止异常
    
                try{
                    if (result == null){
                        result = new TaskResult<R>(TaskResultType.Exception,r,"result id null");
                    }
                    if (result.getResultType() == null) {
                        if (result.getReason() == null) {
                            result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
                        } else {
                            result = new TaskResult<R>(TaskResultType.Exception, r,
                                    "result is null,but reason:" + result.getReason());
                        }
                    }
                }catch(Exception e){
                        e.printStackTrace();
                        result = new TaskResult<R>(TaskResultType.Exception,r,e.getMessage());
                }finally {
                    jobInfo.addTaskResult(result,checkJob);
                }
            }
        }
    
        //获得每个任务的处理详情
        public <R> List<TaskResult<R>> getTaskDetail(String jobName){
            JobInfo<R> jobInfo = getJob(jobName);
            return jobInfo.getTaskDetail();
        }
    
        //获得工作的整体处理进度
        public <R> String getTaskProgess(String jobName) {
            JobInfo<R> jobInfo = getJob(jobName);
            return jobInfo.getTotalProcess();
        }
    }
    

    实现了自己的任务测试一下整个框架,构造了成功失败异常的情况:

    import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
    import com.enjoy.MultiThread.ch8a.vo.TaskResult;
    import com.enjoy.MultiThread.ch8a.vo.TaskResultType;
    import com.enjoy.MultiThread.tools.SleepTools;
    
    import java.util.Random;
    /**
     *类说明:一个实际任务类,将数值加上一个随机数,并休眠随机时间
     */
    public class MyTask implements ITaskProcesser<Integer,Integer> {
    
        @Override
        public TaskResult<Integer> taskExcutor(Integer data) {
            Random r = new Random();
            int flag = r.nextInt(500);
            SleepTools.ms(flag);
            if(flag<=300) {//正常处理的情况
                Integer returnValue = data.intValue()+flag;
                return new TaskResult<Integer>(TaskResultType.Success,returnValue);
            }else if(flag>301&&flag<=400) {//处理失败的情况
                return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
            }else {//发生异常的情况
                try {
                    throw new RuntimeException("异常发生了!!");
                } catch (Exception e) {
                    return new TaskResult<Integer>(TaskResultType.Exception,
                            -1,e.getMessage());
                }
            }
        }
    }
    

    测试代码:

    import com.enjoy.MultiThread.ch8a.PendingJobPool;
    import com.enjoy.MultiThread.ch8a.vo.TaskResult;
    import com.enjoy.MultiThread.tools.SleepTools;
    
    import java.util.List;
    import java.util.Random;
    
    public class AppTest {
        private final static String JOB_NAME = "计算数值";
        private final static int JOB_LENGTH = 1000;
    
        //查询任务进度的线程
        private static class QueryResult implements Runnable{
    
            private PendingJobPool pool;
    
            public QueryResult(PendingJobPool pool) {
                super();
                this.pool = pool;
            }
    
            @Override
            public void run() {
                int i=0;//查询次数
                while(i<350) {
                    List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
                    if(!taskDetail.isEmpty()) {
                        System.out.println(pool.getTaskProgess(JOB_NAME));
                        System.out.println(taskDetail);
                    }
                    SleepTools.ms(100);
                    i++;
                }
            }
        }
    
        public static void main(String[] args) {
            MyTask myTask = new MyTask();
            //拿到框架的实例
            PendingJobPool pool = PendingJobPool.getInstance();
            //注册job
            pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
            Random r = new Random();
            for(int i=0;i<JOB_LENGTH;i++) {
                //依次推入Task
                pool.putTask(JOB_NAME, r.nextInt(1000));
            }
            Thread t = new Thread(new QueryResult(pool));
            t.start();
        }
    }
    

    测试结果:

    结果比较多没有放全,可以自己本地跑一下。


    image.png

    代码 github 地址:

    https://github.com/theodore816/javastudy/tree/master/com/enjoy/MultiThread/ch8a

    相关文章

      网友评论

          本文标题:手写并发框架(一)

          本文链接:https://www.haomeiwen.com/subject/jdhijqtx.html