一、简述
Java8 的并行流(封装ForkJoin)就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。如果某一个线程队列执行完成,其他队列还在执行,这个时候执行完成的队列就是空闲状态。Java8 中将并行进行了优化,使用的是工作窃取模式(work-stealing),在一个队列的任务执行完成之后,它会去其他没有执行完成的任务队列里面窃取尾部的任务来执行。
Stream API 可以声明性地通过 parallel() 与 sequential() 在并行流与顺序流之间进行切换。
二、示例
import java.util.concurrent.RecursiveTask;
public class ForkJoinCaculate extends RecursiveTask<Long> {
private long start;
private long end;
public ForkJoinCaculate(long start, long end) {
this.start = start;
this.end = end;
}
private static final long THRESHOLD = 10000L;
@Override
protected Long compute() {
long length = end - start;
if (length < THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (end + start) / 2; //中间位置,拆分成两个任务
ForkJoinCaculate left = new ForkJoinCaculate(start, middle);
left.fork(); //拆分子任务,同时压入线程队列
ForkJoinCaculate right = new ForkJoinCaculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
测试:
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class TestForkJoin {
public long max = 1000000000L;
//多线程fork Join 方式执行相加
@Test
public void test01() {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinCaculate(0, max);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗时:" + Duration.between(start, end));
}
//单线程普通for循环
@Test
public void test02() {
Instant start = Instant.now();
long sum = 0L;
for (long i = 0; i <= max; i++) {
sum += i;
}
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());
}
//Java8 并行流
@Test
public void test03() {
Instant start = Instant.now();
long sum = LongStream.rangeClosed(0, max)
.parallel()
.reduce(Long::sum).getAsLong();
System.out.println(sum);
Instant end = Instant.now();
System.out.println("执行耗时:" + Duration.between(start, end).toMillis());
}
}
网友评论