本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那就点个小心心,文末赞赏一杯豆奶吧,嘻嘻。 让我们共同成长吧……
第1章 并发编程的挑战
并发编程的目的是让程序运行得更快,但是并不是启动更多的线程就能让程序最大限度地并发执行。并发编程会遇到许多挑战,例如:上下文切换问题、死锁问题、以及首受限于硬件和软件的资源限制问题。
1.1 上下文切换
进行上下文切换之前,会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载到这个状态。任务从保存到再加载的过程就是一次上下文切换。
1、多线程一定快吗?
不一定。因为创建线程和上下文切换会占用一定的时间。
2、如何减少上下文切换
方法:无锁并发编程、CAS算法、使用最少线程、使用协程。
1.2 死锁
常见避免死锁方法:
1)避免一个线程同时获得多个锁;
2)避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源;
3)尝试使用定时锁,使用lock.tryLock(timeout)来代替使用内部锁
4)对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败情况。
1.3 资源限制的挑战
资源限制:指的是在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源。
1.4 本章小结
第2章 Java并发机制的底层实现原理
Java代码在编译后变成字节码,字节码被类加载器加载到JVM中,JVM执行字节码,最终转换为汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令。
2.1 volatile的应用
在并发编程中synchronized和volatile都具有重要的作用,volatile是轻量级的synchronized,保证了共享变量的可见性。volatile使用得当,会比synchronized的使用和执行成本更低,因为volatile不会引起线程上下文切换和调度。
1、volatile的定义和实现原理
在Java语言规范中对volatile的定义如下:Java编程语言中允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁来确保单独获取这个变量。Java提供了volatile,在某些情况下比锁更加方便。
与volatile实现相关的CPU术语:内存屏障(memory barriers)、缓冲行(cache line)、原子操作(atomic operations)、缓冲行填充(cache line fill)、缓冲命中(cache hit)、写命中(write hit)、写缺失(write misses the cache)
volatile如何保证可见性?被volatile修饰的共享变量进行写操作时会多出加了lock的汇编代码,lock指令在多核处理器下会依法2件事情:
1)将当前处理器缓冲行的数据写回到系统内存
2)这个写回内存操作会使在其他CPU里缓存了该内存地址的数据无效
在多处理器下,为了保证各个处理器的缓存一致性,就会实现缓存一致性协议,每个处理器通过嗅探总线上传播的数据来检查自己缓存是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就将当前处理器的缓存行设置为无效状态,当处理器对这个数据进行修改操作时,会重新从内存系统中把数据读到处理器缓存里。
volatile的两条实现原则:
1)Lock前缀指令会引起处理器缓存写回到内存。
2)一个处理器的缓存写回到内存会导致其他处理器的缓存无效
2、volatile的使用优化
JDK 7的并发包中新增一个队列集合类LinkedTransferQueue,它在使用volatile变量时,用一种追加字节的方式来优化队列的出队、入队的性能。LinkedTransferQueue里的PaddedAtomicRefernce内部类只做了1件事,就是讲volatile共享变量追加到64字节(一个对象引用占4字节、追加15个变量就是60字节,加上父类的value变量,一共64字节)。对于64位的处理器,追加64字节能提高并发编程的效率,因为64位处理器不支持部分填充缓冲行,这就意味着,如果队列的头结点和尾结点不足64字节,处理器会将它们读取到一个缓冲行中,再多处理器下的每个处理器都会缓存同样的头、尾结点,当一个处理器锁定缓冲行进行修改时,那么在缓存一致性机制的作用下,会导致其他处理器不能访问自己高速缓存中的尾结点,导致队列的出队入队效率低下。追加字节后,避免了头结点和尾结点在一个缓冲行中,可以使得头尾节点修改时不会相互锁定。
当缓存行不是64字节宽的处理器;共享变量不会频繁地写,就不需要追加到64字节。
2.2 synchronized的实现原理与应用
synchronized是重量级锁,在Java SE 1.6 对synchronized进行了各种优化,就没那么重了。Java SE 1.6中为了减少获得锁和释放锁带来的性能消耗问题而引入了偏向锁、轻量级锁等优化措施。
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现形式:对于普通同步方法,锁是当前实例对象;对于静态同步方法,锁是当前类的Class对象;对于同步代码块,锁是synchronized括号里配置的对象。当一个线程试图访问同步代码块/同步方法时,必须先得到锁,退出或者抛出异常时必须释放锁。
synchronized在JVM的实现原理是:JVM基于进入和退出Monitor对象实现方法或者代码块同步。但是二者实现细节不一样。同步代码块是使用monitorenter和monitorexit指令实现,而同步方法时其他方式。但是同步方法也可以使用这两个指令实现。
2.2.1 Java对象头
synchronized用的锁是存在Java对象头里的。如果对象是数组类型,则虚拟机用3个字宽存储对象头,非数组类型使用2字宽存储对象头。1宽等于4字节。
Java对象头:Mark Word(存对象的hashCode、分代年龄、锁信息等)、Class Metadata Address、Array Length
2.2.2 锁的升级与对比
在JavaSE1.6中,锁的状态有:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态。锁可以升级,但是不可以降级,目的是为了提高获得锁和释放锁的效率。
1 偏向锁
在大多的情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低,引入了偏向锁 。当一个线程访问同步块并获取锁时,会在对象头和栈帧的锁记录里边存储偏向锁的线程ID,以后该线程在进入和退出同步块是不需要进行CAS操作来加解锁,只需要测试一下对象头中的Mark Word中是否存储着指向该线程的偏向锁。若是测试成功,表示线程已经获得了锁;若是测试不成功,则需要在测试一下Mark Word中偏向锁的标志是否设置为1(表示当前已经处于偏向锁状态):若是没有设置,则使用CAS锁竞争机制来竞争锁;若是设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。
(1)偏向锁的撤销
偏向锁使用一种等到竞争出现才释放锁的机制,所以当其它线程尝试竞争偏向锁时,持有偏向锁的线程才会释放。
偏向锁的撤销,需要等到全局安全点(在这个时间点上没有正在执行的字节码)。它首先会暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,如果不处于活动状态,则将对象头设置成无锁状态;如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Mark Word要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。
(2)关闭偏向锁
Java 6和Java7默认是启动偏向锁的。是在应用程序启动后几秒钟才激活。可以使用 -XX:BiasedLockingStartupDelay=0设置立即启动。可以使用 -XX:-UsebiasedLocking-false关闭偏向锁,那么程序默认进入轻量级锁。
2 轻量级锁
(1)轻量级锁加锁
线程在执行同步块之前,JVM会先在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为“Displaced Mark Word”。然后线程尝试使用CAS将对象头中的MarkWord替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
(2)轻量级锁解锁
轻量级解锁时,会使用原子的CAS造作将Displaced Mark Word替换回到对象头,如果成功,则表示没有发生竞争。如果失败,表示当前锁存在竞争,锁会膨胀为重量级锁。
因为自旋会消耗CPU,为了避免无用的自旋(获得锁的线程被阻塞了),一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态。当锁处于这个状态下时,其他线程试图获取锁是都会被阻塞,当持有锁的线程释放锁之后会唤醒这些线程。
3、锁的优缺点对比
2.3 原子操作的实现原理
原子操作也就是说这个操作是不可以在进行细分的,必须一次性全部执行完成,不可以执行一部分之后被中断去执行另一个操作。
1、相关术语
缓存行(cache line)、比较并交换(Compare and Swap)、CPU流水线(CPU pipeline)、内存顺序冲突
2、处理器是如何实现原子操作的
对于处理器而言,原子操作也就是说在同一时间只能有一个处理器对数据进行处理,而且这个操作是原子性的,不可分割的。常见的有两种实现方式:一种是通过总线锁来保证原子性,另一种是通过缓存锁来保证原子性。
总线锁:总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,这样发出信号的处理器就可以独占内存,保证操作的原子性。
缓存锁:缓存锁是为了优化总线锁而设计出来的。因为总线锁在被锁住期间,其他的处理器是无法处理其他的数据的,只能等待锁释放开。但是若是对缓存进行加锁就可以减少这个影响。他是指内存区域如果被缓存在处理器的缓存行中,并且咋Lock操作期间被锁定,那么当他执行锁操作写回到内存时,处理器直接修改内部的内存地址,并允许他的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据。
3、Java如何实现原子操作
在Java中采用了锁和循环CAS的方式来保证原子操作
(1) 循环CAS:JVM的CAS操作正式利用了处理器提供的CMPXCHG指令实现的。基本思路就是通过循环进行CAS操作直到成功为止。
(2)CAS实现原子操作的三大问题
ABA问题(JDK1.5使用AtomicStampedReference解决ABA问题)、循环时间长开销大、只能保证一个共享变量的原子操作
(3)锁机制:锁机制保证只有获得锁的线程才能操作锁定的内存区域,但是Java中的多个锁,除了偏向锁以外,JVM实现锁的方式都是循环CAS,即当一个线程想进入同步块时使用循环CAS获取所;当退出同步块时使用循环CAS释放锁。
2.4 本章小结
第3章 Java内存模型
本节分为4部分:
Java内存模型基础:介绍内存模型相关的基本概念
Java内存模型中的顺序一致性:介绍重排序与顺序一致性内存模型
同步原语:介绍synchronized、volatile、final的内存语义以及重排序规则在处理器中的实现
Java内存模型的设计:介绍Java内存模型的设计原理以及其与处理器内存模型和顺序一致性内存模型的关系。
3.1 Java内存模型的基础
3.1.1 并发编程模型的两个关键问题
先发编程中需要处理两个关键问题:线程之间如何通信;线程之间如何同步
通信:指线程之间以何种机制来交换信息。线程之间的通信机制:共享内存、消息传递
同步:指程序中用于控制不同线程间操作发生相对顺序的机制。
3.1.2 Java内存模型的抽象结构
Java线程间的通信由JMM控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。
Java内存模型的抽象结构示意图如果线程A与线程B之间要通信的话,必须要经历下面2个步骤。
1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。
2)线程B到主内存中去读取线程A之前已更新过的共享变量。
3.1.3 从源代码到指令序列的重排序
在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分3种类型:
1)编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
2)指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-LevelParallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
3)内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
从源代码到最终执行的指令序列示意图上图的1属于编译器重排序,2和3属于处理器重排序。这些重排序可能会导致多线程程序出现内存可见性问题。对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers,Intel称之为Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。
JMM属于语言级的内存模型,它确保在不同的编译器和处理器上,通过禁止特定类型的重排序,为程序员提供一致的内存可见性保证。
3.1.4 并发编程模型的分类
为了保证内存可见性,Java编译器在生成指令序列的适当位置插入内存屏障指令来禁止特定类型的处理器重排序。JMM内存屏障有4类:
屏障类型 说明
LoadLoad Barriers 确保Load1数据的装载先于Load2及所有后序装置指令的装载
StoreStore Barriers 确保Store1数据对其他处理器可见(刷新到内存)先于Store2及所有后序存储指令的存储
LoadStore Barriers 确保Load1数据装载先于Store2及所有后序的存储指令刷新到内存
StoreLoad Barriers 确保Store1数据对其他处理器变得可见(指刷新到内存)先于Load2及所有后序装载指令的装载。StoreLoad Barriers
会使改屏障之前的所有内存访问指令(存储和装载指令)完成之后,才执行该屏障之后的内存访问指令
StoreLoad Barriers 具有其他3中屏障效果。
3.1.5 happens-before简介
JDK 5开始,Java使用新的JSR-133内存模型,该模型使用happens-before来阐述操作之间的内存可见性。在JMM中,一个操作执行的结果需要对另外一个操作可见,那么这两个操作之间必须存在happens-before关系。
1)程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
2)监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
3)volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
4)传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。
5)start()规则:如果线程A执行操作ThreadB.start()(启动线程B),那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作。
6)join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
7)线程终止规则
8)对象终结规则
3.2 重排序
重排序:指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。
3.2.1 数据依赖
如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。数据依赖分下列三种类型:
数据依赖类型表上面三种情况,只要重排序两个操作的执行顺序,程序的执行结果将会被改变。编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。
注意,这里所说的数据依赖性仅针对单个处理器中执行的指令序列和单个线程中执行的操作,不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑。
3.2.2 as-if-serial语义
as-if-serial语义的意思指:不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变。编译器,runtime 和处理器都必须遵守as-if-serial语义。
为了遵守as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。但是,如果操作之间不存在数据依赖关系,这些操作可能被编译器和处理器重排序。
3.2.3 程序顺序规则
3.2.4 重排序对多线程的影响
3.3 顺序一致性
3.3.1 数据竞争与顺序一致性
当程序未正确同步时,就会存在数据竞争。java内存模型规范对数据竞争的定义如下:在一个线程中写一个变量,在另一个线程读同一个变量,而且写和读没有通过同步来排序。
JMM对正确同步的多线程程序的内存一致性做了如下保证:如果程序是正确同步的,程序的执行将具有顺序一致性(sequentially consistent)–即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。这里的同步是指广义上的同步,包括对常用同步原语(synchronized,volatile和final)的正确使用。
3.3.2顺序一致性内存模型
顺序一致性内存模型有两大特性:一个线程中的所有操作必须按照程序的顺序来执行;(不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。
在概念上,顺序一致性模型有一个单一的全局内存,这个内存通过一个左右摆动的开关可以连接到任意一个线程。同时,每一个线程必须按程序的顺序来执行内存读/写操作。在任意时间点最多只能有一个线程可以连接到内存。当多个线程并发执行时,图中的开关装置能把所有线程的所有内存读/写操作串行化。
3.3.3同步程序的顺序一致性效果
在顺序一致性模型中,所有操作完全按程序的顺序串行执行。而在JMM中,临界区内的代码可以重排序(但JMM不允许临界区内的代码“逸出”到临界区之外,那样会破坏监视器的语义)。JMM会在退出监视器和进入监视器这两个关键时间点做一些特别处理,使得线程在这两个时间点具有与顺序一致性模型相同的内存视图(具体细节后文会说明)。虽然线程A在临界区内做了重排序,但由于监视器的互斥执行的特性,这里的线程B根本无法“观察”到线程A在临界区内的重排序。这种重排序既提高了执行效率,又没有改变程序的执行结果。从这里我们可以看到JMM在具体实现上的基本方针:在不改变(正确同步的)程序执行结果的前提下,尽可能的为编译器和处理器的优化打开方便之门。
3.3.4未同步程序的执行特性
JMM不保证未同步程序的执行结果与该程序在顺序一致性模型中的执行结果一致。因为未同步程序在顺序一致性模型中执行时,整体上是无序的,其执行结果无法预知。保证未同步程序在两个模型中的执行结果一致毫无意义。
JMM不保证对64位的long型和double型变量的读/写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作都具有原子性。
在计算机中,数据通过总线在处理器和内存之间传递。每次处理器和内存之间的数据传递都是通过一系列步骤来完成的,这一系列步骤称之为总线事务(bus transaction)。总线事务包括读事务(read transaction)和写事务(write transaction)。读事务从内存传送数据到处理器,写事务从处理器传送数据到内存,每个事务会读/写内存中一个或多个物理上连续的字。这里的关键是,总线会同步试图并发使用总线的事务。在一个处理器执行总线事务期间,总线会禁止其它所有的处理器和I/O设备执行内存的读/写。下面让我们通过一个示意图来说明总线的工作机制:
总线的这些工作机制可以把所有处理器对内存的访问以串行化的方式来执行;在任意时间点,最多只能有一个处理器能访问内存。这个特性确保了单个总线事务之中的内存读/写操作具有原子性。
在一些32位的处理器上,如果要求对64位数据的写操作具有原子性,会有比较大的开销。为了照顾这种处理器,java语言规范鼓励但不强求JVM对64位的long型变量和double型变量的写具有原子性。当JVM在这种处理器上运行时,会把一个64位long/ double型变量的写操作拆分为两个32位的写操作来执行。这两个32位的写操作可能会被分配到不同的总线事务中执行,此时对这个64位变量的写将不具有原子性。从JSR -133内存模型开始(即从JDK5开始),仅仅只允许把一个64位long/ double型变量的写操作拆分为两个32位的写操作来执行,任意的读操作在JSR -133中都必须具有原子性(即任意读操作必须要在单个读事务中执行)。
3.4 volatile的内存语义
3.4.1 volatile的特性
可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
3.4.2 volatile写-读建立的happens-before关系
从内存语义的角度来说,volatile写和锁的释放有相同的内存语义,相当于退出同步块;volatile读与锁的获取有相同的内存语义,相当于进入同步代码块。
3.4.3 volatile 写-读的内存语义
volatile写的内存语义如下:
当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存。
volatile读的内存语义如下:
当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
下面对volatile写和volatile读的内存语义做个总结:
线程A写一个volatile变量,实质上是线程A向接下来将要读这个volatile变量的某个线程发出了(其对共享变量所在修改的)消息。
线程B读一个volatile变量,实质上是线程B接收了之前某个线程发出的(在写这个volatile变量之前对共享变量所做修改的)消息。
线程A写一个volatile变量,随后线程B读这个volatile变量,这个过程实质上是线程A通过主内存向线程B发送消息。
3.4.4 volatile内存语义的实现
前文我们提到过重排序分为编译器重排序和处理器重排序。为了实现volatile内存语义,JMM会分别限制这两种类型的重排序类型。下面是JMM针对编译器制定的volatile重排序规则表:
3.4.5 JSR-133为什么要增强volatile的内存语义
JSR-133专家组决定增强volatile的内存语义:严格限制编译器和处理器对volatile变量与普通变量的重排序,确保volatile的写-读和锁的释放-获取一样,具有相同的内存语义。从编译器重排序规则和处理器内存屏障插入策略来看,只要volatile变量与普通变量之间的重排序可能会破坏volatile的内存语意,这种重排序就会被编译器重排序规则和处理器内存屏障插入策略禁止。
由于volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁的互斥执行的特性可以确保对整个临界区代码的执行具有原子性。在功能上,锁比volatile更强大;在可伸缩性和执行性能上,volatile更有优势。这里说下使用volatile的条件:
对变量的写不依赖于变量的当前值,访问变量是不需要加锁。
3.5 锁的内存语义
3.5.1 锁的释放-获取建立的happens-before关系
锁除了可以让临界区互斥执行外,还可让释放锁的线程向获取同一个锁的线程发送消息。
3.5.2 锁的释放-获取的内存语义
当线程获取锁时,JMM会把该线程对应的本地内存置为无效。
当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
对比锁的释放-获取的内存语义与volatile写-读的内存语义可以看出:锁的释放与volatile的写具有相同的内存语义;锁的获取与volatile的读具有相同的内存语义。
下面对锁的释放和锁的获取的内存语义做个总结:
线程A释放一个锁,实质上是线程A向接下来将要获得这个锁的某个线程发出了(线程A对共享变量所在修改的)消息。
线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息。
线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发送消息。
3.5.3 锁内存语义的实现
借助ReentrantLock源代码,来分析锁内存语义的集体实现机制。ReentrantLock
3.5.4 cncurrent包的实现
Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键。同时,volatile变量的读/写和CAS可以实现线程之间的通信。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。
如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:
首先,声明共享变量为volatile;
然后,使用CAS的原子条件更新来实现线程之间的同步;
同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。
AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。从整体来看,concurrent包的实现示意图如下:
3.6 final域的内存语义
3.6.1 final域的重排序规则
对于final域,编译器和处理器要遵守两个重排序规则:
1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
2) 初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。
3.6.2 写final域的重排序规则
写final域的重排序规则禁止把final域的写重排序到构造函数之外。这个规则的实现包含下面2个方面:
1)JMM禁止编译器把final域的写重排序到构造函数之外。
2)编译器会在final域的写之后,构造函数返回之前,插入一个StoreStore屏障。这个屏障禁止处理器把final域的写重排序到构造函数之外。
写final域的重排序规则可以确保:在对象引用为任意线程可见之前,对象的final域已经被正确初始化过了,而普通域不具有这个保障。
3.6.3 读final域的重排序规则
读final域的重排序规则如下:
在一个线程中,初次读对象引用与初次读该对象包含的final域,JMM禁止处理器重排序这两个操作(注意,这个规则仅仅针对处理器)。编译器会在读final域操作的前面插入一个LoadLoad屏障。
读final域的重排序规则可以确保:在读一个对象的final域之前,一定会先读包含这个final域的对象的引用。
3.6.4 final域是引用类型
对于引用类型,写final域的重排序规则对编译器和处理器增加了如下约束:在构造函数内对一个final引用的对象的成员域的写入,与随后在构造函数外把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
3.6.5 为什么final引用不能从构造函数内“溢出”
前面我们提到过,写final域的重排序规则可以确保:在引用变量为任意线程可见之前,该引用变量指向的对象的final域已经在构造函数中被正确初始化过了。其实要得到这个效果,还需要一个保证:在构造函数内部,不能让这个被构造对象的引用为其他线程可见,也就是对象引用不能在构造函数中“逸出”。
3.6.6 final语义在处理器中的实现
现在我们以x86处理器为例,说明final语义在处理器中的具体实现。
上面我们提到,写final域的重排序规则会要求译编器在final域的写之后,构造函数return之前,插入一个StoreStore障屏。读final域的重排序规则要求编译器在读final域的操作前面插入一个LoadLoad屏障。
由于x86处理器不会对写-写操作做重排序,所以在x86处理器中,写final域需要的StoreStore障屏会被省略掉。同样,由于x86处理器不会对存在间接依赖关系的操作做重排序,所以在x86处理器中,读final域需要的LoadLoad屏障也会被省略掉。也就是说在x86处理器中,final域的读/写不会插入任何内存屏障!
3.7 happens-before
3.7.1 JMM的设计
为了一方面要为程序员提供足够强的内存可见性保证;另一方面对编译器和处理器的限制要尽可能的放松,设计JMM时需要进行平衡。JMM把happens- before要求禁止的重排序分为了下面两类:
1)会改变程序执行结果的重排序。
2)不会改变程序执行结果的重排序。
JMM对这两种不同性质的重排序,采取了不同的策略:
1)对于会改变程序执行结果的重排序,JMM要求编译器和处理器必须禁止这种重排序。
2)对于不会改变程序执行结果的重排序,JMM对编译器和处理器不作要求(JMM允许这种重排序)。
下面是JMM的设计示意图:
JMM的设计示意图从上图可以看出两点:
JMM向程序员提供的happens- before规则能满足程序员的需求。JMM的happens- before规则不但简单易懂,而且也向程序员提供了足够强的内存可见性保证(有些内存可见性保证其实并不一定真实存在,比如上面的A happens- before B)。
JMM对编译器和处理器的束缚已经尽可能的少。从上面的分析我们可以看出,JMM其实是在遵循一个基本原则:只要不改变程序的执行结果(指的是单线程程序和正确同步的多线程程序),编译器和处理器怎么优化都行。比如,如果编译器经过细致的分析后,认定一个锁只会被单个线程访问,那么这个锁可以被消除。再比如,如果编译器经过细致的分析后,认定一个volatile变量仅仅只会被单个线程访问,那么编译器可以把这个volatile变量当作一个普通变量来对待。这些优化既不会改变程序的执行结果,又能提高程序的执行效率。
3.7.2 happens-before的定义
happens-before的定义:
1)如果一个操作 happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
2)两个操作之间存在 happens-before关系,并不意味着Java平台的具体实现必须要按照 happens-before关系指定的顺序执行。只要重排序的执行结果与按照 happens-before执行的一致,是可以的。
3.7.3 happens-before 规则
8个规则,不再重复
3.8 双重检查锁定与优化
在Java多线程程序中,有时候需要延迟初始化来降低初始化类或创建对象的开销。双重检查锁定是常见的延迟初始化的技术,但它是一个错误的用法。
错误根源:例如使用双重检查锁定实现单例模式,当第一次判断实例不为null时,很可能instance引用的对象还没初始化完成。因为instance=new Singleton()创建一个对象,可以分解为如下3行伪代码:
1、 memory=allocate(); //分配对象的内存空间
2、 ctorInstance(memory); //初始化对象
3、 instance=memory;//设置instance指向刚刚分配的内存地址
上述伪代码2 、3 之间很可能重排序。
解决办法:基于volatile的解决方案;基于静态内部类初始化解决方案
3.9 Java内存模型综述
3.9.1 处理器内存模型
顺序一致性内存模型是一个理论参考模型,JMM和处理器内存模型在设计时通常会把顺序一致性内存模型作为参照。JMM和处理器内存模型在设计时会对顺序一致性模型做一些放松,因为如果完全按照顺序一致性模型来实现处理器和JMM,那么很多的处理器和编译器优化都要被禁止,这对执行性能将会有很大的影响。
由于常见的处理器内存模型比JMM要弱,java编译器在生成字节码时,会在执行指令序列的适当位置插入内存屏障来限制处理器的重排序。JMM屏蔽了不同处理器内存模型的差异,它在不同的处理器平台之上为java程序员呈现了一个一致的内存模型。
3.9.2 JMM的设计
3.9.3JMM的内存可见性保证
Java程序的内存可见性保证按程序类型可以分为下列三类:
单线程程序。单线程程序不会出现内存可见性问题。编译器,runtime和处理器会共同确保单线程程序的执行结果与该程序在顺序一致性模型中的执行结果相同。
正确同步的多线程程序。正确同步的多线程程序的执行将具有顺序一致性(程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同)。这是JMM关注的重点,JMM通过限制编译器和处理器的重排序来为程序员提供内存可见性保证。
未同步/未正确同步的多线程程序。JMM为它们提供了最小安全性保障:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0,null,false)。
只要多线程程序是正确同步的,JMM保证该程序在任意的处理器平台上的执行结果,与该程序在顺序一致性内存模型中的执行结果一致。
3.9.4 JSR-133对旧内存模型的修补
JSR-133对JDK5之前的旧内存模型的修补主要有两个:
增强volatile的内存语义。旧内存模型允许volatile变量与普通变量重排序。JSR-133严格限制volatile变量与普通变量的重排序,使volatile的写-读和锁的释放-获取具有相同的内存语义。
增强final的内存语义。在旧内存模型中,多次读取同一个final变量的值可能会不相同。为此,JSR-133为final增加了两个重排序规则。现在,final具有了初始化安全性。
第4章 Java并发编程基础
4.1 线程简介
4.1.1 什么是线程
线程是轻量级进程,一个进程可以创建多个线程,各个线程拥有各自的计数器、堆栈和局部变量等属性。
4.1.2 为什么要使用多线程
原因有:更多的处理器核心;更快的响应速度;更好的编程模型
4.1.3 线程优先级
在java中,优先级范围:1~10,可以通过setPriority(int)修改优先级,默认是5.优先级高的线程分配时间片的数量多。
4.1.4 线程状态
Java线程的状态:NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED
4.1.5 Daemon线程
可以通过Thread.setDaemon(true)设置。
4.2 启动和终止线程
4.2.1 构造线程
继承Thread类,实现Runnable接口、Callable接口
4.2.2 启动线程
调用start()方法
4.2.3 理解中断
线程可以通过调用其它线程的interrupt()对其他线程进行终端操作。
线程可以通过isInterrupted()判断是否被中断
也可以通过静态方法Thread.interrupted()对当前线程的中断标识位进行复位。
4.2.4 过期的suspend()、resume()和stop()
4.2.5 安全地终止线程
4.3 线程间通信
4.3.1 volatile和synchronized关键字
线程开始运行,拥有自己的栈空间,Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。
关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
同步代码就不在贴出来,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。
任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。
从图4-2中可以看到,任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BLOCKED。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。
4.3.2等待/通知机制
这个机制背景就是为了解耦生产者、消费者的问题,简单的办法是使用轮询,但是轮询缺点是及时性、性能不能保证,所以采用通知机制避免轮询带来的性能损失。
等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上,方法和描述如表4-2所示。
等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
4.3.3等待/通知的经典范式
从上节的示例中可以提取经典范式,分为等待方(消费者)和通知方(生产者)
等待方:
1)获取对象锁;
2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
3)条件满足则执行对应的逻辑。
对应的伪代码如下:
通知方:
1)获得对象的锁;
2)改变条件;
3)通知所有等待在对象上的线程。
对应的伪代码如下:
4.3.4 管道的输入、输出流
管道的输入、输出流主要用于线程间的数据传输,传输的媒介为内存。
这块只是淡出提了下,属于nio的范畴,需单独整理,例子就不贴了。
4.3.5 Thread.join的使用
如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(longmillis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。
4.3.6 ThreadLocal的使用
作者只是举例演示使用方式,这里注意使用场景,具体参见这篇文章:ThreadLock.
4.4线程应用实例
这里不细写了,作者分别介绍了数据库连接池示例、线程池技术、基于线程池的简单web服务器。可以参照原书去理解。
第5章 Java中的锁
5.1 Lock接口
在Lock出现之前,Java程序只能靠synchronized实现锁的功能,在JavaSE5之后,有了Lock接口。虽然缺少了synchronized的隐式获取和释放锁的方便,但是拥有了锁获取与释放的可操作性、可中断的获取所以及超时获取锁等synchronized不具备的同步特性。
Lock接口提供的synchronized关键字不具备的主要特性:
Lock是一个接口,它定义了锁获取与释放的基本操作:
void lock() 、void lockInterruptibly() throws InterruptedException、boolean tryLock()
boolean tryLock(long time,TimeUnit unit)throws InterruptedException、void unlock()
Condition newCondition()
5.2 队列同步器(AQS)
队列同步器AbstractQueuedSynchronizer,是用来构建锁或其他同步组件的基础框架,它使用一个int类型的成员变量表示同步状态,通过内置的FIFO队列完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类继承同步同步器并实现它的抽象方法来管理同步状态,可以通过同步器提供的如下3个方法进行访问和修改同步器状态:
getState():获取当前同步状态
setState(int newState):设置当前同步状态
compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
以上3种方法能够保证状态改变的线程安全,同步器支持独占式和共享式获取同步状态。
5.2.1 队列同步器的接口与示例
同步器的设计是基于模板方法模式的,也就是说使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,而这些模板方法将会调用使用者重写的方法。
重写同步器指定方法时,使用以下3种方法访问和修改同步状态:
1、getState():获取当前同步状态
2、setState(int newState):设置当前同步状态
3、compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
同步器可以重写的方法:
同步器可以重写的方法实现自定义同步组件时,将会调用同步器提供的模板方法,同步器提供的模板方法如下:
同步器提供的模板方法5.2.2 队列同步器的实现分析
接下来从实现角度分析同步器是如何完成线程同步的,主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器核心数据结构与模板方法。
1、同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。节点的属性类型与名称以及描述如表所示:
节点的属性类型与名称以及描述节点是构成同步队列(等待队列,在5.6节中将会介绍)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如图所示。
同步队列的基本结构在图中,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。
2、独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,该方法代码:
同步器的acquire方法上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
下面分析一下相关工作,首先是节点的构造以及加入同步队列:
同步器的addWaiter和enq方法上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程):
同步器的acquireQueued方法在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个:
第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
第二,维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如图
节点自旋获取同步状态由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点:如果是,则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。
独占式同步状态获取流程,也就是acquire(int arg)方法调用流程:
独占式同步状态获取流程前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。
当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)
同步器的release方法该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport(在后面的章节会专门介绍)来唤醒处于等待状态的线程。
分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
3、共享式同步状态获取与释放
通过调用同步器的acquireShared(int arg)方法可以获取同步状态
在acquireShared(int arg)中,同步器代用了tryAcquireShared(int arg)尝试获取同步状态,当其返回值大于等于0时,表示能够获取同步状态。因此,在共享获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg)的返回值大于等于0.在doAcquireShared(int arg)的自旋过程中,如果当前节点的前驱为头结点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并且从自旋过程中退出。
与独占式一样。共享式获取也需要释放同步状态,通过调用releaseShared(int arg)可以释放同步状态。
releaseShared(int arg)释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(如Semaphore),它和独占式的主要区别在于tryReleaseShared(int arg)方法必须确保同步状态线程安全释放,一般通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。
4.独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则,返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
在分析该方法的实现前,先介绍一下响应中断的同步状态获取过程。在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。
超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeout计算公式为:nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒,反之,表示已经超时。
该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上有所不同。如果当前线程获取同步状态失败,则判断是否超时(nanosTimeout小于等于0表示已经超时),如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。
独占式超时获取同步状态doAcquireNanos(int arg,long nanosTimeout)和独占式获取同步状态acquire(int args)在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args)在未获取到同步状态时,将会使当前线程一直处于等待状态,而doAcquireNanos(int arg,long nanosTimeout)会使当前线程等待nanosTimeout纳秒,如果当前线程在nanosTimeout纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。
5、自定义同步组件——TwinsLock
5.3 重入锁
重入锁ReentrantLock,能够支持一个线程对资源的重复加锁,除此之外,该锁还支持获取锁时的公平与非公平选择。
1、实现重进入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁,锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平行(默认的)实现为例:
ReentrantLock的nonfairTryAcquire方法该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
ReentrantLock的tryRelease方法如果该锁被获取了n次,那么前n-1次tryRelease(int releases)方法必须返回false,而只有同步状态安全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
2、公平与非公平获取锁的区别
公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
ReentrantLock的tryAcquire方法该方法与nofairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,表示有线程比当前线程更早地请求获取锁,因为需要等待前驱线程获取并释放锁之后才能继续获取锁。
公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程”饥饿”,但极少的线程切换,保证了其更大的吞吐量。
5.4 读写锁
读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
一般情况下,读写锁性能都会比排它锁好,适用于读多于写的情况。Java并发包提供的读写锁的实现是ReentrantReadWriteLock,提供的特性如下:公平性选择、重入性、锁降级。
5.4.1 读写锁的接口与示例
ReadWriteLock仅定义了获取读锁和写锁这两个方法,即readLock()和writeLock()方法,而其实现类——ReentrantReadWriteLock,处理接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,如下:
ReentrantReadWriteLock展示内部工作状态的方法5.4.2 读写锁的实现分析
接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态设计、写锁的获取与释放、读锁的获取与释放以及锁降级
1、读写状态设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整数变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整数变量上维护多种状态,就一定需要”按位切割使用“这个变量,读写锁将变量切分了两个部分,高16位表示读,低16位表示写。
读写锁状态划分的方式 上图中的同步状态表示一个线程已经获取了写锁,并且重进入了2次;同时也连续获取了2次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是:通过位运算。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。
根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。
2、写锁的获取与释放
写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程已经获取写锁的线程,则当前线程进入等待状态。
ReentrantReadWriteLock的tryAcquire方法该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时,表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
3、读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被其他线程获取,则进入等待状态。
ReentrantReadWriteLock的tryAcquireShared方法在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。读锁的每次释放均减少读状态,减少的值是(1<<16)。
4、锁降级
锁降级:指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级,锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
锁降级中读锁的获取是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。
5.5 LockSupport工具
LockSupport定义了一组公共的静态方法,这些方法提供了最基本的阻塞或者唤醒的功能,也是构建同步组件的基础工具。
LockSupport提供的阻塞和唤醒方法5.6 Condition接口
任意一个Java对象都拥有一组监视器方法(定义在java.lang.Object上),主要包括:wait()、wait(long timeout)、notify()、notifyAll(),这些方法和synchronized关键字结合使用,可以实现等待/通知模式。
Condition接口也提供了类似Object的监视器方法,与Lock接口配合使用实现等待/通知模式。对比Object的监视器方法和Condition接口,可以更加详细的了解Condition 的特性:
5.6.1 Condition接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。
Condition定义的(部分)方法以及描述:
Condition定义的(部分)方法以及描述5.6.2 Condition的实现分析
ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。 关于Condition的实现,主要包括:等待队列、等待和通知。
1、等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入到等待队列并进入等待状态。事实上,同步队列和等待队列中的节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition有首节点和尾节点。当前线程调用Condition.await()时会将当前线程构造节点,并将节点从尾部加入到等待队列。基本结构如下:
将新增节点添加到等待队列尾部不需要CAS操作,因为调用await()方法的线程肯定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列;而并发包中的Lock拥有一个同步队列和多个等待队列,其对应关系是:
2、等待
调用Condition的await(),或以await开头的方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法中返回时,一定是获取了Condition相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到了等待队列中。
Condition的await()方法源码如下:
调用了该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造出节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继结点,然后当前线程会进入等待状态。
当等待队列中的结点被唤醒,则唤醒结点的线程开始尝试获取同步状态。如果不是通过其他线程调用的signal()唤醒,而是对等待线程进行中断,则会抛出InterruptedException.
当前线程加入到等待队列的过程:
3、通知
调用Condition的signal(),将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。 Condition的signal()方法源码:
调用该方法的前置条件是当前线程必须获取锁,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
节点从等待队列移到同步队列的过程:
Condition的signalAll()方法,相当于等待队列中的每个节点均被执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
5.7 本章小结
第6章 Java并发容器和框架
6.1 ConcurrentHashMap的实现原理与使用
ConcurrentHashMap是线程安全且高效的HashMap。
6.1.1 为什么要使用ConcurrentHashMap
在并发编程中使用HashMap可能导致程序死循环,而使用线程安全的HashTable效率又非常低,基于以上2个原因,便有了ConcurrentHashMap的出现。
(1)线程不安全的HashMap
再多线程环境下,使用HashMap进行put操作时会引起死循环,导致CPU利用率接近100%,原因是多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
(2)效率低下的HashTable
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。
(3)ConcurrentHashMap的锁分段技术可优先提升并发访问效率
首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
6.1.2 ConcurrentHashMap的结构
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。
6.1.3 ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor、concurrencyLevel几个参数来初始化segments数组、段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组 。
1、初始化segments数组
由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。
2、初始化segmentShift和segmentMask:
这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。
3、初始化每个segment:
输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。
上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。
【备注】:参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小,默认为16。
6.1.4 定位Segment
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。
6.1.5 ConcurrentHashMap的操作
本节介绍ConcurrentHashMap的3种操作——get、put、size
1、get操作
Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。
get操作的高效之处在于整个get过程不需要加锁,除非读到值为空才会加锁重读。因为用于统计当前Segment大小的count字段和用于存储值得HashEntry的value都被定义成volatile变量,而在get操作里只需要读不需要写共享变量count和value。在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值得高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有在HashEntry里散列开。
2、put操作
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后再Segment里进行插入操作。插入操作需要经历两个步骤,第一步在插入元素之前判断Segment里的HashEntry数组是否需要扩容,如果HashEntry数组超过容量,则创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap只针对某个Segment进行扩容而不是整个容器。第二步定位添加元素的位置,然后将其放在HashEntry数组里。
3、size操作
如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count虽然被定义为volatile变量,但如果在累加前使用的count发生了变化,那么统计结果就不准了。最安全的做法是在统计size时,锁住所有的Segment的put,remove,clean方法,但显然很低效。
ConcurrentHashMap统计size的方法是,尝试2次不锁住Segment的方式来统计各个Segment大小,如果在统计过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。ConcurrentHashMap中modCount变量在调用put,remove和clean方法素前加1,从而来记录容器大小是否发生变化。
6.2 ConcurrentLinkedQueue
在并发编程中,如果要实现一个线程安全的队列,则有两种方式:
(1) 使用阻塞算法,入队和出队使用锁来控制。
(2) 非阻塞算法:即循环CAS方式来实现。
ConcurrentLinkedQueue就是使用非阻塞方式实现的基于链接节点的无界线程安全队列。它采用先进先出的规则对节点进行排序。新添加的元素会被添加到对尾,获取元素时,它会返回头部的元素。
6.2.1 ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue的类图如下:
ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,由此组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。
6.2.2 入队列
1、入队列的过程
入队列就是将入队节点添加到队列的尾部。
入队主要做两件事情:
1、入队节点设置为当前队列尾节点的下一个节点。
2、更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next为空,则将入队节点设置为tail节点的next节点(注意:此时并未更新tail节点为尾节点)。所以,tail节点并不总是尾节点。
如果在单线程中执行没有任何问题,但如果在多线程中可能出现插队的情况。如果有一个线程正在入队,那么首先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这是如果另一个线程插队,则队列尾节点发生变化,当前线程需要暂停入队操作,重新获取新的尾节点。所以使用CAS算法来将入队节点设置为尾节点的next节点:
2、定位尾节点
tail节点并不总是尾结点,所以每次入队都必须先通过tail节点来找到尾结点。尾节点可能是tail节点或tail节点的next节点。
如果队列尾节点p与p的next节点都为空,则表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。
3、设置入队节点为尾节点
p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。
4、HOPS的设计意图
为什么不保证tail节点总是尾节点 ?
如果保证tail节点总是尾节点的话,那么入队操作直接通过tail节点定位到尾节点,然后把尾节点next节点更新为新的入队节点,随后更新tail节点为新的尾节点不就可以了吗?但这么做的有一个很明显的缺陷:每次都需要使用循环CAS更新tail节点为尾节点。一定程度上降低了入队的效率。所以在ConcurrentLinkedQueue入队时,并不是每次更新tail节点为尾节点,只有当tail节点和尾节点距离大于等于常量HOPS的值(默认为1)时才会更新tail节点。tail节点与尾节点距离越长,使用CAS更新tail节点次数越少,但每次入队时通过tail节点定位尾节点的时间就越长。但这样仍然可以提升入队效率,因为本质上来看通过增加volatile变量的读操作来减少volatile变量的写操作,而对volatile变量写操作的开销远远大于读操作。
【备注】:入队方法永远返回true,所以不要通过返回值判断入队是否成功。
6.2.3 出队列
出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。
以下为从队列获取4个元素的快照图:
从上图可以看出,并不是每次出队列都需要更新head节点为首节点,当head节点为空时,更新head节点为首节点,如果head节点不为空,则直接弹出head节点里的元素,并不会更新head节点为新的首节点。之所以这样设计,同样是为了减少CAS更新head节点从而提高出队效率。
首先获取首节点,然后判断首节点是否为空,如果为空,则证明另一个线程已经进行了一次出队操作,需要获取新首节点即原首节点的next节点。如果不为空则使用CAS方式将首节点引用设置为null,如果成功,则直接返回首节点的元素,如果不成功,则表示另外一个线程已经进行了一次出队操作并更新了head节点,导致元素发生变化,需要重新获取首节点。
6.3 Java中的阻塞队列
6.3.1 什么是阻塞队列?
阻塞队列是一个支持两个附加操作的队列。 这两个附加操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变成非空。
阻塞队列常用于生产者和消费者的场景,生产者向队列里添加元素,消费者从队列里取元素。在阻塞队列不可用时(消费者取时,队列为空,生产者添加时,队列已满),这两个附加操作提供了4中处理方式:
- 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常。当队列空时,从队列获取元素会抛出NoSuchElmentException异常。
- 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。当从队列取元素时,如果没有则返回null。
- 一直阻塞:当队列满时,如果生产者线程往队列put元素,则队列会一直阻塞生产者线程直到队列可用或响应中断。当队列空时,如果消费者线程从队列里take元素,队列会阻塞消费者线程直到队列不为空。
- 超时退出:当队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。当队列空时,如果消费者线程从队列取元素,队列会阻塞消费者线程一段时间,直到超过指定的时间,消费者线程退出。
【备注】:如果是无界阻塞队列,队列不可能出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,永远返回true。
6.3.2 Java里的阻塞队列
JDK7 提供的7个阻塞队列 :
(1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列。但可通过以下代码创建一个公平的阻塞队列:
为了保证公平性,通常会降低吞吐量,其实现是依靠可重入锁:
(2)LinkedBlockQueue:一个由链表结构组成的有界阻塞队列。
此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出(FIFO)的原则对元素进行排序。但不保证线程公平的访问队列,也不提供创建公平访问队列的方法。
(3)PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
默认情况下元素采用自然顺序升序排序。也可自定义类实现compareTo()方法来执行元素排序规则,或初始化PriorityBlockingQueue时,执行构造参数Comparator来对元素进行排序。但PriorityBlockingQueue不能保证同优先级元素的顺序。
(4)DelayQueue:一个使用优先级队列实现的无界阻塞队列。
DelayQueue是一个支持演示获取元素的无界阻塞队列。队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue可以用在以下场景:
1)缓存系统的设计:用来保存缓存元素的有效期。使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
2)定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueued中获取到任务就开始执行,如TimerQueue就是使用DelayQueue实现的。
(5)SynchronousQueue:一个不存储元素的阻塞队列。
此队列每一个put操作必须等待一个take操作,否则不能继续添加元素。默认情况下线程采用非公平性策略访问队列,但可通过以下方法设置以公平策略访问:
SynchronousQueue本身不存储任何元素,只是负责把生产者线程处理的数据直接传递给消费者线程。其吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
(6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
相比其他的阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者。如果没有消费者等待接受元素,transfer方法会将元素存放在队列的tail节点,直到该元素被消费者消费才返回。
tryTransfer方法:tryTransfer方法用来试探生产者传入的元素是否能直接传给消费者消费。如果没有消费者等待接收元素,返回false,反之,返回true。tryTransfer方法会立即返回,而不用等待元素被消费以后才返回。
对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没有消费元素,则返回false,如果在超时时间内消费了元素则返回true。
(7)LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
LinkedBlockingDeque是一个可以从队列两端插入和移除元素的队列。因为与其他阻塞队列相比,多了一个操作队列的入口,所以在多线程同时入队时,也就减少一半的竞争。
6.3.3 阻塞队列的实现原理
阻塞队列使用通知模式实现,即当生产者往满的队列里添加元素时会阻塞生产者线程,当消费者消费了队列中一个元素后,会通知生产者当前队列可用。
以下为ArrayBlockingQueue源码:
当往队列插入一个元素,如果队列不可用,那么阻塞生产者主要通过LockSupport.park(this)来实现。
6.4 Fork/Join框架
6.4.1 Fork/Join框架的定义
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,它可以把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 其运行流程如下:
6.4.2 工作窃取算法
工作窃取算法是指某个线程从其他队列里窃取任务来执行。当我们需要做一个大任务时,可以把这个任务分割成若干互不依赖的子任务,为了减少线程竞争,把子任务分别放入不同队列,并为每个队列创建一个单独的工作线程,假设,有其他线程提前把自己队列任务做完之后,还需要等待其他线程,这时,为了提高效率,已经做完任务的线程会去其他队列窃取任务执行,以帮助其他未完成的线程。此时他们访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列。被窃取任务线程永远从双端队列头部取任务执行,窃取任务线程永远从双端队列尾部拿任务执行。其运行流程图如下:
工作窃取算法优点:就是充分利用线程进行并行计算,减少了线程间的竞争。
工作窃取算法缺点:当双端队列里只有一个任务时,还时会存在竞争。而且该算法创建多个线程和多个双端队列,会消耗更过的系统资源。
6.4.3 Fork/Join框架的设计
Fork/Join框架有以下两个步骤:
- 分割任务:首先我们需要有一个fork类来把大任务分割成子任务,如果子任务仍然很大,还需要不停分割,直到分割出的子任务足够小。
- 执行任务并合并结果:分割的子任务分别放在双端队列,然后启动线程分别从队列取任务执行,执行完的结果统一放在一个队列里,启动一个线程从队列里拿数据并合并数据。
Fork/Join使用两个类来完成上述工作:
- ForkJoinTask:要使用ForkJoin框架,必须先创建一个ForkJoin任务。它提供在任务中执行fork和join操作机制,在使用时,我们通常继承ForkJoinTask的子类:RecursiveAction或RecursiveTask,两者区别是RecursiveAction用于没有返回结果的任务,RecursiveTask用于有返回结果的任务。
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。被分割出来的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部窃取一个任务执行。
6.4.4 使用Fork/Join框架
6.4.5 Fork/Join框架的异常处理
ForkJoinTask在执行时可能抛出异常,但我们无法再主线程里捕捉异常,但可以通过ForkJoinTask的isCompletedAbnnormally()方法来检查任务是否已经抛出异常或已经被取消,调用getException()可以获取异常,此方法返回Throwable对象,如果任务取消则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
6.4.6 实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
(1)ForkJoinTask的fork方法实现原理
当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。
pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
(2)ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。
首先,它调用doJoin()方法获取当前任务状态,任务有4中状态:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务被取消,则抛出CancellationException。如果任务抛出异常,则直接抛出对应异常。
doJoin()方法源代码:
在doJoin()方法中,会查看任务状态,如果任务已经执行完成,则直接返回任务状态,如果没有执行完成,则从任务数组里取出任务并执行。如果顺利执行完毕,则将任务状态设置为NORMAL,如果出现异常,则将任务状态设置为EXCEPTION。
6.5 本章小结
第7章 Java中的13个原子操作类
当一个线程更新一个变量时,程序如果没有正确的同步,那么这个变量对于其他线程来说是不可见的。我们通常使用synchronized或者volatile来保证线程安全的更新共享变量。在JDK1.5中,提供了java.util.concurrent.atomic包,这个包中的原子操作类提供了一种用法简单,性能高效,线程安全地更新一个变量的方式。
Atomic包里一共提供了13个类,有4种类型的原子更新方式:原子更新基本类型、原子更新数组、原子更新引用和原子更新属性。其实现基本都是使用Unsafe实现的包装类。
7.1 原子更新基本类型类
- AtomicBoolean:原子更新布尔类型
- AtomicInteger:原子更新整型
- AtomicLong:原子更新长整型
以上3个类提供的方法基本一致,我们以AtomicInteger为例进行分析。
AtomicInteger常用的方法有:
- int addAndGet(int delta):以原子方式将输入的数值与实例中的值相加,并返回结果。
- boolean compareAndSet(int expect,int upate):如果输入的数值等于预期值,则以原子方式将该值设置为输入的值。
- int getAndIncrement():以原子方式将当前值加1,返回自增前的值。
- void lazySet(int newValue):最终会设置成new Value,但可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
- int getAndSet(int newValue):以原子方式设置为newValue,并返回旧值。
其实现依靠我们熟悉的CAS算法:
在Java的基本类型中除了Atomic包中提供原子更新的基本类型外,还由char、float和double。那么这些在Atomic包中没有提供原子更新的基本类型怎么保证其原子更新呢?
从AtomicBoolean源码中我们可以得到答案:首先将Boolean转换为整型,然后使用comareAndSwapInt进行CAS,所以原子更新char、float、double同样可以以此实现。
7.2 原子更新数组
- AtomicIntegerArray:原子更新整型数组里的元素。
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素。
【备注:】看书上说原子更新数组有4个类,除了上述3个外,还有AtomicBooleanArray类,但我在jdk5/6/7/8中都没有找到这个类的存在,只找到共12个原子操作类,而不是标题中的13个。不知道是否是书中的错误?请知情的童鞋不吝赐教。
AtomicIntegerArray类主要提供原子的方式更新数组里的整型,其常用方法如下:
- int addAndGet(int i,int delta):以原子方式将输入值与数组中索引i的元素相加。
- boolean compareAndSet(int i,int expect,int update):如果当前值等于预期值,就把索引i的元素设置成update值。
【备注】:在AtomicIntegerArray构造方法中,AtomicIntegerArray会将数组复制一份,所以当其对内数组元素进行修改时,不会影响原传入数组。
7.3 原子更新引用类型
原子更新基本类型的AtomicInteger,只能更新一个变量,如果需要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。Atomic包提供以下3个类:
- AtomicReference:原子更新引用变量。
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
- AtomicMarkableReference:原子更新带有标记位的引用类型 。可以原子更新一个布尔类型的标记位和引用类型。构造方法时AtomicMarkableReference(V initialRef,boolean initialMark)。
7.4 原子更新字段类
如果需要原子的更新某个类里的某个字段,就需要使用原子更新字段类,Atomic包提供了以下3个类进行原子字段更新:
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
- AtomicLongFiledUpdater:原子更新长整型字段的更新器。
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,用于原子的更新数据和数据的版本号,避免CAS的ABA问题、
想要原子的更新字段类需要调用静态方法newUpdater()创建一个更新器,并设置想要更新的类和属性。且更新类的字段必须使用public volatile修饰符。
7.5 本章小结
第8章 Java中的并发工具类
在JDK的并发包里提供了几个非常有用的工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段。
8.1 等待多线程完成的CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。
在JDK1.5之前,我们要达到一个或多个线程等待其他线程完成操作,需要使用thread.join()方法,不停检查等待的线程是否存活。而CountDownLatch的出现给我们提供了一种功能更强大、更优雅的方法。
CountDwonLatch的构造方法结构一个int类型的参数作为计数器的值,如果需要等待N个线程完成工作,那就传入N。
调用countDown()方法,计数器值会减1,当计数器值为0时,会唤醒被await阻塞的线程。CountDownLatch同样提供超时时间的await(long time,TimeUnit unit)。如果到了指定的时间,计数器仍然不为0,则同样会唤醒线程。
【备注】:CountDownLatch不能重新初始化或修改其内部计数器的值。
8.2 同步屏障CyclicBarrier
CyclicBarrer的作用是让一组线程达到一个屏障(同步点)时被阻塞,直到所有的线程到达此屏障时,才会唤醒被屏障阻塞的所有线程。
CyclicBarrier默认构造方法CyclicBarrier(int parties),需要传入屏障需要拦截的线程数量。每个线程通过调用await方法告诉CyclicBarrier已经到达屏障,随后被阻塞,直到所有线程到达屏障或线程被中断才被唤醒。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parites,Runnable barrierAction),用于在所有线程到达屏障并被唤醒时,优先执行barrierAction。
CyclicBarrier和CounDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier计数器可以使用reset()方法重置。
- CyclicBarrier还提供获取阻塞线程数量及检测阻塞线程是否被中断等CountDownLatch没有的方法。
【备注】: CountDownLatch的getCount()方法返回还有多少需要调用countDown方法去tryReleaseShared使计数器归0的线程数量。CyclicBarrier的getParties()方法同样返回需要到达屏障的线程数量。
8.3 控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore(2)表示允许2个线程获取许可证,也就是最大并发数为2。通过调用acquire()方法获取许可证,使用完毕之后调用release()方法归还许可证。
除此之外,Semaphore还提供一些其他方法:
- intavailablePermits():返回此信号量中当前可用的许可证数。
- intgetQueueLength():返回正在等待获取许可证的线程数。
- booleanhasQueuedThreads():是否有线程正在等待获取许可证。
- void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
- Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法。
8.4 线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程协作的工具类。用于进行线程间的数据交换。Exchanger提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。线程通过调用Exchanger的exchange()方法来通知Exchanger已经到达同步点,并被阻塞直到另外一个线程也调用exchange()方法到达同步点时,两个线程才可以交换数据。
8.5 本章小结
第9章 Java中的线程池
Java中的线程池是运营场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池,在开发过程中,合理地使用线程池能够带来3个好处:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等待线程创建就可以立即执行。
- 提高线程的可管理性:线程时稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
9.1 线程池的实现原理
当提交一个新任务到线程池时,线程池的处理流程如下:
(1) 线程池判断核心线程池里的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
(2) 线程池判断工作队列是否已经满。如果没满,则将任务放入工作队列,等待核心线程池有空闲线程时,再取出来执行。如果满了,则进入下个流程。
(3) 线程池判断线程池的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果满了,则交给饱和策略取处理这个任务。
流程图如下:
线程池的主要处理流程ThreadPoolExecutor执行execute方法分下面4种情况。
1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
源码分析:
工作线程:线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。
ThreadPoolExecutor执行任务示意图线程池中的线程执行任务分两种情况,如下。
1)在execute()方法中创建一个线程时,会让这个线程执行当前任务。
2)这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。
9.2 线程池的使用
9.2.1 线程池的创建
我们可以通过ThreadPoolExecutor来创建一个线程池:
创建一个线程池需要输入一下几个参数:
1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用这个队列SynchronousQueue,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下:new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。AbortPolicy:直接抛出异常。CallerRunsPolicy:只用调用者所在线程来运行任务。DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。DiscardPolicy:不处理,丢弃掉。当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
9.2.2 向线程池提交任务
可以使用execute()或submit()方法向线程池提交任务。
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
9.2.3 关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
9.2.4 合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N cpu +1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N cpu 。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。
9.2.5 线程池的监控
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。
taskCount:线程池需要执行的任务数量。
completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
getActiveCount:获取活动的线程数。通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。
protected void beforeExecute(Thread t, Runnable r) { }
9.3 本章小结
第10章 Executor框架
Java线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元和执行机制分离开来。工作单元包括Runnable和Callable,而执行机制有Executor框架提供。
10.1 Executor框架简介
10.1.1 Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程。当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常会把应用分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。 其模型图如下:
1、Executor框架的结构
Executor框架主要又3大部分组成:
1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。
2. 任务的执行:包括任务执行机制的核心接口Executor,及其子接口ExecutorService接口。 Executor框架有两个关节类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
3. 异步计算的结果:包括接口Future和其实现类FutureTask类。
Executor框架的类与接口关系示意图:
下面是这些类和接口的简介。
Executor是一个接口,它是Executor框架的基础,它将任务的提交和任务的执行分离开来。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。SchduledThreadPoolExecutor比Timer更灵活,功能更强大。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或SchduledThreadPoolExecutor执行
2、Executor框架的成员
Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。
(1)ThreadPoolExecutor
ThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建3种类型的ThreadPoolExecutor: SingleThreadExecutor、FixedThreadPool、CachedThreadPool。
1) FixedThreadPool:创建固定线程数的线程池,构造函数中可以指定线程数量,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
FixedThreadPool内部使用无界队列LinkedBlockingQueue作为任务队列,队列的容量为Integer.MAX_VALUE,由于是无界队列,所以不会拒绝任务,可能会造成任务无限堆积,从而导致系统资源耗尽的情况。
2) SingleThreadExecutor:创建单个线程的线程池,可以保证顺序执行任务。与FixedThreadPool类似,只是SingleThreadExecutor的线程数固定为1
3) CachedThreadPool:可以根据需要创建新的线程,CachedThreadPool是大小无界的线程池,适用于执行很多短期异步任务的小程序,或者是负载比较轻的服务器。CachedThreadPool的corePool为空,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime为60L,这意味着线程空闲超过60秒则会进行回收。CachedThreadPool内部使用不存储元素的SynchronousQueue作为任务队列(一个put操作等待着一个take操作),这意味着如果任务的提交速度高于线程的处理速度,那么CachedThreadPool则会不断的创建新的线程,在极端的情况下,会耗尽CPU和内存资源。
(2) SchduledThreadPoolExecutor
SchduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建2中类型的SchduledThreadPoolExecutor,如下:
SchduledThreadPoolExecutor。包含若干个线程的SchduledThreadPoolExecutor。
SingleThreadSchduledExecutor。只包含一个线程的SchduledThreadPoolExecutor。
(3)Future接口
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果,当我们把Runnable接口或者Callable接口的实现类提交(submit)给ThreadPoolExecutor或者SchduledThreadPoolExecutor时,ThreadPoolExecutor或者SchduledThreadPoolExecutor会向我们返回一个FutureTask对象。下面是对应的API
(4) Runnable和Callable接口
Runnable和Callable接口的实现类都可以被hreadPoolExecutor或者SchduledThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。
除了可以自已创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。
当我们把一个Callable对象提交给ThreadPoolExecutor或者SchduledThreadPoolExecutor执行时,summit()会向我们返回一个FutureTask对象。我们可以执行FutureTask.get()来等待任务执行完成。当任务完成后FutureTask.get()将会返回任务的结果。
10.2 ThreadPoolExecutor详解
ThreadPoolExecutor是Executor框架最核心的类。主要由corePool(核心线程池大小)、maximumPool(最大线程池大小)、BlockingQueue(工作队列)和RejecteExecutionHandler(饱和策略)构成。
ThreadPoolExecutor通常使用Executors来创建。Executors可以创建3中类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。
10.2.1 FixedThreadPool详解
FixedThreadPool 被称为可重用固定线程数的线程池。
FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池数大于corePoolSize时,keepAliveTime为多余空闲线程等待新任务的最长时间,超过这个时间多余线程就会被终止。keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。 其运行示意图如下:
1>如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。
3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。
FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响:
a>当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程不会超过corePoolSize。
b>maximumPoolSize变为一个无效参数。
c>keepAliveTime也变为一个无效参数。
d>永远不会执行饱和策略。
10.2.1 SingleThreadExecutor详解
SingleThreadExecutor是使用单个worker线程的Executor。 下面是其源代码:
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。并使用LinkedBlockingQueue无界队列作为工作队列。 其运行示意图如下:
1>如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。
3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。
10.2.3 CachedThreadPool详解
CacheThreadPool是一个会更加需要创建新线程的线程池。 下面是其源代码:
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool时无界的。keepAliveTime被设置为60L,意味着空闲线程超过60秒后被终止。
CachedThread使用没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度是,CachedThreadPool会不断创建新的线程,极端情况下,会耗尽CPU和内存资源。 其执行示意图如下:
1>首先执行SynchronousQueue.offer(Runnable task)。如果当前有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),则配对成功,将任务交给空闲线程执行。
2>当没有空闲线程时,创建一个新线程执行任务。
3>线程在执行任务完毕后,执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),向队列请求任务,并等待60秒。如果60之后仍没有新任务,则被终止。如果有新任务则继续执行。
10.3 ScheduledThreadPoolExecutor 详解
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或定期执行任务。 ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor:
1>ScheduledThreadPoolExecutor适用于需要执行周期任务,同时为了满足资源管理的需求而限制后台线程的数量的应用场景。
2>SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各任务的应用场景。
10.3.1 ScheduledThreadPoolExecutor的运行机制
其执行示意图如下:
DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没什么意义。ScheduledThreadPoolExecutor的执行分为两大部分:
1>当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或scheduleWithFixedDelay()时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。
2>线程池中的线程从DelayQueue中获取ScheduledFutureTask并执行。
10.3.2 ScheduledThreadPoolExecutor的实现
ScheduledThreadPoolExecutor把待调度的任务放在一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量:
long型变量 time,表示这个任务将要被执行的具体时间。
long型变量 sequenceNumbe,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
long型成员变量 period,表示任务执行的间隔周期。
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对对列中的ScheduledFutureTask进行排序。time小的先执行,被排在前面。如果两个任务time相同则比较sequenceNumbe。
ScheduledThreadPoolExecutor执行任务的步骤如下:
1>线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2>线程1执行这个ScheduledFutureTask。
3>线程1修改ScheduledFutureTask的time变量为下次要被执行的时间。
4>线程1把修改time之后的ScheduledFutureTask放入DelayQueue(DelayQueue.add())。
以下是DelayQueue.take()方法源码:
DelayQueue.add()方法源码:
10.4 FutureTask详解
Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。
10.4.1 FutureTask 简介
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。FutureTask有未启动、已启动、已完成3种状态。
FutureTask的状态迁移示意图如下:
1>未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
2>已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
3>已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或FutureTask.run()时抛出异常而异常结束,FutureTask处于已完成状态。
10.4.2 FutureTask的使用
下面是FutureTask在不同状态时调用FutureTask.get()及Future.cancel()方法的执行示意图:
10.4.3 FutureTask的实现
FutureTask实现基于AQS.
10.5 本章小结
第11章 Java并发编程实践
生产者和消费者模式
线上问题定位
性能测试
异步任务池
完结……
网友评论