美文网首页js css htmljava学习之路
JavaGuide知识点整理——并发进阶知识点(下)

JavaGuide知识点整理——并发进阶知识点(下)

作者: 唯有努力不欺人丶 | 来源:发表于2022-07-24 02:03 被阅读0次

线程池

为什么要用线程池?

池化技术想必大家都已经屡见不鲜了,线程池,数据库连接池,http连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗。提高对资源的利用率。

线程池提供了一种限制和管理资源的方式。每个线程池还维护了一些基本统计信息。例如已完成的任务数量。

使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立刻执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源。还会降低系统的稳定性。使用线程池可以进行统一分配,调优和监控。

实现Runnable接口和Callable接口的区别

Runnable是java1.0就有的,而Callable是java1.5才有的。目的就是用来处理Runnable不支持的用例。
Runnable接口不会返回结果或者抛出检查异常。但是Callable可以。所以如果任务不用返回结果或者抛出异常可以用Runnable,这样代码看上去更简洁。
工具类Executors可以将Runnable对象转化成Callable对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。

执行execute()方法和submit()方法的区别是什么?

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  2. submit()方法用于提交需要返回值的任务,线程池会返回一个Future类型的对象。通过这个对象可以判断任务是否执行成功。并且可以通过Future的get方法来获取返回值。get()方法会阻塞到线程完成任务位置。而使用get(long timeout,TimeUnit unit)会阻塞到等待时间立即返回。这时候可能任务没执行完。

如何创建线程池

阿里巴巴java开发手册中强制线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的人更加明确线程池的运行规则,避免资源耗尽的风险。
Executors返回的线程池有两大弊端:

  • FixedThreadPool 和 SingleThreadExecutor :允许请求的队列长度为int最大值,可能堆积大量请求从而oom。
  • CachedThreadPool 和 ScheduledThreadPool :允许创建的最多线程数量是int最大值,可能创建大量线程导致oom。

而通过T和readPoolExecutor创建线程,可以直接设置最大线程数,核心线程数,最大等待数量,拒绝策略等。而且实际上Executors 创建线程,方法内部调用的也是ThreadPoolExecutor的构造方法。


Executors创建线程池
其方法内部调用的ThreadPoolExecutor构造方法

简单说一下工具创建的几个线程池特性:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

ThreadPoolExecutor类分析

虽然ThreadPoolExecutor提供了多个构造方法,但是其本质就是七个参数。不过有的参数可以默认而已。
我们直接快进到七大参数的讲解:

  • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量。
  • maximumPoolSize:最大线程数,核心线程都在使用且任务队列满了,则会增加新线程,这个参数就是当前线程池可以同时运行的最大线程数量。
  • workQueue:任务队列。当新任务来了以后如果核心线程都被使用,则会将任务加到队列里。
  • keepAliveTime:线程池中线程数大于核心线程数且没有新任务,核心线程之外的线程等到时间超过了keepAliveTime才会被回收销毁。
  • unit:keepAliveTime的时间单位
  • threadFactory:executor创建新线程的时候用的线程工厂。
  • handler:拒绝策略。当达到最大线程数且任务队列满了,新来的任务的处理方式。

拒绝策略有如下四种:

  • AbortPolicy:抛出异常拒绝新任务(默认的)。
  • CallerRunsPolicy:让调用的线程执行任务。如果执行程序关闭则丢弃任务。但是这种策略会降低新任务的提交速度,影响整体性能,如果程序可以承受这个延迟且一个任务都不能丢,可以选择这个策略。
  • DiscardPolicy:不处理新任务,直接丢掉。也不报错。
  • DiscardOldestPolicy:丢掉队首的任务。将新任务加进去。

线程池原理分析

其实想要搞懂线程池的原理,首先要分析execute方法。源码如下:


execute源码

workerCountOf(c)这个方法我们可以理解成当前线程池中执行的任务数量。如果小于核心线程数,那么会直接addWorker新建一个线程并且执行这个任务。

