java的线程池ExecutorService适合提交互相之间没有依赖的任务,如果任务之间有依赖,就不能简单的调用ExecutorService.submit()了,可能出现死锁、饥饿等情况。
如果任务之间的依赖很简单,靠CompletionService+future.get()可以搞定;如果是分治任务,可以使用Fork/join;而如果任务之间的依赖关系很复杂(比如是一个依赖图,如图),该如何写任务编排呢?
image.png
一、方案
A. 水平切分:Parallel Workers(每个线程跑全套任务)
image.png每个阶段是一个模块(比如一个独立的类),模块和模块之间通过同步调用来通信,只不过同时有多个线程在跑整个链路。
缺点是没有把IO密集阶段和CPU密集阶段进行时间重叠。
分布式的web应用可以看成是这种模式,每个请求会通过同步调用访问所有模块(分布式服务),同时可能有多个请求并发进行。
并发编程模型管这个叫Parallel Workers模式
B.垂直切分:pipeline模式(异构生产者-消费者)
image.png类似于CPU流水线,每个阶段有独立的线程(池),阶段与阶段之间靠queue通信,每个阶段调用BlockingQueue.take()等待新的任务来。
当然,如果用CompletionService,每个阶段可以调用上一个阶段的completionService等待新任务,即调用completionService.take().get(),内部其实还是一个BlockingQueue,区别是每个模块依赖CompletionService还是依赖BlockingQueue
个人理解go中的csp模型就是这种方案,至于csp原始理论上的方案是否如此,没有细究
pros/cons:
image.png
C. Consumer as Producer(同构生产者-消费者)
pipeline模式每个任务阶段要独立部署一个模块(并发编程时模块==类,分布式编程时模块==服务),如果任务阶段是动态提交的,没法改变部署结构,那么可以使用同构的节点实现生产-消费者,每次消费完一个节点(执行完一个任务阶段),通过拓扑排序找到下一个要执行的节点、提交到队列
image.png这种方案在分布式爬虫中有用到
D. Actor模式
并发编程模型把Actor也算做流水线的一种
E.函数式并行
image.png二、例题
2.1. building-h2o
抽象的任务依赖图为:
image.png
可以用pipeline模式,A Consumer听A queue,B Consumer听B queue,reset Consumer听 result queue。但是这题不好这样写,这题A consumer消费并打印不能自己起个线程while(true)取blockingQueue并消费,这题A consumer得由oj自己调用,调一次打一次,傻屌……
那么就简单一点,A Consumer取A信号量,B Consumer取B信号量,reset Consumer听 result queue
class H2O {
private Semaphore hReady = new Semaphore(2);
private Semaphore oReady = new Semaphore(1);
private BlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(3);
public H2O() {
new Thread(() -> {
while (true){
try {
for (int i = 0; i < 3; i++) {
Integer take = q.poll(5, TimeUnit.MILLISECONDS);
if (take == null) {
return;
}
}
hReady.release();
hReady.release();
oReady.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
hReady.acquire();
// releaseHydrogen.run() outputs "H". Do not change or remove this line.
releaseHydrogen.run();
q.add(1);
}
public void oxygen(Runnable releaseOxygen) throws InterruptedException {
oReady.acquire();
// releaseOxygen.run() outputs "O". Do not change or remove this line.
releaseOxygen.run();
q.add(1);
}
}
2.2. LEETCODE 1242. Web Crawler Multithreaded
https://zhang0peter.com/2020/02/12/LeetCode-1242-Web-Crawler-Multithreaded/
图的遍历问题,用多线程写的话就选择bfs,Consumer as Producer
任务依赖图为:
crawler和collector通信可以用通用套路(加个queue做pipeline),也可以用其他同步工具,毕竟简单。
代码:
public class Solution {
private static final int THREAD_COUNT = 4;
private static final String EMPTY = "";
private static final long TIMEOUT_TO_POLL = 50;
private final BlockingQueue<String> taskQueue = new LinkedBlockingQueue(1000);
//worker thread
private ExecutorService es = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>(THREAD_COUNT)
);
//private final Queue<String> result = new ConcurrentLinkedQueue<>();
private final Map<String, String> visited = new ConcurrentHashMap<>();
private String hostName;
public Solution() {
}
public List<String> crawl(String startUrl, HtmlParser htmlParser) {
this.hostName = extractHostName(startUrl);
taskQueue.add(startUrl);
visited.put(startUrl, EMPTY);
CountDownLatch closedLatch = new CountDownLatch(THREAD_COUNT);
// start consumer
for (int i = 0; i < THREAD_COUNT; i++) {
es.submit(() -> {
while (true) {
try {
String url = taskQueue.poll(TIMEOUT_TO_POLL, TimeUnit.MILLISECONDS);
if (url == null) {
closedLatch.countDown();
return;
}
List<String> urls = htmlParser.getUrls(url);
if (urls == null || urls.isEmpty()) {
continue;
}
for (String s : urls) {
if (!visited.containsKey(s) && validHost(s)) {
taskQueue.add(s);
visited.put(s, EMPTY);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//wait
try {
closedLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// convert result
Set<String> keys = visited.keySet();
return new ArrayList<>(keys);
}
private boolean validHost(String s) {
return this.hostName.equals(extractHostName(s));
}
private String extractHostName(String startUrl) {
int i = startUrl.indexOf('/', 7);
return (i == -1) ? startUrl : startUrl.substring(0, i);
}
@Test
public void test1() {
HtmlParser parser = new HtmlParser();
List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
System.out.println(crawl);
}
@Test
public void test2() {
HtmlParser parser = new HtmlParser();
List<String> crawl = new Solution().crawl("http://news.google.com", parser);
System.out.println(crawl);
}
}
另一种方案是递归写DFS,有依赖顺序的任务不好直接往线程池里扔,要用ForkJoinPool:
class Solution {
private static final int THREAD_COUNT = 4;
private static final String EMPTY = "";
private static final long TIMEOUT_TO_POLL = 50;
private ForkJoinPool pool = new ForkJoinPool(THREAD_COUNT);
private final Map<String, String> visited = new ConcurrentHashMap<>();
private String hostName;
public List<String> crawl(String startUrl, HtmlParser htmlParser) {
this.hostName = extractHostName(startUrl);
visited.put(startUrl, EMPTY);
CrawlerTask task = new CrawlerTask(startUrl, htmlParser);
pool.invoke(task);
// convert result
Set<String> keys = visited.keySet();
return new ArrayList<>(keys);
}
class CrawlerTask extends RecursiveAction {
String url;
HtmlParser htmlParser;
public CrawlerTask(String url, HtmlParser htmlParser) {
this.url = url;
this.htmlParser = htmlParser;
}
@Override
protected void compute() {
List<String> urls = htmlParser.getUrls(url);
if (urls == null || urls.isEmpty()) {
return;
}
List<CrawlerTask> subs = new ArrayList<>();
for (int i = 0; i < urls.size(); i++) {
String s = urls.get(i);
if (!visited.containsKey(s) && validHost(s)) {
visited.put(s, EMPTY);
CrawlerTask subTask = new CrawlerTask(s, htmlParser);
if (i < urls.size() - 1) {
subTask.fork();
subs.add(subTask);
} else {
//visit last url
subTask.compute();
}
}
}
for (CrawlerTask sub : subs) {
sub.join();
}
}
}
private boolean validHost(String s) {
return this.hostName.equals(extractHostName(s));
}
private String extractHostName(String startUrl) {
int i = startUrl.indexOf('/', 7);
return (i == -1) ? startUrl : startUrl.substring(0, i);
}
@Test
public void test1() {
HtmlParser parser = new HtmlParser();
List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
System.out.println(crawl);
}
@Test
public void test2() {
HtmlParser parser = new HtmlParser();
List<String> crawl = new Solution().crawl("http://news.google.com", parser);
System.out.println(crawl);
}
}
网友评论