第8章 - Java 多线程2
作者:vwFisher
时间:2019-09-04
GitHub代码:https://github.com/vwFisher/JavaBasicGuide
目录
- 3 锁的优化及注意事项
- 4. 并行模式与算法
- 5 Java8与并发
3 锁的优化及注意事项
3.1 提高"锁"性能的建议
3.1.1 减小锁持有时间
在锁竞争过程中,单个线程对锁的持有时间与系统性能有着直接的关系。如果线程持有锁的时间很长,对应地,多个线程的耗时也就很长。
减少锁持有时间,其实就是:只在必要的时候进行同步(实际上就是减少同步的代码执行量)
3.1.2 减小锁粒度
缩小锁定对象的范围,减少锁冲突的可能性。
经典例子就是 JDK1.7 的 ConcurrentHashMap 的 put() 和 size() 方法。
对于 Map 的 put() 方法,如果做到同步,最自然的想法就是将整个 put 方法同步(但是粒度太大),而 ConcurrentHashMap 的设计默认有16个段(segment),当调用 put(),先通过 hashcode 得到该数据要存放在哪一段,再对这一段进行同步加锁(减小了颗粒度)。换句话说,最好的情况是同时支持 16 个 put() 方法操作,提供性能
但是对于 size() 方法,就意味着要得到所有段的锁,才能统计总数。所以要结合实际业务来考虑锁的粒度。
3.1.3 锁分离
读写锁分离,在读多写少的场景下,使用读写锁可以提高性能
对于读写锁,其实本身就是一种锁分离。对于读和写的分离。那么对于其他场景,例如链表插入的场景,前后2端也可以使用2个锁,头/尾写锁
3.1.4 锁粗化
减少锁申请同步次数就是锁粗化,锁粗化与减少锁持有时间和减少锁粒度相反。根据实际场景来,如果一味的增加锁来提高性能,但是却提高了对锁的申请同步次数。要结合根据实际情况考虑
3.2 Java虚拟机对锁优化所做的努力
JDK 内部的 锁
优化策略
3.2.1 锁偏移
核心思想:如果一个线程获得了锁,那么锁就进入偏向模式。当这个线程再次申请锁时,无须再做任何同步操作。节省了锁的申请,提高性能
应用场景:对于几乎没有锁竞争的场合(连续多次都是同一个线程请求相同的锁)的场景。但对于锁竞争激烈的(每次几乎都是不同线程申请锁),优化效果不明显
使用:通过参数 -XX:+UseBiasedLocking
开启偏移锁
3.2.2 轻量级锁
如果偏向锁失败,虚拟机并不会立刻挂起,他会使用轻量级锁来进行优化:简单地将对象头部作为指针,指向持有锁的线程的堆栈内部。来判断一个线程是否持有对象锁。如果线程获得轻量级锁成功,则可以顺利进入临界区。否则加锁失败,代表其他线程获得了锁。那么当前线程的锁申请就会膨胀为重量级锁。
3.2.3 自旋锁
如果锁膨胀为重量级锁,虚拟机还是使用自旋锁优化:由于当前线程暂时无法获得锁,也不确定什么时候获得锁(可能很久,也可能在 CPU 几个周期后就能获得)。但是立刻对线程挂起可能会是一种得不偿失的做法。因此,虚拟机会让当前线程做几个空循环。如果还不能获得,才真实地将线程在操作系统层面挂起。
其实就是用一个空循环让一个线程等待,不像传统方法 wait()、sleep()、yield() 它们都放弃了CPU控制。目的是为了保留 CPU 缓存,在多核系统中,一个等待线程醒来的时候可能会在另一个内核运行,这样会重建缓存。为了避免重建缓存和减少等待重建的时间就可以使用它了
3.2.4 锁消除
核心思想:在 JIT 编译时,对运行上下文扫描,去除不可能存在共享资源竞争的锁,节省毫无意义的请求锁时间。
涉及技术:逃逸分析:分析观察一个变量是否会逃出一个作用域
使用:通过 -XX:DoEscapeAnalysis
打开逃逸分析,-XX:EliminateLocks
打开锁消除
3.3 ThreadLocal
这是线程的局部变量,只有在当前线程可以访问。线程安全
ThreadLocal 提供了一个线程局部的容器,我们可以通过使用 set 和 get 方法来使用。当然注意 set 的对象不是指向同一个地址(如果指向同一个地址,那么一样会出现线程不安全)
public void set(T value) // 设置局部变量
public T get() // 获取局部变量
基本使用:
ThreadLocal<SimpleDateFormat> tl = new ThreadLocal<>()
tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
SimpleDateFormat sdf = tl.get();
实现原理:
set():
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
先获取当前线程,然后获取 map(可以理解为 HashMap)。该 map 对象在 Thread 内部维护 ThreadLocalMap 的局部变量,key 是 ThreadLocal 当前对象,value 是我们自定义的 T 值。
get():
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T) e.value;
return result;
}
}
return setInitialValue();
}
先获取当前线程,获取 ThreadLocalMap,通过自身的 key 来获取 value
Thread的退出清理,exit():
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
默认线程退出,会对 ThreadLocal 进行清理,但是如果采用线程池,就不一定会清理了。
那我们可以手动设置其为 null,那么当 GC 回收时,他就会被回收,具体原理在于 ThreadLocalMap 的实现采用了弱引用。
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
当外部强引用被设置为 null 时,弱引用 key 也会变为 null,那么当 GC 回收时,它也就被回收了
3.4 无锁
对于并发控制而言,锁是一种悲观策略。当多个线程访问临界区资源,宁可让其他线程阻塞等待。
而无锁是一种乐观策略。无锁面对冲突使用 CAS(比较交换技术,Compare And Swap)来鉴别冲突。一旦检测到冲突,就重试当前操作直到没有冲突为止
3.4.1 CAS - 比较交换
算法过程:它包含三个参数 CAS(V, E, N)。V = 要更新的变量,E = 预期值,N = 新值。
- 仅当 V = E 时,才执行 V = N。并返回新的 V
- 那么同一时刻只有一个操作变量会生效。如果 V != E,说明有其他线程在操作。则操作失败,但允许在此尝试。
3.4.2 AtomicInteger - 无锁线程安全整数
它是线程安全的,对其进行修改等任何操作,都是通过 CAS 指令进行的。对于其他原子类,同理
常用方法:
public final int get() // 获取当前值
public final void set(int newValue) // 设置当前值
public final int getAndSet(int newValue) // 设置新值,返回旧值
public final boolean compareAndSet(int expect, int u) // 如果当前值为expect,则设置为u
public final int getAndIncrement() // 加1,返回旧值
public final int getAndDecrement() // 减1,返回旧值
public final int getAndAdd(int delta) // 加delta,返回旧值
public final int incrementAndGet() // 加1,返回新值
public final int decrementAndGet() // 减1,返回新值
public final int addAndGet(int delta) // 添加delta,返回新值
其他类还有 AtomicLong、AtomicBoolean 等。内部维护主要有2个字段
private volatile int value; // 实际取值
private static final long valueOffset; // value字段的再AtomicInteger对象中的偏移量
3.4.4 Unsafe类
看看 AtomicInteger 的 incremenetAndGet() 的实现
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
3个参数就是,给定的对象 o,对象内的偏移量(就是一个字段到对象头部的偏移量,通过这个偏移量可以快速定位字段),添加的值
这里还有个特殊变量 unsafe,它是 sun.misc.Unsafe 类型。大家应该知道指针是不安全的,这也是 Java 去掉指针的重要原因,如果指针指错了位置,或计算指针偏移量时出错,会导致覆盖别人的内存,系统崩溃。
而这里的 Unsafe 就是封装了一些类似指针的操作。我们来看看 Unsafe 类的方法
public native int getInt(Object o, long offset); // 获取给定对象偏移量上的int值
public native void putInt(Object o, long offset, int x) // 设置给定对象偏移量上的int值
public native long objectFieldOffset(Field f); // 获得字段在对象中的偏移量
// 设置给定对象的int值,使用 volatile 语义
public native void putIntVolatile(Object o, long offset, int x);
// 获得给定对象的int值,使用 volatile 语义
public native int getIntVolatile(Object o, long offset);
// 和 putIntVolatile() 一样,但是它要求被操作字段就是 volatile 类型的
public native void putOrderedInt(Object o, long offset, int x);
内部都是采用跟指针有关的 CAS 操作。虽然 Java 抛弃了指针,但是内部实现还有会使用指针技术
3.4.5 AtomicReference - 无锁的对象引用
与 AtomicInteger 类似,AtomicReference 对应普通的对象引用。但是对于 Atomic 的 CAS 操作。其实会存在一种情况,A 线程预期 i 值为 2,当 i 被修改为 1,再修改为 2,该值符合预期,但无法判断该值是否被修改过。这个影响得看实际的业务需求。
当然针对这种情况,可以使用 AtomicStampedReference 对象,这是带时间戳的 CAS,只有操作时间和预期值符合才进行修改。
3.4.6 AtomicIntegerArray - 无锁的整数熟组
当然也会有 AtomicLongArray、AtomicReferenceArray。本质是对 int[] 进行操作,使用 Unsafe 类通过 CAS 的方式控制 int[] 在多线程下的安全性,核心 API:
public final int get(int i) // 获取数组第 i 个元素
public final int length() // 数组的长度
public final int getAndSet(int i, int newValue) // 修改下标 i 的值,返回旧值
public final boolean compareAndSet(int i, int expect, int update) // 下标 i 值符合预期,就修改
public final int getAndIncrement(int i) // 下标 i 的值 +1,返回旧值
public final int getAndDecrement(int i) // 下标 i 的值 -1,返回旧值
public final int getAndAdd(int i, int delta) // 下标 i 的值 +dealta,返回旧值
3.4.7 AtomicIntegerFieldUpdater - 普通变量原子操作
同样还有AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
使用方法:
public static class Candidate {
int id;
volatile int score;
}
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
注意事项:
- Updater 只能修改它可见范围的变量。因为 Updater 时用反射来使用变量的,所以变量不能用 private
- 为了确保变量被正确的读取,就必须用 volatile
- 由于 CAS 操作会通过对象实例中的偏移量直接进行赋值,因此,不支持 static 字段( Unsafe.objectFieldOffset() 不支持静态变量)
3.5 有关死锁的问题
通俗的说,死锁就是两个或多个线程,相互占用对方需要的资源,而都不进行释放,导致彼此之间相互等待对方资源,产生无限等待的现象。死锁一旦发生,没有外力介入,这种等待将永远存在。
当遇到这种问题,通常的表现就是相关的进程不再工作,且 CPU 占用率为 0。我们也可以使用JDK提供的工具
首先先用 jps 得到 java 进程的进程ID,接着使用 jstack 查看程序堆栈。如果发生死锁,会看到有类似以下输出
Found one Java-level deallock:
====================
"线程名1"
waiting to lock monitor 0x1234 which is held by "线程名2"
"线程名2"
waiting to lock monitor 0x1235 which is held by "线程名1"
且该两个线程的STATE都是BLOCKED。
避免死锁的方法,可以使用重入锁,通过中断或者限时等待可以有效规避死锁带来的问题。
4. 并行模式与算法
常见的并行模式设计方法的介绍
4.1 单例模式
单利模式在程序中的出现非常频繁。线程安全最佳实践的案例:
class SingleInnerClass {
private SingleInnerClass(){ }
public static SingleInnerClass getInstance() {
return SingleInnerClassHolder.instance;
}
static class SingleInnerClassHolder {
private static SingleInnerClass instance = new SingleInnerClass();
}
}
可以做到不采用 synchronized,又可做到懒加载,而且线程安全的单例
4.2 不变模式
当多线程对同一个对象进行操作时,为了保证对象数据的一致性和正确性,有必要对对象进行同步。而同步操作又对系统性能有相当的损耗。为了减少同步操作,可以使用不可改变的对象,依赖对象不变性,确保其在没有同步操作的多线程环境始终保持内部状态的一致性和正确性。这就是不变模式。
核心思想:一个对象一旦创建,则它的内部状态将永远不会发生改变。与只读属性有所区别。只读属性的对象:对象本身不被其他线程修改,但是对象的自己状态可以自行修改。
不变模式的主要使用场景条件:
- 对象创建后,内部状态和数据不再发生变化
- 对象需要被共享,被多线程频繁访问
在 Java 中,实现不变模式只需要注意以下4点:
- 去除 setter 方法,以及所有修改自身属性的方法
- 用 private final 标记,确保其不可修改,子类也不发重载修改它的行为
- 有一个可以创建完整的对象的构造函数
JDK 中,最为典型的不变模式就是 String,此外所有的元数据包装类都是不变模式实现的
4.3 生产者 - 消费者 模式
生产者 - 消费者是一个经典的多线程设计模式,为多线程的协作提供了良好的解决方案,角色如下表:
角色 | 作用 |
---|---|
生产者 | 用户提交请求,提取用户任务,并装入内存缓存区 |
消费者 | 在内存缓存区中提取并处理任务 |
内存缓存区 | 缓存生产者提交的任务或数据,供消费者使用 |
任务 | 生产者向内存缓存区提交的数据结构 |
Client | 使用生产者和消费者的客户端 |
内存缓存区,可以用BlockingQueue来实现,用于维护任务或数据队列
优点:生产者 - 消费者模式,可以很好的对2者解耦。就是异步模式,对生产者的任务延迟消费(消息队列)
(basic.multhread.producer_consumer.basic.TaskData)
public class TaskData {
private final int value;
public TaskData(int value) {
this.value = value;
}
public TaskData(String value) {
this.value = Integer.valueOf(value);
}
public int getValue() {
return value;
}
@Override
public String toString() {
return this.hashCode() + "[" + this.getClass().getName() + ",value:" + value + "]";
}
}
(basic.multhread.producer_consumer.basic.Producer)
public class Producer implements Runnable {
private volatile boolean isRunning = true;
/** 内存缓存区(队列) */
private BlockingQueue<TaskData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEP_TIME = 1000;
public Producer(BlockingQueue<TaskData> queue) {
this.queue = queue;
}
@Override
public void run() {
TaskData data = null;
System.out.println("Start Producer ID=" + Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(SLEEP_TIME);
data = new TaskData(count.incrementAndGet());
System.out.println("--> queue put into data:" + data);
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.err.println("==> error failed to put data:" + data);
}
}
System.out.println("Stop Producer ID=" + Thread.currentThread().getId());
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
(basic.multhread.producer_consumer.basic.Consumer)
public class Consumer implements Runnable {
/** 内存缓存区(队列) */
private BlockingQueue<TaskData> queue;
private static final int SLEEP_TIME = 1000;
public Consumer(BlockingQueue<TaskData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("Start Consumer id=" + Thread.currentThread().getId());
try {
while (true) {
TaskData data = queue.take();
if (null != data) {
int value = data.getValue();
System.out.println(MessageFormat.format("{0} * {1} = {2}", value, value, value * value));
Thread.sleep(SLEEP_TIME);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
System.out.println("Stop Consumer id=" + Thread.currentThread().getId());
}
}
}
(basic.multhread.producer_consumer.basic.Client)
public class Client {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<TaskData> queue = new LinkedBlockingQueue<>(10);
Producer producer1 = new Producer(queue);;
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
ExecutorService service = Executors.newCachedThreadPool();
// 生产者
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
// 消费者
service.execute(new Consumer(queue));
service.execute(new Consumer(queue));
service.execute(new Consumer(queue));
Thread.sleep(10 * 1000);
producer1.stop(); producer2.stop(); producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
4.4 高性能的 生产者 - 消费者 模式 - 无锁实现
BlockingQueue 可以作为生产者和消费者的中间件。但是 BlockingQueue 并不是一个高性能的实现。
因为它完全使用锁和阻塞等待来实现线程间的同步。在高并发场合,它的性能并不是特别优越。与之比较 ConcurrentLinkedQueue 是一个高性能的队列,他采用无锁的 CAS 操作。
4.4.1 Disruptor框架 - 无锁的缓存框架
这是一个高效的无锁内存队列。它使用无锁的方式实现了一个环形队列。非常适合生产者 - 消费者模式。
对比一般队列,需要提供 头部head 和 尾部tail 两个指针,用于出队和入队。而环形队列只需要提供当前位置 cursor,利用这个指针可以操作出/入队。
但是唤醒队列的大小必须事先制定,不能动态扩容。且 Disruptor 要求我们必须定义数组大小为2的倍数。目的是为了可以通过:sequence & (queueSize - 1) 就能立即定位到实际的元素位置 index。这个要比取余(%)操作快得多。
4.4.2 提高消费者的响应时间:选择合适的策略
当有新数据在唤醒队列中产生时,消费者如何知道新数据的产生呢。Disruptor提供了几种策略
- BlockingWaitStrategy:默认策略。与 BlockingQueue 类似,都使用锁和条件(Condition)进行数据的监控和线程的唤醒。该策略最节省 CPU,但是在高并发性能下表现比较差
- SleepingWaitStrategy:对 CPU 使用率也非常保守。他会在循环中不断等待数据,进行自旋等待,如果不成功,则使用 Thread.yield() 让出CPU,并最终使用 LockSupport.parkNanos(1) 进行线程休眠,以确保不占用太多CPU资源。因此,这个策略:可能产生比较高的平均延时,适用于对延时要求不是特别高的场合(异步日志),对生产者线程影响最小
- YieldingWaitStrategy:适用于低延时场合。相当于消费者线程变身成为了一个内部执行 Thread.yield() 的死循环。因此建议,有多余消费者线程数量的逻辑 CPU 数量(指的是CPU的双核四线程的数量)
- BusySpinWaitStrategy:最疯狂的等待策略。他就是一个死循环。消费者会一致监控缓冲区的变化。因此会导致 CPU 占用率很高。只有在延迟非常苛刻的场景下考虑使用,注意:你的物理CPU数量要大于消费者线程数。如果在一个物理核上使用超线程技术模拟两个逻辑核,另外一个逻辑核会受到这种超密集计算的影响而不能正常工作。
4.4.3 CPU Cache的优化 - 解决伪共享问题
为了提高 CPU 的速度,CPU 有一个高速缓存 Cache。读写数据的最小单位为缓存行(Cache Line),它是从主存(memory)复制到缓存(Cache)的最小单位,一般为 32字节 到 128字节。
伪共享问题:假设 X 和 Y,两个变量在同一个缓存行。运行在 CPU1 上的线程更新了 X,那么 CPU2 上的缓存行就会失效,同一行的 Y 即使没有修改也会变成无效,导致 Cache 无法命中。同理 CPU2 修改 Y。如果 CPU 不能命中缓存,必然导致吞吐量急剧下降。
Disruptor 框架的解决方案:它的核心组件 Sequence 会被非常频繁的访问(每次入队,它都会被 加1),其基本结构如下:
ckass LhsPadding {
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
protected volatile long value;
}
class RhsPadding extends Value {
protected long p1, p2, p3, p4, p5, p6, p7;
}
public Sequence extends RhsPadding { }
虽然在 Sequence 主要使用只有 value,但是通过 RhsPadding、LhsPadding,在 value 前后安置了一些占位空间,来避免 value 的冲突。
此外,对于 Disruptor 的环形缓冲区 RingBuffer,内部是通过以下语句构造的:
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
所以实际上,数组大小是缓冲区实际大小,再加上两倍的BUFFER_PAD的大小。就等于在这个数组的头部和尾部两端各增加了 BUFFER_PAD 这个填充,使得整个数组被载入Cache时不会受到其他变量的影响而失效。
4.5 Future模式
Future 核心思想:异步调用,让被调用者立即返回,后台慢慢处理这个请求。调用者可以先处理其他任务,在真正需要数据的场合再去尝试获得需要的数据。
JDK 内部已经提供了一套完整的实现。如图所示:
Future类关系图.pngCallable:该接口代表一段可以调用并返回结果的代码,用于产生结果
Future:该接口表示异步任务,是还没有完成的任务给出的未来结果,用于获取结果
Future方法:
boolean cancel(boolean mayInterruptIfRunning); // 取消任务
boolean isCancelled(); // 是否已经取消
boolean isDone(); // 是否已经完成
V get() throws InterruptedException, ExecutionException; // 取得返回对象
// 取得返回对象,设置超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
FutureTask 实现了 RunnableFuture 接口,这个接口的构造如下:
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
实例:
public class FutureTaskDemo {
static class MyCallableTask implements Callable<String> {
private long waitTime;
public MyCallableTask(int timeInMillis) {
this.waitTime = timeInMillis;
}
@Override
public String call() throws Exception {
Thread.sleep(waitTime);
return Thread.currentThread().getName();
}
}
public static void main(String[] args) {
MyCallableTask callable1 = new MyCallableTask(1000); // 要执行的任务
MyCallableTask callable2 = new MyCallableTask(2000);
FutureTask<String> futureTask1 = new FutureTask<String>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例
executor.execute(futureTask1); // 执行任务
executor.execute(futureTask2);
System.out.println("--> 请求完毕");
System.out.println(futureTask1.isDone());
try {
System.out.println("数据:" + futureTask1.get(1, TimeUnit.MICROSECONDS));
} catch (TimeoutException e1) {
System.out.println("futureTask1 数据未处理完");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
while (true) {
try {
if (futureTask1.isDone() && futureTask2.isDone()) {// 两个任务都完成
System.out.println("Done");
executor.shutdown(); // 关闭线程池和服务
return;
}
if (!futureTask1.isDone()) { // 任务1没有完成,会等待,直到任务完成
System.out.println("FutureTask1 output=" + futureTask1.get());
}
System.out.println("Waiting for FutureTask2 to complete");
String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println("FutureTask2 output=" + s);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
}
}
}
}
4.6 并行算法思想 介绍
4.6.1 并行流水线
并发算法虽然可以发挥多核CPU的性能,但是如果前后相互依赖的程序就不能随意打断顺序了。比如有如下计算步骤:A=B+C;D=A*B;D=D/2
那么我们可以拆分成3个线程,P1负责A=B+C;P2负责D=A*B;P3负责D=D/2。每个线程维护一个消息队列,当P1处理完毕放入P2的队列,P2处理完毕后放入P3的队列
虽然P1、P2、P3不能再一开始就并行处理,但是可以像流水线一样将他们串起来,从而达到提高性能的目的
4.6.2 并行搜索
在串行对一个数组遍历,只要做一个循环就可以了。
而并行的模式,则可以采用N个线程,将数组拆分成N个数组,利用Future思想进行分组搜索,将结果收集在一起返回。
4.6.3 并行排序
大部分排序算法,串行执行都比较容易。但对于并行排序,复杂度就会提高很多,这里介绍一些平行排序算法
- 奇偶交换排序:分离数据相关性
- 希尔排序:改进的插入排序
5 Java8与并发
JDK8新特性提供一些对于并发游泳的东西
- 并行流对数据进行并行操作
- 新的锁StampedLock
- 增强的Future
- 原子类的增强
5.1 parallelStream - 并行流
对于集合,都新增了2个转化流的方法:
default Stream< E> stream() : 返回一个顺序流
default Stream< E> parallelStream() : 返回一个并行流
则paralletStream则是获取并行流的方式,可以对数据进行并行排序,当数据量大时,会比串行排序高出一个量级
5.2 CompletableFuture - 增强的Future
CompletableFuture - 实现了Future接口,并实现了CompletionStage接口。通过该接口可以在一个执行结果上进行多次流式处理。如:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.pring(x)).thenRun(() -> System.out.println())
与Future一样,可以作为函数调用的契约,但是在CompletableFuture可以手动设置完成状态。以及流式的操作方式,他的工厂方法有如下几个:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
其中supplyAsync()适用于需要返回数据的场景,runAsync()则适用于不需要返回数据的场景。而executor参数,默认使用ForkJoinPool.common线程池中执行,可以指定线程池
而它有几个手动设置完成的方法
public static <U> CompletableFuture<U> completedFuture(U value) // 适用于静态工厂方法的流式操作
public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
实例:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFutureDemo demo = new CompletableFutureDemo();
// demo.completeDemo();
demo.streamFutureDemo();
}
class AskThread implements Runnable {
CompletableFuture<Integer> futrue = null;
public AskThread(CompletableFuture<Integer> futrue) {
this.futrue = futrue;
}
@Override
public void run() {
int result = 0;
try {
result = futrue.get() * futrue.get();
} catch (Exception e) {
}
System.out.println(result);
}
}
public void completeDemo() throws InterruptedException {
System.out.println("\n--> completeDemo");
final CompletableFuture<Integer> futrue = new CompletableFuture<>();
// 会一直阻塞,因为跟没有它需要的数据
new Thread(new AskThread(futrue)).start();
Thread.sleep(1000);
// 手动完成,填入数据
futrue.complete(6);
}
static Integer calc(Integer para) {
try {
// 模拟一个长时间计算
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return para * para;
}
public void streamFutureDemo() throws InterruptedException, ExecutionException {
System.out.println("\n--> 流式 异步");
final CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> calc(50));
System.out.println(future.isDone());
System.out.println(future.get());
System.out.println(future.isDone());;
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> calc(50)).completedFuture(40);
System.out.println(future2.get());
}
}
5.2.1 CompletionStage
CompletionStage提供40个接口都是为了函数式编程准备的,上面的例子可以改造成
System.out.println("\n--> 流式 操作");
final CompletableFuture<Void> future2 = CompletableFuture
.supplyAsync(() -> calc(10))
.thenCompose((result) -> CompletableFuture.supplyAsync(() -> calc(result)))
.exceptionally(ex -> {
ex.printStackTrace();
return 0;
})
.thenApply((result) -> Integer.toString(result))
.thenAccept(System.out::println);
future2.get();
通过这个接口,可以对任务处理结果进行加工处理
- thenCompose:将计算结果,传入下一个Future,组合CompletableFuture
- exceptionally:添加异常处理
- thenApply:加工处理
- thenAccept:输出
5.2.2 组合多个CompletableFuture
也可以使用thenCombine进行组合
public void thenCombineDemo() throws InterruptedException, ExecutionException {
System.out.println("\n--> 流式 操作");
final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> calc(50));
final CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> calc(20));
final CompletableFuture<Void> futureResult = future1
.thenCombine(future2, (i, j) -> (i + j))
.exceptionally(ex -> {
ex.printStackTrace();
return 0;
})
.thenApply((result) -> Integer.toString(result))
.thenAccept(System.out::println);
future2.get();
}
5.3 StampedLock - 读写锁的改进
对于ReadWriteLock,虽然分离了读写操作,但是读写还是会相互阻塞操作,使用的依然是悲观锁。而StampedLock则提供一种乐观的读策略。这种乐观锁非常类似无锁的操作,使得乐观锁完全不会阻塞写操作
public class StampedLockDemo {
private double x, y;
private final StampedLock stampedLock = new StampedLock();
/** 这是一个排它锁 */
void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp);
}
}
/** 只读方法 */
double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead();
double currentX = x;
double currentY = y;
if (stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
对于写操作:
- 先用tryOptimisticRead()尝试一次乐观读,他会返回一个stamp(即这次锁获得的凭证)
- 然后获取x和y值,这时候使用validate(stamp)来验证stamp是否发生不一致
- 如果stamp发生不一致,这里可以使用CAS的写法,直到成功读取。当然如果不一致,可以使用readLock()获取悲观的读锁,进一步读取数据。这就跟读写锁差不多了。
网友评论