美文网首页
并发模型

并发模型

作者: FlySheep_ly | 来源:发表于2017-03-29 08:21 被阅读29次

    Master-Worker 模式

    Task.class

    public class Task {
    
        private int id;
    
        private String name;
    
        private int price;
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public int getPrice() {
            return price;
        }
    
        public void setPrice(int price) {
            this.price = price;
        }
    }
    

    Master.class

    public class Master {
    
        // 1. 应该有一个承载任务的集合
        private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>();
    
        // 2. 使用 HashMap 去承载所有的 worker 对象
        private HashMap<String, Thread> works = new HashMap<>();
    
        // 3. 使用一个容器承载每一个 worker 并发执行任务的结果集
        private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<>();
    
        // 4. 构造方法
        public Master(Worker worker, int workerCount) {
            // 每一个 worker 对象都需要有 Master 的引用,workQueue 用于任务的领取,resultMap 用于任务的提交
            worker.setWorkQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            for (int i = 0; i < workerCount; i++) {
                // key 表示每一个 worker 的名字,value 表示线程执行对象
                works.put("子节点" + Integer.toString(i), new Thread(worker));
            }
        }
    
        // 5. 提交方法
        public void submit(Task task) {
            this.workQueue.add(task);
        }
    
        // 6. 需要有一个执行的方法(启动应用程序 让所有的 worker 工作)
        public void execute() {
            for (Map.Entry<String, Thread> me : works.entrySet()) {
                me.getValue().start();
            }
        }
    
    
        // 7. 判断线程是否执行完毕
        public boolean isComplete() {
            for (Map.Entry<String, Thread> me : works.entrySet()) {
                if (me.getValue().getState() != Thread.State.TERMINATED) {
                    return false;
                }
            }
            return true;
        }
    
        // 8. 返回结果集数据
        public Long getResult() {
            Long ret = 0L;
            for (Map.Entry<String, Object> me : resultMap.entrySet()) {
                Integer value = (Integer) me.getValue();
                ret += value;
            }
            return ret;
        }
    }
    

    Worker.class

    public class Worker implements Runnable {
        private ConcurrentLinkedQueue<Task> workQueue;
        private ConcurrentHashMap<String, Object> resultMap;
    
        public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }
    
        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }
    
        @Override
        public void run() {
            while (true) {
                Task input = this.workQueue.poll();
                if (input == null) {
                    break;
                }
                // 真正的去做业务处理
                Object output = handle(input);
    
                this.resultMap.put(Integer.toString(input.getId()), output);
            }
        }
    
        public Object handle(Task input) {
            return null;
        }
    
    }
    

    MyWorker.class

    public class MyWorker extends Worker {
    
        public Object handle(Task input) {
            Object output = null;
            try {
                // 表示处理 task 任务的耗时,可能是做数据的加工,也可能是操作数据库...
                Thread.sleep(5000);
    
                output = input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    }
    

    Main.class

    public class Main {
    
        public static void main(String[] args) {
            Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
            System.out.println("机器线程数量:" + Runtime.getRuntime().availableProcessors());
    
            Random random = new Random();
            for (int i = 1; i <= 100; i++) {
                Task task = new Task();
                task.setId(i);
                task.setName("任务" + i);
                task.setPrice(random.nextInt(1000));
                master.submit(task);
            }
            master.execute();
    
            long start = System.currentTimeMillis();
            while (true) {
                if (master.isComplete()) {
                    long end = System.currentTimeMillis() - start;
                    Long ret = master.getResult();
                    System.out.println("最终结果:" + ret + ",耗时:" + end);
                    break;
                }
            }
    
        }
    }
    

    并发模型(一)——Future模式

    并发模型(二)——Master-Worker模式

    相关文章

      网友评论

          本文标题:并发模型

          本文链接:https://www.haomeiwen.com/subject/erjsottx.html