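Sequential execution works well in most cases: it is simple and clear, does one thing at a time, and is hard to get wrong.
But in the multi-core era, when we want to go higher, faster and stronger and have to handle complex computation and logic, concurrency is the way to go.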
There are two classic books on this topic that I particularly like:
1. Pattern-Oriented Software Architecture, Volume 2: Patterns for Concurrent and Networked Objects
by Douglas Schmidt, Michael Stal, Hans Rohnert, Frank Buschmann

2. Java Concurrency in Practice, 1st Edition
by Brian Goetz, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, Doug Lea

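By "concurrency" I mainly mean multithreaded programming here. Multi-process and distributed multi-server systems also belong to concurrent programming, but I will leave those for another time.
From the language's point of view, Java provides a basic multithreading library.
In early JVMs, Thread was implemented as so-called green threads, time-sliced and scheduled by the JVM itself rather than being real OS threads. Current versions mostly map each Java thread one-to-one onto a native OS thread (a pthread on Linux/Unix). Thread priorities range from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY, the highest), and they are only hints whose effect depends on the OS scheduler.
Java threads are either daemon threads (thread.setDaemon(true)) or non-daemon (user) threads; the JVM exits once only daemon threads remain. Avoid calling Thread's stop, suspend and resume: these methods are deprecated because they are inherently unsafe (stop abruptly releases every lock the thread holds and can leave objects in an inconsistent state, while suspend is prone to deadlock).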
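A minimal sketch of the points above: a daemon thread with an explicit priority, stopped cooperatively with interrupt() instead of the deprecated stop(). The class name, thread name and timings are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.TimeUnit;

public class ThreadBasicsDemo {

    /** Starts a daemon worker, asks it to stop via interrupt(), and reports whether it exited. */
    public static boolean runAndStopWorker() throws InterruptedException {
        Thread worker = new Thread(() -> {
            // Poll the interrupt flag instead of relying on Thread.stop()
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10); // simulated work
                } catch (InterruptedException e) {
                    // sleep() clears the flag when it throws; restore it so the loop exits
                    Thread.currentThread().interrupt();
                }
            }
        }, "demo-worker");
        worker.setDaemon(true);                  // must be called before start()
        worker.setPriority(Thread.MAX_PRIORITY); // 10: only a hint to the OS scheduler
        worker.start();

        TimeUnit.MILLISECONDS.sleep(50);
        worker.interrupt();  // request cancellation
        worker.join(1000);   // wait up to 1 s for the worker to finish
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped: " + runAndStopWorker());
    }
}
```

Because the worker checks its interrupt status itself, it always leaves its data in a consistent state, which is exactly what stop() cannot guarantee.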
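Java 5 was a watershed: it introduced the very handy java.util.concurrent package, which includes:
- thread execution service: ExecutorService
- thread-safe containers: ConcurrentHashMap and others
- blocking queue: BlockingQueue
- semaphore: Semaphore
- barrier: CyclicBarrier
- countdown latch: CountDownLatch
- phaser: Phaser (strictly speaking added later, in Java 7)
- exchanger: Exchanger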
Java 7 added the Fork/Join framework.
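As a rough illustration of Fork/Join, here is a sketch of the classic divide-and-conquer sum with a RecursiveTask: large ranges are split in half, one half is forked, and small ranges are summed directly. The class name and the THRESHOLD value are arbitrary assumptions.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this size, sum sequentially
    private final long[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ForkJoinSum left = new ForkJoinSum(numbers, from, mid);
        ForkJoinSum right = new ForkJoinSum(numbers, mid, to);
        left.fork();                     // run the left half asynchronously
        long rightSum = right.compute(); // compute the right half in the current thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    public static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 100_000).toArray();
        System.out.println(sum(numbers)); // 5000050000
    }
}
```

Computing one half in the current thread instead of forking both keeps the worker busy rather than idle while it waits.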
Java 8 was another milestone, bringing parallel streams and CompletableFuture.
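A brief sketch of both Java 8 features, with hypothetical method names: a parallel stream whose map + collect still returns results in encounter order, and two CompletableFuture steps that run asynchronously on the common pool and are combined once both complete.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class Java8ConcurrencyDemo {

    /** Squares numbers with a parallel stream; map + collect keeps the encounter order. */
    public static List<Integer> squaresInParallel(List<Integer> input) {
        return input.parallelStream()
                .map(x -> x * x)
                .collect(Collectors.toList());
    }

    /** Runs two independent asynchronous steps and combines their results. */
    public static String greet(String name) {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> target = CompletableFuture.supplyAsync(() -> name.toUpperCase());
        return greeting
                .thenCombine(target, (g, t) -> g + ", " + t)       // runs when both are complete
                .exceptionally(ex -> "failed: " + ex.getMessage()) // fallback on any failure
                .join();                                            // blocks; no checked exceptions
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel(Arrays.asList(1, 2, 3, 4))); // [1, 4, 9, 16]
        System.out.println(greet("java"));                                // Hello, JAVA
    }
}
```

Unlike a plain Future, a CompletableFuture can be composed (thenCombine, thenApply, exceptionally) without blocking a thread on get() for each intermediate step.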
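Here is an example.
Suppose we have a library system that provides an API for querying books by title, author and publication date.
Now I want to find a book about the Java language, published some time after 2005, and I vaguely remember that the author was one of "Brian Goetz", "Tim Peierls", "Joshua Bloch", "Joseph Bowbeer", "David Holmes" or "Doug Lea". Because of the API's limitations, I can query one author at a time, in order, until I find a result.
Alternatively, I can query all authors concurrently, with the requirement that results are taken in author order: an author earlier in the list has higher priority.
Let's look at the implementation and the query results: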
```java
package com.github.walterfan.example.concurrent;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Created by walter on 15/04/2017.
 */
public class FutureTest {
    public static final String AUTHOR_NAME = "Joseph Bowbeer";

    private final AtomicInteger counter = new AtomicInteger(0);
    private final Logger logger = LoggerFactory.getLogger(FutureTest.class);
    private ExecutorService pool;

    // Authors in priority order: an earlier author wins if several of them match
    private final String[] authors = new String[] {
            "Brian Goetz",
            "Tim Peierls",
            "Joshua Bloch",
            "Joseph Bowbeer",
            "David Holmes",
            "Doug Lea" };

    private final Instant earliestDate = Instant.parse("2005-12-31T00:00:00.00Z");

    public static class Book {
        final String title;
        final String author;
        final String isbn;
        final Instant publicationDate;

        public Book(String title, String author, String isbn, Instant publicationDate) {
            this.title = title;
            this.author = author;
            this.isbn = isbn;
            this.publicationDate = publicationDate;
        }

        @Override
        public String toString() {
            return "Book{" +
                    "title='" + title + '\'' +
                    ", author='" + author + '\'' +
                    ", isbn='" + isbn + '\'' +
                    ", publicationDate=" + publicationDate +
                    '}';
        }
    }

    /** Simulated library API: only the author affects the latency and the result. */
    public Book queryBook(String title, String author, Instant earliestDate) {
        int num = counter.incrementAndGet();
        int ms = 80; // normal query latency
        logger.info("{}. query book by author {}", num, author);
        if ("Joshua Bloch".equals(author)) {
            ms = 120; // this author's query is slower
        }
        Uninterruptibles.sleepUninterruptibly(ms, TimeUnit.MILLISECONDS);
        // The first three authors have no matching book
        if (Arrays.asList("Brian Goetz", "Tim Peierls", "Joshua Bloch").contains(author)) {
            return null;
        }
        return new Book("Java Concurrency in Practice", author, String.valueOf(num),
                Instant.parse("2006-05-19T00:00:00.00Z"));
    }

    @BeforeClass
    public void init() {
        int corePoolSize = 4;
        int maxPoolSize = 16;
        long keepAliveTime = 5000;
        int queueCapacity = 200;
        // Note: threads beyond corePoolSize are only created once this queue is full
        pool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity));
    }

    @AfterClass
    public void clean() {
        if (null == pool) {
            return;
        }
        try {
            logger.info("attempt to shutdown executor");
            pool.shutdown();
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("tasks interrupted");
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            if (!pool.isTerminated()) {
                logger.error("cancel non-finished tasks");
            }
            pool.shutdownNow();
            logger.info("shutdown finished");
        }
    }

    /** Queries the authors one by one, in priority order, until a book is found. */
    public Optional<Book> querySequentially() {
        for (String author : authors) {
            Optional<Book> book = Optional.ofNullable(queryBook("java", author, earliestDate));
            if (book.isPresent()) {
                return book;
            }
        }
        return Optional.empty();
    }

    /** Submits all queries at once, then checks the results in priority order. */
    public Optional<Book> queryConcurrently() {
        List<Future<Book>> futureBooks = new ArrayList<>(authors.length);
        for (String author : authors) {
            futureBooks.add(pool.submit(() -> queryBook("java", author, earliestDate)));
        }
        Optional<Book> ret = Optional.empty(); // never return null
        try {
            for (Future<Book> futureBook : futureBooks) {
                try {
                    ret = Optional.ofNullable(futureBook.get(100, TimeUnit.MILLISECONDS));
                } catch (TimeoutException | ExecutionException e) {
                    continue; // skip an author whose query is too slow or failed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                if (ret.isPresent()) {
                    break; // first hit in priority order
                }
            }
        } finally {
            // Cancel queries that are no longer needed (a no-op for finished ones)
            futureBooks.forEach(f -> f.cancel(true));
        }
        return ret;
    }

    /** Runs all queries on a parallel stream; the terminal operation waits for every query. */
    public Optional<Book> queryParallelly() {
        // map + collect on a parallel stream preserves the encounter (author) order
        List<Optional<Book>> books = Arrays.asList(authors).parallelStream()
                .map(author -> Optional.ofNullable(queryBook("java", author, earliestDate)))
                .collect(Collectors.toList());
        // The first present element therefore belongs to the highest-priority author
        return books.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    @Test
    public void testQuery() {
        logger.info("-- querySequentially ---");
        long durationSequentially = recordExecutionTime(this::querySequentially);
        logger.info("-- queryConcurrently ---");
        long durationConcurrently = recordExecutionTime(this::queryConcurrently);
        logger.info("-- queryParallelly ---");
        long durationParallelly = recordExecutionTime(this::queryParallelly);
        logger.info("duration: querySequentially={} > queryParallelly={} > queryConcurrently={}",
                durationSequentially, durationParallelly, durationConcurrently);
        Assert.assertTrue(durationSequentially > durationConcurrently
                && durationSequentially > durationParallelly
                && durationParallelly > durationConcurrently);
    }

    public long recordExecutionTime(Supplier<Optional<Book>> supplier) {
        counter.set(0);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        Optional<Book> book = supplier.get();
        stopwatch.stop();
        long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        // TestNG order: assertEquals(actual, expected)
        Assert.assertEquals(book.map(x -> x.author).orElse(null), AUTHOR_NAME);
        return duration;
    }
}
```
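The results are shown below. As expected, sequential execution takes the longest. The parallel stream has to wait for all six queries to finish before it can pick a result, so it takes somewhat longer, while the second approach (queryConcurrently, based on Future) is the most efficient because it can return as soon as the highest-priority hit is available: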
duration: querySequentially=380 > queryParallelly=225 > queryConcurrently=137
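One detail of init() worth checking: a ThreadPoolExecutor only creates threads beyond corePoolSize when its work queue refuses a task. With corePoolSize 4 and a queue of capacity 200, the six queries therefore run on at most 4 threads and maxPoolSize 16 is never reached. The sketch below (class and method names are hypothetical) submits six slow tasks with the same pool settings and inspects the pool right after submission, once with a roomy queue and once with a queue of capacity 1.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /** Submits slow tasks and returns {pool size, queued tasks} right after submission. */
    public static int[] submitAndInspect(BlockingQueue<Runnable> queue, int tasks) throws InterruptedException {
        // Same core/max/keep-alive values as init() in the test above
        ThreadPoolExecutor pool = new ThreadPoolExecutor(4, 16, 5000, TimeUnit.MILLISECONDS, queue);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500); // keep every worker busy during submission
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            pool.shutdownNow(); // interrupts the sleeping tasks
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Roomy queue: tasks 5 and 6 are queued, the pool never grows past 4 threads
        int[] roomy = submitAndInspect(new LinkedBlockingQueue<>(200), 6);
        // Tiny queue: task 6 does not fit, so a 5th (non-core) thread is created
        int[] tiny = submitAndInspect(new ArrayBlockingQueue<>(1), 6);
        System.out.println("capacity 200 -> threads=" + roomy[0] + ", queued=" + roomy[1]); // threads=4, queued=2
        System.out.println("capacity 1   -> threads=" + tiny[0] + ", queued=" + tiny[1]);   // threads=5, queued=1
    }
}
```

If the intent is really to scale up to maxPoolSize under load, a smaller bounded queue (or a SynchronousQueue) is needed; with a large queue, corePoolSize is effectively the pool size.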
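A thread pool should not have too many threads, which causes contention for CPU and memory and frequent context switches; nor too few, which hurts performance and leaves CPUs idle. Brian Goetz gives a formula that can serve as a reference:
number of threads = number of CPUs * target CPU utilization * (wait time / compute time + 1)
IO-bound applications spend much of their time waiting, so they can use somewhat more threads.
CPU-bound applications already keep the CPU busy, so keep the thread count modest.
For example, with a 4-core CPU, a target CPU utilization of 0.5 (50%), and about 80% of the time spent waiting for API responses (so 20% computing):
4 * 0.5 * (80/20 + 1) = 10 threads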
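The sizing heuristic above can be wrapped in a small helper. This is a sketch: the method name, the parameter checks and rounding to the nearest whole thread (with a minimum of 1) are my own assumptions; Runtime.availableProcessors() supplies the CPU count of the current machine.

```java
public class PoolSizing {

    /** threads = cpus * targetUtilization * (waitTime / computeTime + 1), rounded, at least 1. */
    public static int threadCount(int cpus, double targetUtilization, double waitTime, double computeTime) {
        if (cpus <= 0 || targetUtilization <= 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        double threads = cpus * targetUtilization * (waitTime / computeTime + 1);
        return Math.max(1, (int) Math.round(threads));
    }

    public static void main(String[] args) {
        // The worked example: 4 cores, 50% utilization, 80% waiting / 20% computing
        System.out.println(threadCount(4, 0.5, 80, 20)); // 10
        // The same ratios applied to the CPUs of the current machine
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println(threadCount(cpus, 0.5, 80, 20));
    }
}
```

For a purely CPU-bound workload (wait time 0, utilization 1) the formula degenerates to one thread per CPU, matching the advice to keep such pools small.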