Concurrent programming: how do you orchestrate complex task dependencies?

Author: 周群力 | Published 2020-05-04 10:53

Java's ExecutorService thread pool is designed for submitting tasks that have no dependencies on each other. When tasks do depend on each other, you cannot simply call ExecutorService.submit(); doing so can lead to deadlock, starvation, and similar problems.
If the dependencies are simple, CompletionService plus future.get() is enough; for divide-and-conquer tasks, Fork/Join works. But what if the dependencies form a complex graph, as in the figure below? How do you orchestrate such tasks?


[Figure: a complex task dependency graph]
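For the simple-dependency case, here is a minimal sketch of the CompletionService approach (the class name and task bodies are illustrative, not from the original): a second stage consumes first-stage results in completion order via take().get().

    import java.util.concurrent.*;

    public class CompletionServiceDemo {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            // Stage 1: three independent tasks.
            for (int i = 0; i < 3; i++) {
                final int n = i;
                cs.submit(() -> n * n);
            }
            // Stage 2 depends on stage 1: take() yields results as they finish.
            int sum = 0;
            for (int i = 0; i < 3; i++) {
                sum += cs.take().get();
            }
            System.out.println("sum = " + sum); // 0 + 1 + 4 = 5
            pool.shutdown();
        }
    }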

I. Approaches

A. Horizontal split: Parallel Workers (each thread runs the full set of stages)

[Figure: Parallel Workers, each worker thread running every stage]
Each stage is a module (e.g. an independent class); modules communicate through synchronous calls, and multiple threads run the whole chain at the same time.
The drawback is that IO-bound stages and CPU-bound stages are not overlapped in time.
A distributed web application can be seen as this pattern: each request traverses all modules (distributed services) through synchronous calls, while many requests may be in flight concurrently.
The concurrency-models literature calls this the Parallel Workers pattern.
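A minimal Parallel Workers sketch, with ioStage and cpuStage as illustrative stand-ins for real modules: every worker thread runs the whole chain synchronously, and parallelism comes only from handling many requests at once.

    import java.util.concurrent.*;

    public class ParallelWorkersDemo {
        static String ioStage(int req)   { return "data-" + req; }   // stand-in for an IO-bound stage
        static String cpuStage(String s) { return s.toUpperCase(); } // stand-in for a CPU-bound stage

        public static void main(String[] args) throws Exception {
            ExecutorService workers = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 10; i++) {
                final int req = i;
                // Each worker runs the full chain for one request.
                workers.submit(() -> System.out.println(cpuStage(ioStage(req))));
            }
            workers.shutdown();
            workers.awaitTermination(1, TimeUnit.MINUTES);
        }
    }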

B. Vertical split: the pipeline pattern (heterogeneous producer-consumer)

See also: 多线程优化-pipeline模式 (multithreading optimization with the pipeline pattern)

[Figure: pipeline pattern, stages connected by queues]

Like a CPU pipeline, each stage has its own thread (or thread pool), and stages communicate through queues: each stage calls BlockingQueue.take() to wait for new work.
Alternatively, with CompletionService, each stage can wait on the previous stage's CompletionService by calling completionService.take().get(). Internally this is still backed by a BlockingQueue; the only difference is whether a module depends on a CompletionService or on a BlockingQueue directly.

My understanding is that Go's CSP model is essentially this approach; whether the original CSP theory works exactly this way, I have not looked into.
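A minimal two-stage pipeline sketch (the POISON sentinel and the stage bodies are illustrative assumptions): each stage owns its own thread, and the stages communicate only through a BlockingQueue.

    import java.util.concurrent.*;

    public class PipelineDemo {
        private static final String POISON = "__POISON__"; // illustrative shutdown sentinel

        public static void main(String[] args) throws Exception {
            BlockingQueue<String> q = new LinkedBlockingQueue<>(100);
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 5; i++) q.put("task-" + i); // stand-in IO-bound stage
                    q.put(POISON);
                } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        String task = q.take(); // wait for work from the previous stage
                        if (POISON.equals(task)) return;
                        System.out.println(task.toUpperCase()); // stand-in CPU-bound stage
                    }
                } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            producer.start(); consumer.start();
            producer.join(); consumer.join();
        }
    }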

Pros and cons:

[Figure: pros and cons of the pipeline pattern]

C. Consumer as Producer (homogeneous producer-consumer)

In the pipeline pattern, every task stage needs its own deployed module (in concurrent programming a module is a class; in distributed programming a module is a service). If task stages are submitted dynamically and the deployment structure cannot change, you can instead implement producer-consumer over homogeneous nodes: each time a worker consumes a node (finishes one task stage), it uses topological ordering to find the next runnable node(s) and submits them to the queue.

[Figure: homogeneous workers consuming and producing DAG nodes]

This approach is used in distributed crawlers; a sketch follows.
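A minimal Consumer-as-Producer sketch; the four-node DAG and all names here are illustrative. Homogeneous workers pull ready nodes from a single queue; completing a node decrements its successors' in-degrees (Kahn-style topological ordering), and any successor that reaches zero is enqueued.

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;

    public class DagSchedulerDemo {
        public static void main(String[] args) throws Exception {
            // Illustrative DAG: A -> B, A -> C, B -> D, C -> D.
            Map<String, List<String>> successors = Map.of(
                    "A", List.of("B", "C"),
                    "B", List.of("D"),
                    "C", List.of("D"),
                    "D", List.of());
            Map<String, AtomicInteger> inDegree = new ConcurrentHashMap<>();
            successors.forEach((n, ss) -> inDegree.putIfAbsent(n, new AtomicInteger()));
            successors.values().forEach(ss -> ss.forEach(s -> inDegree.get(s).incrementAndGet()));

            BlockingQueue<String> ready = new LinkedBlockingQueue<>();
            inDegree.forEach((n, d) -> { if (d.get() == 0) ready.add(n); });

            CountDownLatch done = new CountDownLatch(successors.size());
            ExecutorService workers = Executors.newFixedThreadPool(2);
            for (int i = 0; i < 2; i++) {
                workers.submit(() -> {
                    try {
                        while (true) {
                            String node = ready.take();
                            System.out.println("run " + node); // execute this task stage
                            done.countDown();
                            // Consumer acts as producer: enqueue successors that became ready.
                            for (String next : successors.get(node)) {
                                if (inDegree.get(next).decrementAndGet() == 0) ready.add(next);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // shutdownNow() interrupts the blocked take()
                    }
                });
            }
            done.await();
            workers.shutdownNow();
        }
    }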

D. The Actor pattern

The same concurrency-models literature counts the Actor model as a kind of pipeline as well.
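The original gives no code for this pattern; as a toy illustration (MiniActor is my own sketch, not a real actor framework), each actor serializes its message handling through a private mailbox queue, which is what makes the pattern look like a per-actor pipeline stage:

    import java.util.concurrent.*;

    public class MiniActor {
        private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
        private final Thread loop;

        public MiniActor(String name) {
            // One thread per actor: messages are processed one at a time, in order.
            loop = new Thread(() -> {
                try {
                    while (true) {
                        String msg = mailbox.take();
                        System.out.println(name + " got " + msg);
                    }
                } catch (InterruptedException e) { /* stop requested */ }
            });
            loop.start();
        }

        public void tell(String msg) { mailbox.add(msg); } // the only way to communicate
        public void stop() { loop.interrupt(); }

        public static void main(String[] args) throws Exception {
            MiniActor a = new MiniActor("actor-1");
            a.tell("hello"); a.tell("world");
            Thread.sleep(100);
            a.stop();
        }
    }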

E. Functional parallelism

[Figure: functional parallelism]
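The original shows only a figure here. Assuming it depicts the usual idea, a minimal sketch of functional parallelism in Java: side-effect-free computations over a stream can be parallelized mechanically with parallel().

    import java.util.stream.IntStream;

    public class FunctionalParallelDemo {
        public static void main(String[] args) {
            // Pure, independent computations can be split across cores mechanically;
            // parallel() runs the pipeline on the common ForkJoinPool.
            long sumOfSquares = IntStream.rangeClosed(1, 1000)
                    .parallel()
                    .mapToLong(i -> (long) i * i)
                    .sum();
            System.out.println(sumOfSquares); // 333833500
        }
    }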

II. Example problems

2.1. LeetCode 1117. Building H2O

The abstract task dependency graph:


[Figure: task dependency graph for Building H2O]

The pipeline pattern would fit: an A consumer listening on an A queue, a B consumer listening on a B queue, and a reset consumer listening on a result queue. But it is awkward here: the A consumer cannot start its own thread to while(true)-take from a BlockingQueue and consume, because annoyingly the OJ calls the consumer itself, one output per call.

So keep it simpler: the A consumer acquires an A semaphore, the B consumer acquires a B semaphore, and the reset consumer still listens on the result queue.

    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    class H2O {
        // Permits for the current molecule: 2 hydrogen, 1 oxygen.
        private Semaphore              hReady = new Semaphore(2);
        private Semaphore              oReady = new Semaphore(1);
        // Result queue: each finished atom pushes a token here.
        private BlockingQueue<Integer> q      = new ArrayBlockingQueue<>(3);

        public H2O() {
            // Reset consumer: once three atoms (one molecule) have completed,
            // release the permits for the next molecule. Exits on idle timeout.
            new Thread(() -> {
                while (true) {
                    try {
                        for (int i = 0; i < 3; i++) {
                            Integer take = q.poll(5, TimeUnit.MILLISECONDS);
                            if (take == null) {
                                return; // no more atoms coming; stop the reset thread
                            }
                        }
                        hReady.release();
                        hReady.release();
                        oReady.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
            hReady.acquire();
            // releaseHydrogen.run() outputs "H". Do not change or remove this line.
            releaseHydrogen.run();
            q.add(1);
        }

        public void oxygen(Runnable releaseOxygen) throws InterruptedException {
            oReady.acquire();
            // releaseOxygen.run() outputs "O". Do not change or remove this line.
            releaseOxygen.run();
            q.add(1);
        }
    }
    

2.2. LeetCode 1242. Web Crawler Multithreaded

https://zhang0peter.com/2020/02/12/LeetCode-1242-Web-Crawler-Multithreaded/
This is a graph traversal problem; in a multithreaded setting, BFS with the Consumer-as-Producer pattern is the natural choice.
The task dependency graph:

[Figure: task dependency graph for the multithreaded crawler]
The crawler and the collector can communicate via the general recipe (insert a queue to form a pipeline), or via any other synchronization tool, since the interaction is simple.
Code:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.*;

    import org.junit.Test;

    // HtmlParser is the interface provided by LeetCode 1242.
    public class Solution {

        private static final int    THREAD_COUNT    = 4;
        private static final String EMPTY           = "";
        private static final long   TIMEOUT_TO_POLL = 50;

        // Shared task queue of discovered URLs waiting to be crawled.
        private final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>(1000);
        // Worker threads (Consumer as Producer).
        private       ExecutorService       es        = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
                1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>(THREAD_COUNT));
        // Visited set (a ConcurrentHashMap used as a concurrent set); doubles as the result.
        private final Map<String, String>   visited   = new ConcurrentHashMap<>();
        private       String                hostName;
    
        public Solution() {
        }
    
        public List<String> crawl(String startUrl, HtmlParser htmlParser) {
            this.hostName = extractHostName(startUrl);
            taskQueue.add(startUrl);
            visited.put(startUrl, EMPTY);
            CountDownLatch closedLatch = new CountDownLatch(THREAD_COUNT);
            // Start the consumers: each worker polls the shared queue,
            // and a poll timeout means the crawl has finished.
            for (int i = 0; i < THREAD_COUNT; i++) {
                es.submit(() -> {
                    while (true) {
                        try {
                            String url = taskQueue.poll(TIMEOUT_TO_POLL, TimeUnit.MILLISECONDS);
                            if (url == null) {
                                closedLatch.countDown();
                                return;
                            }
                            List<String> urls = htmlParser.getUrls(url);
                            if (urls == null || urls.isEmpty()) {
                                continue;
                            }
                            for (String s : urls) {
                                if (!visited.containsKey(s) && validHost(s)) {
                                    taskQueue.add(s);
                                    visited.put(s, EMPTY);
                                }
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            // Wait for all workers to drain the queue and exit.
            try {
                closedLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            es.shutdown();
            // Convert result: every URL ever enqueued is in visited.
            Set<String> keys = visited.keySet();
            return new ArrayList<>(keys);
        }
    
        private boolean validHost(String s) {
            return this.hostName.equals(extractHostName(s));
        }
    
        private String extractHostName(String startUrl) {
            // Skip past "http://" (7 characters) and cut at the first path slash.
            int i = startUrl.indexOf('/', 7);
            return (i == -1) ? startUrl : startUrl.substring(0, i);
        }
    
        // The tests below assume a local HtmlParser stub; on LeetCode the parser is injected.
        @Test
        public void test1() {
            HtmlParser parser = new HtmlParser();
            List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
            System.out.println(crawl);
        }
    
        @Test
        public void test2() {
            HtmlParser parser = new HtmlParser();
            List<String> crawl = new Solution().crawl("http://news.google.com", parser);
            System.out.println(crawl);
        }
    }
    

Another option is to write the traversal as recursive DFS. Tasks with dependency ordering should not be thrown straight into an ordinary thread pool, since a parent task blocking on its children can exhaust the pool's threads and deadlock; use a ForkJoinPool instead:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;

    import org.junit.Test;

    class Solution {

        private static final int          THREAD_COUNT = 4;
        private static final String       EMPTY        = "";
        // Work stealing lets a blocked parent's thread help run forked children,
        // so recursive joins do not starve the pool.
        private              ForkJoinPool pool         = new ForkJoinPool(THREAD_COUNT);
    
        private final Map<String, String> visited = new ConcurrentHashMap<>();
        private       String              hostName;
    
        public List<String> crawl(String startUrl, HtmlParser htmlParser) {
            this.hostName = extractHostName(startUrl);
            visited.put(startUrl, EMPTY);
            CrawlerTask task = new CrawlerTask(startUrl, htmlParser);
            pool.invoke(task);
            //    convert result
            Set<String> keys = visited.keySet();
            return new ArrayList<>(keys);
        }
    
        class CrawlerTask extends RecursiveAction {
    
            String     url;
            HtmlParser htmlParser;
    
            public CrawlerTask(String url, HtmlParser htmlParser) {
                this.url = url;
                this.htmlParser = htmlParser;
            }
    
            @Override
            protected void compute() {
                List<String> urls = htmlParser.getUrls(url);
                if (urls == null || urls.isEmpty()) {
                    return;
                }
                List<CrawlerTask> subs = new ArrayList<>();
                for (int i = 0; i < urls.size(); i++) {
                    String s = urls.get(i);
                    if (!visited.containsKey(s) && validHost(s)) {
                        visited.put(s, EMPTY);
                        CrawlerTask subTask = new CrawlerTask(s, htmlParser);
                        if (i < urls.size() - 1) {
                            subTask.fork(); // schedule the child task asynchronously
                            subs.add(subTask);
                        } else {
                            // Run the last sub-task in the current thread to save a fork.
                            subTask.compute();
                        }
                    }
                }
                for (CrawlerTask sub : subs) {
                    sub.join();
                }
            }
        }
    
        private boolean validHost(String s) {
            return this.hostName.equals(extractHostName(s));
        }
    
        private String extractHostName(String startUrl) {
            int i = startUrl.indexOf('/', 7);
            return (i == -1) ? startUrl : startUrl.substring(0, i);
        }
    
        // As above, these tests assume a local HtmlParser stub.
        @Test
        public void test1() {
            HtmlParser parser = new HtmlParser();
            List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
            System.out.println(crawl);
        }
    
        @Test
        public void test2() {
            HtmlParser parser = new HtmlParser();
            List<String> crawl = new Solution().crawl("http://news.google.com", parser);
            System.out.println(crawl);
        }
    }
    
