美文网首页Java
多线程应用设计 Master-Worker模式

多线程应用设计 Master-Worker模式

作者: ThingLin | 来源:发表于2017-06-03 00:15 被阅读56次

    Master-Worker 模式:Master 线程负责接收任务、将任务分配给各个 Worker 线程,并汇总 Worker 线程的执行结果;Worker 线程负责真正处理任务。

    image.png

    Task定义

    
    package cn.thinglin.mw;
    
    /**
     * Base type for a unit of work handled by the Master-Worker classes.
     *
     * <p>A worker stores each task's result under the value returned by
     * {@link #getTaskId()}, so subclasses should make sure that value is
     * non-null and distinct per task.
     */
    public abstract class Task {

        /* Identifier of this task; null only when the no-arg constructor was used. */
        private final String taskId;

        /**
         * Creates a task without an id.
         *
         * <p>Kept so that subclasses which manage their own id and override
         * {@link #getTaskId()} keep compiling unchanged.
         */
        protected Task() {
            this(null);
        }

        /**
         * Creates a task carrying the given id.
         *
         * @param taskId identifier later returned by {@link #getTaskId()}
         */
        protected Task(String taskId) {
            this.taskId = taskId;
        }

        /**
         * Returns the identifier of this task.
         *
         * @return the id supplied at construction, or null if none was supplied
         */
        public String getTaskId() {
            return this.taskId;
        }

    }
    
    

    Worker的实现

    
    package cn.thinglin.mw;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    /**
     * Runnable that drains a task queue and records one result per task.
     *
     * @author ThingLin
     *
     * @param <T> task type
     * @param <V> result type
     */
    public abstract class Worker<T extends Task,V> implements Runnable {

        /* Queue the worker pulls tasks from */
        protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();

        /* Results, keyed by task id */
        protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();

        /**
         * Polls the queue until it is empty, passing every task to
         * {@link #dispose(Task)} and storing the returned value under the
         * task's id. The method returns as soon as a poll yields no task.
         *
         * <p>Note: {@code ConcurrentHashMap} rejects null keys and null values,
         * so a null task id or a null result ends the loop with a
         * {@code NullPointerException}.
         */
        @Override
        public void run() {
            for (T next = this.tasks.poll(); next != null; next = this.tasks.poll()) {
                V outcome = dispose(next);
                this.results.put(next.getTaskId(), outcome);
            }
        }

        /**
         * Replaces the queue this worker reads tasks from.
         *
         * @param tasks queue to poll
         */
        public void setTasks(ConcurrentLinkedQueue<T> tasks) {
            this.tasks = tasks;
        }

        /**
         * Replaces the map this worker writes results into.
         *
         * @param results map receiving one entry per processed task
         */
        public void setResults(ConcurrentHashMap<String,V> results) {
            this.results = results;
        }

        /**
         * Processes a single task.
         *
         * @param task the task taken from the queue
         * @return the value to store for this task
         */
        abstract V dispose(T task);

    }
    
    
    

    Master的实现

    
    package cn.thinglin.mw;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    /**
     * Owns the task queue, the result map and the worker threads, and leaves
     * the final aggregation of results to subclasses.
     *
     * @author ThingLin
     *
     * @param <T> task type
     * @param <V> result type
     */
    public abstract class Master<T extends Task,V> {

        /* Queue of submitted tasks */
        protected ConcurrentLinkedQueue<T> tasks = new ConcurrentLinkedQueue<T>();

        /* Worker threads, keyed by their index */
        protected HashMap<Integer,Thread> workers = new HashMap<Integer,Thread>();

        /* Results written by the workers */
        protected ConcurrentHashMap<String,V> results = new ConcurrentHashMap<String,V>();

        /**
         * Points the given worker at this master's queue and result map, then
         * creates (but does not start) {@code workerCount} threads that all
         * run that same worker instance.
         *
         * @param worker      runnable executed by every thread
         * @param workerCount number of threads to create
         */
        public Master(Worker<T,V> worker,int workerCount) {
            worker.setTasks(this.tasks);
            worker.setResults(this.results);
            int index = 0;
            while (index < workerCount) {
                Thread thread = new Thread(worker, String.format("%s,%d","worker",index));
                this.workers.put(index, thread);
                index++;
            }
        }

        /**
         * Appends a task to the queue.
         *
         * @param task task to enqueue
         */
        public void submit(T task) {
            this.tasks.add(task);
        }

        /**
         * Starts every worker thread.
         */
        public void excute() {
            for (Thread thread : this.workers.values()) {
                thread.start();
            }
        }

        /**
         * Reports whether every worker thread has terminated.
         *
         * @return true when all worker threads are in state TERMINATED
         */
        public boolean isComplete() {
            for (Thread thread : this.workers.values()) {
                boolean finished = thread.getState() == Thread.State.TERMINATED;
                if (!finished) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Computes the overall result from the collected per-task results.
         *
         * @return the aggregated result
         */
        abstract V result();

    }
    
    
    

    测试:

    
    package cn.thinglin.mw;
    
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    /**
     * Demo: submits 1000 tasks carrying the integers 0..999 to a Master and
     * prints the elapsed time and the sum of the results.
     *
     * @author ThingLin
     *
     */
    public class Main {

        /* Number of worker threads: one per available processor */
        private static final int WORKER_COUNT = Runtime.getRuntime().availableProcessors();

        public static void main(String[] args) {

            System.out.println("线程数量:"+WORKER_COUNT);

            // Build the master with a worker that simulates a short piece of work.
            Master<MyTask,Integer> master = new Master<MyTask,Integer>(new Worker<MyTask, Integer>() {

                @Override
                Integer dispose(MyTask task) {
                    try {
                        TimeUnit.MICROSECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        // Restore the flag so the interruption is not lost to
                        // whoever inspects this thread's status afterwards.
                        Thread.currentThread().interrupt();
                    }
                    return task.getData();
                }
            }, WORKER_COUNT) {

                @Override
                Integer result() {
                    int sum = 0;
                    int count = 0;
                    for (Integer value : this.results.values()) {
                        sum += value;
                        count++;
                    }
                    System.out.println("有"+count+"个结果");
                    return sum;
                }
            };

            // Submit the tasks.
            for (int i = 0; i < 1000; i++) {
                master.submit(new MyTask(UUID.randomUUID().toString(), i));
            }

            // Run the tasks.
            long beginTime = System.currentTimeMillis();
            master.excute();

            // Poll with a short sleep instead of spinning: a hot loop here would
            // occupy a core that the workers (one per processor) need.
            while (!master.isComplete()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("等待Worker执行完成时被中断");
                    return;
                }
            }

            System.out.println("耗时 :"+(System.currentTimeMillis() - beginTime));
            System.out.println("结果:"+master.result());

        }

    }
    
    
    /**
     * 任务
     */
    class MyTask extends Task{
        
        private String taskId;
        private int data;
        
        public MyTask(String taskId,int data){
            this.taskId = taskId;
            this.data = data;
        }
    
        public String getTaskId() {
            return taskId;
        }
    
        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    
        public int getData() {
            return data;
        }
    
        public void setData(int data) {
            this.data = data;
        }
        
    }
    
    
    testmw.gif

    相关文章

      网友评论

        本文标题:多线程应用设计 Master-Worker模式

        本文链接:https://www.haomeiwen.com/subject/zpfpfxtx.html