美文网首页
笔记:多线程并发编程(2)CAS 、阻塞队列、线程池

笔记:多线程并发编程(2)CAS 、阻塞队列、线程池

作者: 盐海里的鱼 | 来源:发表于2021-03-09 17:35 被阅读0次

CAS (Compare and swap )原子性操作(乐观锁)

  • CAS 是使用现代cpu的cas 指令实现的原子性操作 当多个线程修改同个值时 每个线程都会被赋予一个初始值 当计算完准备写回时会跟 初始值比较如果不等 再次赋值为新值再进行修改操作 直到完成为止。


    cas.png
  • CAS存在什么问题
  1. ABA问题
    CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。
    ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。
  2. 开销问题
    自旋需要消耗cpu资源 循环时间越长消耗越大。自旋CAS如果长时间不成功, 会给CPU带来非常大的执行开销。
  3. 只能保证一个共享变量的原子操作
    还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
  • 常用的原子性操作类
    Atomic开头的类。
public class UseAtomicInt {
  static AtomicInteger ai = new AtomicInteger(10);
  public static void main(String[] args) {
       System.out.println("等价:i++"+ai.getAndIncrement());
        System.out.println("等价:++i"+ai.incrementAndGet());
        System.out.println("等价 先加24:"+ai.addAndGet(24));
        System.out.println("等价 先给当前值再加24:"+ai.getAndAdd(24));
  }
}


public class UseAtomicReference {
    static AtomicReference<UserInfo> atomicUserRef;
    public static void main(String[] args) {
        UserInfo user = new UserInfo("Mark", 15);//要修改的实体的实例
        atomicUserRef = new AtomicReference(user);
        UserInfo updateUser = new UserInfo("Bill",17);
        atomicUserRef.compareAndSet(user,updateUser);

        System.out.println(atomicUserRef.get());
        System.out.println(user);
    }
    
    //定义一个实体类
    static class UserInfo {
        private volatile String name;
        private int age;
        public UserInfo(String name, int age) {
            this.name = name;
            this.age = age;
        }
        public String getName() {
            return name;
        }
        public int getAge() {
            return age;
        }

        @Override
        public String toString() {
            return "UserInfo{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }

}

阻塞队列

  • 什么是阻塞队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

注意:阻塞队列的方法不一定都是阻塞方法
add() .remove() 非阻塞
offer() poll() 非阻塞
take() pull() 阻塞

  • 常用阻塞队列

ArrayBlockingQueue: 一个由数组结构组成的有界阻塞队列。
是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue:一个使用优先级队列实现的无界阻塞队列。
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

SynchronousQueue:一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。多了tryTransfer和transfer方法,
(1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
(2)tryTransfer方法
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

线程池

  • 为什么用线程池(线程本身是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性)
    1.降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2.提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    3.提高线程的可管理性。
  • ThreadPoolExecutor 的类关系

Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。

ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;
AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。

创建线程池的参数含义:

ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)

corePoolSize:核心线程数
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize:最大并发线程数
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize

keepAliveTime:线程空闲销毁时间限制 unit:时间单位
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用

BlockingQueue<Runnable>:任务阻塞队列
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。

ThreadFactory:
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

  • 线程池的工作机制
    1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
    2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
    3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务。
    4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。


    线程池.png
  • 线程任务提交
    execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
    submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

  • 关闭线程池
    可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态。

  • 合理地配置线程池
    按照任务的类型分为 CPU密集型、IO密集型和CPU密集、IO密集混合型
    最大线程数的配置:
    Ncpu 可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
    IO密集型:Ncpu+1 +1防止出现页缺失不让cpu闲下来
    CPU密集型:2*Ncpu
    CPU密集、IO密集混合型:如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务

相关文章

  • 笔记:多线程并发编程(2)CAS 、阻塞队列、线程池

    CAS (Compare and swap )原子性操作(乐观锁) CAS 是使用现代cpu的cas 指令实现的原...

  • 并发编程之并发队列

    常见的并发队列有2种:阻塞队列和非阻塞队列。阻塞队列使用锁实现,非阻塞队列使用CAS非阻塞算法实现。这2种队列都是...

  • 并发编程-BlockQueue线程容器

    概述 blockQueue 作为线程容器、阻塞队列,多用于生产者、消费者的关系模式中,保障并发编程线程同步,线程池...

  • 线程池

    [TOC] 线程池 1. 并发队列:阻塞队列和非阻塞队列 区别如下: 入队: 非阻塞队列:当队列中满了的时候,放入...

  • 阻塞队列

    BlockingQueue线程池的数据结构是阻塞队列BlockingQueue。(在多线程领域:所谓阻塞,在某些情...

  • 线程

    Java 并发编程:线程池的使用 Java 并发编程:线程池的使用java 多线程核心技术梳理 (附源码) 本文对...

  • Thread

    队列 线程锁 多线程,线程池 队列 多线程爬虫示例 多线程 自定义线程 线程池

  • 常用阻塞队列 BlockingQueue 有哪些?

    为什么要使用阻塞队列 之前,介绍了一下 ThreadPoolExecutor 的各参数的含义(并发编程之线程池Th...

  • Java 阻塞队列原理 之 条件队列学习

    更多并发相关内容,查看==>Java 线程&并发学习目录 阻塞队列BlockingQueue是一种在多线程环境下依...

  • 技术点

    1Java集合主要是hashmap实现原理2.多线程问AQS源码、并发工具类源码、锁的实现原理、阻塞队列源码、线程...

网友评论

      本文标题:笔记:多线程并发编程(2)CAS 、阻塞队列、线程池

      本文链接:https://www.haomeiwen.com/subject/tvcbqltx.html