如果当前执行的任务数量大于核心线程的时候,如果当前线程池状态是RUNNING,并且任务队列可以添加任务,则把任务放进队列。如果任务队列不可以添加元素了,直接尝试新建一个线程,新建失败的话直接走拒绝策略。
需要注意的是在往队列添加任务的那一步,里面还有个判断:再次获取线程状态是不是RUNNING则会尝试从任务队列移除任务并且走拒绝策略。
如果当前线程池为空则新创建一个线程。

下面是一个上述代码的流程图:


execute方法流程

Atomic原子类

Atomic简介

Atomic翻译成中文是原子的意思。在化学上,原子是构成一般物质的最小单位。在化学反应中是不可分割的。在java里的Atomic是指一个操作是不可中断的。即使在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所以所谓原子类简单来说就是具有原子/原子操作特征的类。

JUC的原子类都存放在java.util.concurrent.atomic下。


JUC的atomic包下类

JUC包中原子类是哪四类

  • 基本类型
    使用原子的方式更新基本类型
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean:布尔型原子类
  • 数组类型
    使用原子的方式更新数组里的某个元素
    • AtomicIntegerArray:整型数组原子类
    • AtomicLongArray:长整型数组原子类
    • AtomicReferenceArray:引用类型数组原子类
  • 引用类型
    • AtomicReference:引用类型原子类
    • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决CAS进行原子更新时出现的ABA问题。
    • AtomicMarkableReference:原子更新带有标记位的引用类型
  • 对象的属性修改类型
    • AtomicIntegerFieldUpdater:原子更新整型字段的更新器
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器
    • AtomicReferenceFieldUpdater:原子更新引用类型字段的更新器

讲讲AtomicInteger的使用

其实它的方法不算多,源码也才三百多行。大概主要的有下面几个方法:

public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

简单介绍一下AtomicInteger类的原理

AtomicInteger线程安全的原理简单分析,我们看下AtomicInteger类的部分代码:


AtomicInteger源码
AtomicInteger源码

首先我们可以看出AtomicInteger大量利用了CAS+volatile+native方法来保证原子操作,从而避免synchronized的高开销,提高执行效率。
CAS的原理是拿期望值和原本的值作比较,如果相同则更新成新的值。UnSafe类中objectFieldOffset方法是一个本地方法,用来拿到原本的值的内存地址。返回值是valueOffset。另外value是volatile变量,在内存中可见。因此JVM可以保证任何时刻拿到的都是这个变量的最新值。

AQS

AQS介绍

AQS全称是AbstractQueueSynchronizer。这个是也是JUC包下,locks包下的一个类。


AQS类

它是一个用来构建锁和同步器的框架,使用AQS能简单且高效的构造出大量应用广泛的同步器。比如我们之前说的ReentrantLock,Semaphore,其他如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆基于AQS的。当然我们也可以利用AQS自己构建同步器。

AQS原理分析

AQS核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程。并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。这个机制AQS是用CLH队列锁实现的,即暂时将获取不到锁的线程加入到队列中。

