并发
如何减少上下文切换
- 无锁并发编程:将数据ID按照Hash算法取模分段,不同线程处理不同段数据
- CAS算法:Java的Atomic包使用CAS算法来更新数据
- 使用最少线程:避免创建不必要的线程
- 协程:单线程里实现多任务调度
避免死锁方法
- 避免一个线程同时获取多个锁
- 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源
- 使用定时锁,lock.tryLock(timeout)来替换使用内部锁机制
- 对于数据库锁,加锁解锁必须在一个数据库连接里,否则会出现解锁失效的情况
并发机制的底层实现原理
volatile
- 保证线程间变量修改的可见性,确保所有线程看到这个变量的值是一致的
- 有volatile变量修饰的共享变量在进行写操作时,会多一条lock指令:
- 将当前处理器缓存行的数据写回到系统内存
- 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效
- 内部缓存(L1,L2)/ 缓存一致性协议
volatile的使用优化
LinkedTransferQueue
synchronized
- 对于普通同步方法,锁是当前实例对象
- 对于静态同步方法,锁是当前类的Class对象
- 对于同步方法块,锁是Synchroized括号里配置的对象
实现原理
- JVM基于进入和退出Monitor对象来实现方法同步和代码块同步
- 代码块是用monitorenter和monitorexit指令实现的
- 方法同步细节在JVM规范中没有详细说明,但同样可以用上面两个指令实现
- monitorenter是编译后插入到同步代码块的开始位置,monitorexit是插入到方法结束处和异常处
- 任何对象都有一个monitor与之关联,当且一个monitor被持有后,它处于锁定状态
- 同步方法依靠方法修饰符上的ACC_SYNCHRONIZED来完成的
Java对象头
- synchronized用的锁是存在Java对象头里的
- 对象头的Mark Word默认为下:
锁状态 | 25bit | 4bit | 1bit是否是偏向锁 | 2bit锁标志位 |
---|---|---|---|---|
无锁状态 | 对象hashcode | 对象分代年龄 | 0 | 01 |
锁状态标志位:
锁状态 | 2bit |
---|---|
轻量级锁 | 00 |
重量级锁 | 10 |
GC标记 | 11 |
偏向锁 | 01 |
锁的升级与对比
- 锁级别从低到高:无锁状态、偏向锁状态、轻量级锁、重量级锁
- 偏向锁:HotSpot作者经过以往的研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代码更低因此引入了偏向锁。偏向锁的获取过程为:
- 访问Mark Word中偏向锁的标识是否设置为1,所标志位是否为01----确认为可偏向状态
- 如果为可偏向状态,则测试线程id是否指向当前线程,如果是,执行(5),否则执行(3)
- 如果线程id并未指向当前线程,通过CAS操作竞争锁。如果竞争成功,则将Mark Word中的线程id设置为当前线程id,然后执行(5);如果竞争失败,执行(4)
- 如果CAS获取偏向锁失败,则表示有竞争。当达到全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁(因为偏向锁是假设没有竞争,但是这里出现了竞争,要对偏向锁进行升级),然后被阻塞在安全点的线程继续往下执行同步代码
- 执行同步代码
偏向锁的释放:
- 偏向锁的撤销在上述第四步骤中有提到。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。
- 偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行)
- 它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
-
轻量级锁:
轻量级锁的加锁过程:
- 在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,官方称之为 Displaced Mark Word。
- 拷贝对象头中的Mark Word复制到锁记录中。
- 拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向锁记录的指针
- 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态
- 如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。
轻量级锁的释放过程:
- 通过CAS操作尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word。
- 如果替换成功,整个同步过程就完成了。
- 如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。
-
重量级锁
锁之间对比
原子操作实现原理
处理器如何实现原子操作
- 使用总线锁来保证原子性
- 使用缓存锁来保证原子性 缓存一致性,但以下两种情况不能使用缓存锁定:
- 操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行,则处理器会调用总线锁定
- 处理器不支持缓存锁定
Java如何实现原子操作
- 使用循环CAS实现原子操作:
- JVM中的CAS操作利用处理器提供的CMPXCHG指令实现的
- 自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止
- CAS实现原子操作的三大问题:
- ABA问题:因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。
Java1.5开始JDK的atomic包里提供了一个类AtomicStampedReference来解决ABA问题 - 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销
- 只能保证一个共享变量的原子操作:对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁
- 使用锁机制实现原子操作:
- 锁机制保证了只有获得锁的线程能够操作锁定的内存区域
- JVM内部实现了很多种锁机制,有偏向锁,轻量级锁和互斥锁
Java并发编程基础
线程简介
-
一个Java程序的运行不仅是main()线程,还包括多个其他线程
-
程序正确性不能依赖线程优先级的高低
-
线程状态:
image
-
线程状态的变化:
线程状态的变化
-
demon线程:
- Daemon线程是一种支持性线程,主要是用在后台程序做一些后台调度与支持性工作。这意味着当JVM中没有非Daemon线程时,JVM将自动退出。
- 可以通过调用Thread.setDaemon(true)方法将线程设为Daemon线程。(注:该方法必须在start()或者run()方法前执行,也就是说必须在线程启动前执行)
- Daemon线程被用作,完成支持性工作,但是在java虚拟机退出时,Daemon线程中的finally块并不一定会执行。 注:在构建Daemon时,不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑。
启动和终止线程
构造线程
- 线程运行前需要先构造一个线程
- 父线程就是当前线程(开启多线程的线程),子线程会具有与父线程一致的优先级, 守护线程,线程组,还会有父线程的可继承ThreadLocal。还会分配给一个唯一的ID
- java.lang.Thread的init()方法运行完毕,线程对象就初始化好了,在堆内存中等待运行
启动线程
- 线程完成初始化后,调用start()方法就可以启动这个线程
- 线程start()的含义:当前线程同步告知JVM,只要线程规划器空闲,应立即启动调用start()方法的线程
- 作为一个习惯,最好为自定义线程起一个好名字,方便使用jstack分析程序
理解中断
- 中断:一个标识位属性
- 其他线程通过调用该线程的interrupt()方法使其进入中断状态
- 线程通过方法isInterrupted()来判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断进行复位
- 只要线程进入中断状态(调用interrupt()方法),再调用sleep(),会抛出异常InterruptedException。同时JVM会将线程的打断状态清空,此时再调用isInterrupted()会返回false。
过期的suspend()、resume()和stop()方法
- suspend()用于暂停线程、resume()用于恢复线程、stop()用于停止线程,这三个方法都过期了
- suspend()不会释放锁,会导致线程占用资源进入休眠状态,容易导致死锁
- stop()不能保证线程资源的正确释放,通常没有给予线程完成资源释放工作的机会,可能会导致程序运行在不确定的状态
- 暂停恢复方法可以用等待/通知机制完成
安全地终止线程
- 利用中断标志位终止线程
- 利用一个volatile型boolean变量来控制是否需要停止任务并终止该线程
线程间通信
volatile和synchronized关键字
- volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性
- synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性
等待/通知机制
- 生产者/消费者模型
- 定义:等待/通知机制是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方通知方之间的交互工作
等待/通知的相关方法是任意Java对象都具备的,这些方法被定义在所有对象的超类java.lang.Object上。
- 相关方法:
- 注意事项:
- 使用wait,notify,notifyAll需要先对调用对象加锁
- 调用wait方法后,状态由Running变成WAITING,并将当前线程放入等待队列中
- notify,notifyAll方法调用后,wait方法不会立刻返回,而是要等调用notify,notifyAll方法的线程释放锁后
- notify将等待队列中一个等待线程移到同步队列中,notifyAll将等待队列中所有等待线程移到同步队列中,被移到线程状态由WAITING变成BLOCKED
- 从wait方法返回前提是获取了对象锁
- wait方法会释放对象锁
等待/通知的经典范式
- 等待/通知的经典范式分为两部分,分别针对等待方(消费者)和通知方(生产者)
- 等待方:
- 获取对象锁
- 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件
- 条件满足则执行对应的逻辑
对应的伪代码如下:
synchronized(对象){
while(条件不满足){
Object.wait();
}
逻辑处理
}
- 通知方:
- 获得对象的锁
- 改变条件
- 通知所有等待在对象上的线程
对应的伪代码如下:
synchronized(对象){
改变条件
对象.notifyAll();
}
管道输入/输出流
- 主要用于线程间的数据传输,传输的媒介为内存
- PipedOutputStream PipedInputStream PipedReader PipedWriter:I/O知识点
Thread.join()
- 如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回
- Thread除了提供join()方法之外,还提供了join(long millis)和join(longmillis,int nanos)两个具备超时特性的方法。
- join方法源码:
// 加锁当前线程对象
public final synchronized void join(long millis){
// 条件不足,继续等待
while (isAlive()) {
wait(0);
}
// 条件符合,方法返回
}
- 当线程终止时,会调用线程自身的notifyAll方法,会通知所有等待再该线程对象上的线程,join方法的逻辑与等待/通知的经典范式一致
ThreadLocal的使用
- 定义:ThreadLocal 是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,数据存储以后,只有在指定线程中可以获取到存储的数据,对于其他线程来说则无法获取到数据。
虽然在不同线程中访问的是同一个 ThreadLocal 对象,但是它们通过 ThreadLocal 获取到的值却是不一样的。一般来说,当某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用 ThreadLocal。
-
原理:ThreadLocaL是一个泛型类,一个线程中有可以有多个ThreadLocal,一个线程只有一个ThreadLocalMap,要获取数据时,先通过当前线程名获取当前线程ThreadLocalMap,ThreadLocalMap保存的是键值对,键值为不同的ThreadLocal对象,值为相应ThreadLocal保存的数据,获取数据时以ThreadLocal为键值去ThreadLocalMap中获取对应的数据
Java中的锁
- 锁是用来控制多个线程访问共享资源的方式,一般用来防止多个线程同时访问共享资源
- 但是有些锁允许多个线程并发访问共享资源,比如读写锁
Lock接口
- Lock接口出现以前,Java依靠synchronized关键字实现锁功能
- Lock接口可以显式的获取锁和释放锁,有较强的扩展性
-
Lock接口与synchronized关键字对比:
image -
Lock接口API:
image
队列同步器:AbstractQueuedSynchronizer
- 是构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作
- 主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态
- 在抽象方法的实现过程中对同步状态进行更改,需要使用到同步器提供的三个方法:getState()、setState(int newState)和compareAndSetState(int expect,int update)来进行操作,这三个方法可以保证状态的改变是安全的
- 子类被推荐定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放方法来供自定义同步组件使用
- 同步器即可以支持独占式获取同步状态,也可以支持共享式地获取同步状态,这样方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等)
- 同步器是实现锁(也可以是任何同步组件)的关键:在锁中聚合同步器,利用同步器实现锁的语义。
锁与同步器关系
- 锁是面向使用者的,他定义了使用者与锁交互的接口(比如允许两个线程并行访问),隐藏了实现细节;
- 同步器是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步管理状态、线程的排队、等待与唤醒等底层操作;
- 锁让使用者仅仅是调用其方法既可以实现同步效果、同步器让实现者通过重写抽象方法进行了队列的底层操作;
- 他们两个是使用者和实现者关注不同的领域实现了相同的效果
队列同步器的接口与示例
- 同步器基于模板设计模式实现的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义的同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法
- 重写同步器指定方法时需要使用同步器提供的如下三个方法来访问或修改同步状态:
- getState():获取当前同步状态
- setState(int new State):设置当前同步状态
- compareAndState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
- 一个独占锁实例:
public class Metux implements Lock {
//静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer{
//是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将状态设置为0
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) throw new
IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//返回一个condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
//仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.tryAcquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.tryRelease(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
}
队列同步器的实现分析
- 同步队列:同步器依赖于内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构成一个节点(Node)并将其加入同步队列,同时阻塞当前线程,当同步状态释放时,会将首节点中的线程唤醒,使其再次尝试获取同步状态
image - 独占式同步状态获取与释放:
- 通过同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列移除
-
acquire方法:
image
首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现 -
acquire方法整体流程:
image -
释放同步状态:
image
共享式同步状态获取与释放
- 共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态
-
获取共享式同步状态:通过调用同步器的acquireShared(int arg)方法可以共享式地获取同步状态:
image
独占式超时获取同步状态
- 通过调用同步器的tryAcquireNanos(int arg,long nanosTimeout)方法可以超时获取同步状态,即在指定的时间内获取同步状态,如果获取到同步状态则返回true,否则,返回false。
- 在java5之后,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptException异常。(1.5之前并不会)。
-
超时获取同步状态的过程可以被视作响应中断获取同步状态过程的“增强版”
image
重入锁:ReentrantLock
- “可重入”,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁
- 该锁还支持获取锁时的公平和非公平性选择。“公平”是指“不同的线程获取锁的机制是公平的”,而“不公平”是指“不同的线程获取锁的机制是非公平的”
- 实现重进入:
- 线程再次获取锁,识别获取锁的线程是否为当前占据锁的线程
- 锁的最终释放,重复获取n次,则也需要释放n次
- 非公平锁获取同步状态:
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
- 公平锁获取同步状态:
该方法与nofairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,表示有线程比当前线程更早地请求获取锁,因为需要等待前驱线程获取并释放锁之后才能继续获取锁。
- 公平锁与非公平锁区别:公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换。非公平性锁虽然可能造成线程”饥饿”,但极少的线程切换,保证了其更大的吞吐量
读写锁
- 读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
- 特性:
- 公平性:支持公平性和非公平性。
- 重入性:支持重入,读写锁最多支持65535个递归写入锁和65535个递归读取锁。
- 锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁
读写状态设计
- 在ReentrantLock中使用一个int类型的state来表示同步状态,该值表示锁被一个线程重复获取的次数。但是读写锁ReentrantReadWriteLock内部维护着两个一对锁,需要用一个变量维护多种状态。所以读写锁采用“按位切割使用”的方式来维护这个变量,将其切分为两部分,高16为表示读,低16为表示写。
写锁的获取与释放
- 写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程已经获取写锁的线程,则当前线程进入等待状态。
该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞
读锁的获取与释放
- 读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被其他线程获取,则进入等待状态。
锁降级
- 写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级,锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
LockSupport工具
- LockSupport定义了一组公共的静态方法,这些方法提供了最基本的阻塞或者唤醒的功能,也是构建同步组件的基础工具
Condition接口
- 定义:任意一个Java对象都拥有一组监视器方法(定义在java.lang.Object上),主要包括:wait()、wait(long timeout)、notify()、notifyAll(),这些方法和synchronized关键字结合使用,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock接口配合使用实现等待/通知模式。对比Object的监视器方法和Condition接口,可以更加详细的了解Condition 的特性:
- 获取一个Condition必须要通过Lock的newCondition()方法。该方法定义在接口Lock下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例
- Condition为一个接口,其下仅有一个实现类ConditionObject,由于Condition的操作需要获取相关的锁,而AQS则是同步锁的实现基础,所以ConditionObject则定义为AQS的内部类
等待队列
-
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入到等待队列并进入等待状态。事实上,同步队列和等待队列中的节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
-
一个Condition包含一个等待队列,Condition有首节点和尾节点。当前线程调用Condition.await()时会将当前线程构造节点,并将节点从尾部加入到等待队列。基本结构如下:
- 在Object的监视器模型上,一个对象拥有一个同步队列和一个等待队列;而并发包中的Lock拥有一个同步队列和多个等待队列,其对应关系是:
等待
- 调用Condition的await(),或以await开头的方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法中返回时,一定是获取了Condition相关联的锁。
public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
image
通知
- 调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中
public final void signal() {
//检测当前线程是否为拥有锁的独
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}
image
13个原子操作类
原子更新基本类型
- AtomicBoolean:原子更新布尔类型
- AtomicInteger:原子更新整型
- int addAndGet(int delta) 以原子方式相加并返回结果
- boolean compareAndSet(int expect,int update) 如果输入的数值等于预期值,则以原子方式将该值设置为输入的值
- int getAndIncrement() 以原子方式将当前值加1,返回自增前的值
- void lazySet(int newValue) 将值设置为newValue,但是需要一些时间
- int getAndSet(int newValue) 以原子方式设置为newValue,返回旧值
- AtomicLong:原子更新长整型
原子更新数组
- AtomicIntegerArray:原子更新整型数组里的元素
- int addAndGet(int i,int delta) 以原子方式将索引为i的和delta相加
- boolean compareAndSet(int i,int expect,int update) 获取索引i的值compareAndSet
- AtomicLongArray:原子更新长整型数组里的元素
- AtomicReferenceArray:原子更新引用类型数组的元素
- AtomicBooleanArray :原子更新布尔类型数组的元素
原子更新引用类型
- AtomicReference:原子更新引用类型
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段
- AtomicMarkableReference:原子更新带有标记位的引用类型
原子更新字段类
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
- AtomicLongFieldUpdater:原子更新长整型的字段的更新器
- AtomicStampedReference:原子更新带有版本号的引用类型,解决CAS进行原子更新出现的ABA问题
Java 并发工具类
CountDownLatch:等待多线程完成
- CountDownLatch允许一个或多个线程等待其他线程完成操作
- CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,就传入N
- 每次调用CountDownLatch的countDown方法时,N就减1,CountDownLatch的await方法会阻塞当前线程,直到N变成0
CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数
器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法
CyclicBarrier:同步屏障
- CyclicBarrier的作用是让一组线程到达一个屏障(也可以称之为同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才能继续运行
- CyclicBarrier(int parties)构造函数接收一个int参数用来设置拦截线程的数量,还有一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction)设定第一个到达屏障的线程执行barrierAction
CyclicBarrier和CountDownLatch的区别
- CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
- getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量
- isBroken()方法用来了解阻塞的线程是否被中断
- 所以CyclicBarrier能处理更为复杂的业务场景
Semaphore
- Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理的使用公共资源
- Semaphore(int permits)构造方法接收一个int参数,表示可用的许可证数量
- 每次线程使用Semaphore的acquire()方法获取一个许可证,用完后调用release()方法归还
- 其他方法:
- int availablePermits():返回此信号量中当前可用的许可证数
- int getQueueLength():返回正在等待获取许可证的线程数
- boolean hasQueuedThreads():是否有线程正在等待获取许可证
- void reducePermits(int reduction):减少reduction个许可证,是个protected方法
- Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法
Exchanger
- 用于线程间交换数据,每两次执行Exchanger的exchange(V data)方法,则交换两次执行的数据并返回,可用于校对工作。
Java并发容器和框架
ConcurrentHashMap
HashMap HashTable ConcurrentHashMap区别
-
HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry列表形成环形数据结构,一旦形成环形数据结构,Entry的next结点永远不为空,产生死循环获取Entry
-
HashTable使用synchronized来保证线程安全,因此效率低下
-
ConcurrentHashMap使用锁分段技术:首先将数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问
-
结构:
- ConcurrentHashMap由Segment数组结构和HashEntry数组结构组成,Segment是可重入锁,扮演锁的角色;HashEntry存储键值对数据
- 一个Segment包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改的时候,必须首先获得与它对应的Segment锁
- 初始化
- ConcurrentHashMap的初始化方法是通过initialCapacity,loadFactor,concurrencyLevel等几个参数来初始化segment数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组来实现。默认情况下concurrencyLevel等于16
- 初始化segment:segment数组的长度ssize是大于或等于concurrentLevel的最小2的N次方值。
- 初始化segmentShift和segmentMask:sshift等于ssize从1向左移位的次数。默认情况下等于1需要向左位移4次,所以sshift=4;segmentShift等于32-sshift,segmentMask=ssize-1;
- 初始化每个segment:同样的initialCapacity,loadFactor.segment里HashEntry数组的长度cap=initialCapacity/ssize x c;c是倍数,如果c>1,取c的2的N次方值。所以cap不是1就是2的N次方。segment的容量threshold = (int)cap * loadFactor;
- 综上默认情况下,initialCapacity=16,loadFactor=0.75,concurrencyLevel=16;对应的ssize=16;sshift=4;segmentShift=28;segmentMask=15;cap=1;threshold=0
- 定位Segment
- 分段锁Segment保护不同段的数据,那么在插入和获取ConcurrentHashMap元素的时候,必须先通过散列算法定位到Segment
- ConcurrentHashMap会对元素的hashcode进行二次hash,以减少hash冲突
- 操作
- get过程不需要加锁,只有值为空值的时候才加锁重读。内部value定义为volatile
- put过程必须加锁,首先定位到Segment,然后在segment进行插入操作。第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放到HashEntry数组里
- size,先尝试2次不锁住Segment的方式统计各个Segment大小,如果统计过程中count发生了变化,对所有的Segment的put、remove、clean进行加锁再统计
线程安全队列
非阻塞队列:ConcurrentLinkedQueue
- 实现方法:循环CAS法
- 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。
-
结构:
a.png
transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = newNode(null);
}
-
入队过程:
b.png
- 添加元素1:队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。
- 添加元素2:队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。
- 添加元素3:设置tail节点的next节点为元素3节点。
- 添加元素4:设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点
规则:第一是将入队节点设置成当前队列尾节点的下一个节点。第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点,理解这一点很重要。
多线程入队规则:如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点
public boolean offer(E e) {
// 如果e为null,则直接抛出NullPointerException异常
checkNotNull(e);
// 创建入队节点
final Node<E> newNode = new Node<E>(e);
// 循环CAS直到入队成功
// 1、根据tail节点定位出尾节点(last node);
// 2、将新节点置为尾节点的下一个节点;
// 3、casTail更新尾节点
for (Node<E> t = tail, p = t;;) {
// p用来表示队列的尾节点,初始情况下等于tail节点
// q是p的next节点
Node<E> q = p.next;
// 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
// 如果p是尾节点
if (q == null) {
// p is last node
// 设置p节点的下一个节点为新节点,设置成功则casNext返回true;
// 否则返回false,说明有其他线程更新过尾节点
if (p.casNext(null, newNode)) {
// 如果p != t,则将入队节点设置成tail节点,
// 更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
}
// 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
// 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
else if (p == q){
p = (t != (t = tail)) ? t : head;
// 寻找尾节点
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
-
出队操作:
出队操作.png
并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点
public E poll() {
restartFromHead:
for (;;) {
// p节点表示首节点,即需要出队的节点
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 如果p != h,则更新head
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。
// 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了
else if ((q = p.next) == null) {
// 更新头结点
updateHead(h, p);
return null;
}
// p == q,则使用新的head重新开始
else if (p == q)
continue restartFromHead;
// 如果下一个元素不为空,则将头节点的下一个节点设置成头节点
else
p = q;
}
}
}
阻塞队列:JDK7 中7个阻塞队列
- 分类:
- ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
-
四种处理方法:
image
ArrayBlockingQueue
-
ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用FIFO的原则对元素进行排序添加的
-
ArrayBlockingQueue为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。
-
ArrayBlockingQueue支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”
-
ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作
LinkedBlockingQueue
-
LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低
-
LinkedBlockingQueue还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE
-
LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步
PriorityBlockingQueue
-
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序
-
内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作
DelayQueue
-
支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。
-
DelayQueue主要用于两个方面:
- 缓存:清掉缓存中超时的缓存数据
- 任务超时处理
- DelayQueue实现的关键主要有如下几个:
- 可重入锁ReentrantLock
- 用于阻塞和通知的Condition对象
- 根据Delay时间排序的优先级队列:PriorityQueue
- 用于优化阻塞通知的线程元素leader
SynchronousQueue
- SynchronousQueue没有容量,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
- SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)
- SynchronousQueue非常适合做交换工作,生产者的线程和消费者的线程同步以传递某些信息、事件或者任务
- SynchronousQueue
LinkedTransferQueue
- LinkedTransferQueue是基于链表的FIFO无界阻塞队列,实现了TransferQueue
- 即可以像其他的BlockingQueue一样有容量又可以像SynchronousQueue一样不会锁住整个队列
- LinkedTransferQueue
LinkedBlockingDeque
- LinkedBlockingDeque则是一个由链表组成的双向阻塞队列,双向队列就意味着可以从对头、对尾两端插入和移除元素,同样意味着LinkedBlockingDeque支持FIFO、FILO两种操作方式
- LinkedBlockingDeque是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE
- LinkedBlockingDeque底层实现机制与LinkedBlockingQueue一样,依然是通过互斥锁ReentrantLock 来实现,notEmpty 、notFull 两个Condition做协调生产者、消费者问题
- LinkedBlockingDeque
线程池
实现原理
线程池主要处理流程- 处理流程
- 线程池判断核心线程池里面的线程是否都在执行任务,如果核心线程未满,则创建一个新的工作线程来执行任务。如果核心线程已满,则进如下一个流程判断。
- 线程池判断工作队列是否已经满了,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列已经满了,则进入下个流程。
- 线程池判断线程池的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则提交给饱和策略来处理这个任务。
ThreadPoolExecutor
框架结构.png- ThreadPoolExecutor执行execute方法
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获取全局锁)。
- 如果运行的线程等于或者多余corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue,则创建新的线程来处理任务(需要获取全局锁)。
- 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectExecutionHandler.rejectedExecution方法
线程池使用
线程池创建
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds, runnableTaskQueue, handler);
- corePoolSize(核心线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于核心线程池基本大小时就不再创建。
- runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列
ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue无界
- maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。无界队列没有效果
- ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
- RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
- TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)
提交任务
- execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
- submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象。
关闭线程池
- 通过调用线程池的shutdown或shutdownNow方法来关闭线程池。
- shutdown:之前提交的任务会被执行(包含正在执行的,工作队列中的),但新任务会被拒绝
- shutdownNow:强制停止所有正在执行的任务
线程池监控
- taskCount:线程池需要执行的任务数量,是个近似值。
- completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount,是个近似值。
- largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
- poolSize:线程池当前的线程数总量,包括活动的线程与闲置的线程。
- activeCount:获取活动的线程数。
Executor框架
两级调度模型
两级调度模型.png- 上层,多线程程序通常把应用分解成若干个任务,然后Executor将任务映射为固定数量的线程,底层,系统内核将线程映射到cpu处理器上。应用程序通过Executor控制上层调度,而下层通过操作系统内核控制。
框架结构
- 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口
- 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
- 异步计算的结果:包括接口Future和实现Future接口的FutureTask类
- 类或接口简介:
- Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
- Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
- Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或Scheduled-ThreadPoolExecutor执行
Executor框架的主要成员
- ThreadPoolExecutor:通常使用工厂类Executors来创建
- FixedThreadPool,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场
景,它适用于负载比较重的服务器。 - SingleThreadExecutor。适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
- CachedThreadPool。大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
- ScheduledThreadPoolExecutor:通常使用工厂类Executors来创建
- ScheduledThreadPoolExecutor:适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景
- SingleThreadScheduledExecutor:适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。
- Future接口
- Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。
- Runnable接口或Callable接口的实现类提交(submit)给ThreadPoolExecutor或ScheduledThreadPoolExecutor时,ThreadPoolExecutor或ScheduledThreadPoolExecutor会向我们返回一个FutureTask对象。
- Runnable接口和Callable接口:
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果
网友评论