CLH队列是一个虚拟的双向队列(即不存在队列实例,仅存在节点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁的节点来实现锁分配。

下面是AQS原理图:


image.png

AQS使用一个int成员变量来表示同步状态,并且该变量用volatile修饰了。通过内置的先入先出队列来完成线程的排队工作。AQS使用CAS对同步状态进行原子操作实现对其值的修改。

//返回同步状态的当前值
protected final int getState() {
    return state;
}
//设置同步状态的值
protected final void setState(int newState) {
    state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS对资源的共享方式

AQS定义两种资源共享方式

  • Exclusive(独占):只能一个线程执行,如ReentrantLock。又可分为公平锁和非公平锁

    • 公平锁:按照线程在队列中的排队顺序,先到先得
    • 非公平锁:多个线程获取锁时,每一次都是所有线程去抢,谁抢到是谁的。
  • Share(共享):多个线程可以同时执行,比如CountDownLatch,Semaphore,CyclicBarrier,ReadWriteLock等。

需要注意的是ReentrantReadWriteLock可以看成是组合锁。因为允许多个线程读,一个线程写。

不同的自定义同步器争用共享资源的方式也不同。自定义的同步器在实现的时候只要实现共享资源state的获取和释放方式即可,具体的线程等待队列的维护AQS在顶端已经实现好了。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式的一个很经典应用):

  1. 使用者继承AbstractQueuedSynchronizer 并重写指定的方法(对共享资源state的获取和释放)。
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

AQS使用了模板方法模式,自定义同步器的时候需要重写下面几个AQS提供的钩子方法:

protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。

什么是钩子方法:钩子方法是一种被声明在抽象类中的方法,他可以是空方法,也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

除了上面的五个钩子方法,AQS类中的其他方法都是final的,无法被其他类重写。
以ReentrantLock为例,state初始化是0,表示未锁定状态。A线程lock()时会调用tryAcquire()独占该锁并将state+1,此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取锁,当然A线程自己可以重复获取锁,每次state都会加1.这就是可重入的概念。但是要注意获取多少次就要释放多少次,这样才能让state=0.

再说CountDownLatch,任务分为N个子线程去执行,state初始化是N。这n个子线程是并行执行的,每个线程执行完countDown()一次,state会cas-1.等到state变成0后,会unpark()主调用线程,然后主调用线程会从await()函数返回,继续往后执行。


CountDownLatch代码测试

看上面的代码,正常main方法里开的线程睡了一秒,而我们后来的 打印语句一开始没有执行,都是等线程执行完了才执行的。这就说明主调用线程调用await()会阻塞,一直到state为1,如果我图中for循环只有4次,那么主线程会一直阻塞。有兴趣的可以自己试试。

一般来说自定义同步器要么独占方式,要么共享方式,他们也只需要tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种,但是AQS也支持两种方式都实现,比如ReentrantReadWriteLock。

AQS组件总结

  • Semaphore(信号灯):允许多个线程同时访问。synchronized和ReentrantLock都是一次只允许一个线程访问某资源,而Semaphore可以多个线程同时访问某资源。大概意思有点类似停车场。车位数固定。最多可同时停车数固定,但是有车出去了就多空位,可以多个车进来。
  • CountDownLatch:倒计时器。这是一个同步工具,协调多个线程之间的同步。类似下班锁门,要等所有员工都出去才会触发锁门这个行为。这个其实阻塞的是主线程。也就是要锁门的那一个人。
  • CyclicBarrier:这个和CountDownLatch类似,但是CountDownLatch是倒计时,这个是正面累加。打个比方比如众筹抽奖,奖池达到多少钱会触发开奖。先投完钱的在那等着,知道奖池满了触发抽奖。这个触发阻塞的是参与的那些人。
    CyclicBarrier使用demo

什么场景适合用CountDownLatch?

CountDownLatch的作用是允许count个线程阻塞在一个地方,直到所有的线程任务都执行完毕。之前在项目中有一个多线程获取数据库数据的场景,用到了CountDownLatch.
任务情况是一个导出分为多个sheet页。分别订单列表,汇总列表,发货列表,对账列表等按月统一导出。每一个列表之间没有依赖关系。所以为了效率用了多线程分别写入。定义了一个count是6的CountDownLatch,每一个sheet页对应一个子任务。所有sheet页都写入完成会触发下面的逻辑。

本篇笔记就记到这里,这里面说的用的CountDownLatch场景是真的在项目中用过。如果上面的笔记稍微帮到你了记得点个喜欢点个关注。也祝大家工作顺顺利利,生活健健康康!

相关文章

网友评论

    本文标题:JavaGuide知识点整理——并发进阶知识点(下)

    本文链接:https://www.haomeiwen.com/subject/oaivirtx.html