美文网首页
多线程-线程池

多线程-线程池

作者: 麦大大吃不胖 | 来源:发表于2020-12-05 19:08 被阅读0次

    by shihang.mai

    1. 线程池的种类

    实际上只有两种,一种是ThreadPoolExecutor,另一种是ForkJoinPool

    Executors.newCachedThreadPool()

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
    

    核心线程 = 0
    线程最大数 = Integer.MAX_VALUE
    过期时间 = 60s
    阻塞队列 = SynchronousQueue
    线程工厂 = Executors.defaultThreadFactory()
    拒绝策略 = AbortPolicy

    Executors.newFixedThreadPool(n)

    new ThreadPoolExecutor(n, n,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    

    核心线程 = n
    线程最大数 = n
    过期时间 = 0s
    阻塞队列 = LinkedBlockingQueue
    线程工厂 = Executors.defaultThreadFactory()
    拒绝策略 = AbortPolicy

    Executors.newScheduledThreadPool(n)

    new ThreadPoolExecutor(n, Integer.MAX_VALUE,0L, NANOSECONDS,new DelayedWorkQueue());
    

    核心线程 = n
    线程最大数 = Integer.MAX_VALUE
    过期时间 = 0s
    阻塞队列 = DelayedWorkQueue
    线程工厂 = Executors.defaultThreadFactory()
    拒绝策略 = AbortPolicy

    Executors.newWorkStealingPool()

    new ForkJoinPool
      (Runtime.getRuntime().availableProcessors(),
       ForkJoinPool.defaultForkJoinWorkerThreadFactory,
       null, true)
    

    线程数为CPU核心数, ForkJoinPool

    Executors.newSingleThreadExecutor()

    new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())
    

    核心线程 = 1
    线程最大数 = 1
    过期时间 = 0s
    阻塞队列 = LinkedBlockingQueue
    线程工厂 = Executors.defaultThreadFactory()
    拒绝策略 = AbortPolicy

    Executors.newSingleThreadScheduledExecutor()

    new ThreadPoolExecutor(1, Integer.MAX_VALUE,0L, NANOSECONDS,new DelayedWorkQueue());
    

    核心线程 = 1
    线程最大数 = Integer.MAX_VALUE
    过期时间 = 0s
    阻塞队列 = DelayedWorkQueue
    线程工厂 = Executors.defaultThreadFactory()
    拒绝策略 = AbortPolicy

    线程池种类

    2. ThreadPoolExecutor

    创建线程池:

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,
                    60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(4),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
    属性 含义
    int corePoolSize 核心线程
    int maximumPoolSize 最大线程
    long keepAliveTime 当线程超过时间长期不干活,归还操作系统。(有参数设定核心线程是否参与)
    TimeUnit unit 时间单位
    BlockingQueue<Runnable> workQueue 任务队列
    ThreadFactory threadFactory 生产线程的工厂
    RejectedExecutionHandler handler 拒绝策略

    线程池与任务关系:

    ThreadPoolExecutor任务与线程关系
    1. 首先线程数为空,当来2个任务时,那么开启2个线程去处理
    2. 再来4个任务,直接进入到阻塞队列中
    3. 当再来2个任务时,再开2个线程去处理这两个新的任务
    4. 当再来任务时,执行拒绝策略。拒绝策略可以自定义,Jdk默认提供4种
      • Abort.扔掉,抛异常
      • Discard.扔掉,不抛异常
      • DiscardOld.扔掉排队时间最长的
      • CallerRuns.调用者处理任务

    3. ForkJoinPool

    原理:

    采用的是work stealing算法

    work stealing
    1. ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
    2. 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行
    3. 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
    4. 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
    5. 在既没有自己的任务,也没有可以窃取的任务时,进入休眠

    用途:

    • 分解汇总的任务
    • 用很少的线程可以执行很多的任务
    • CPU密集型
    ExecutorService service = Executors.newWorkStealingPool();
    
    //没返回值的任务继承RecursiveAction
    class AddTask extends RecursiveAction
    //有返回值任务继承
    class AddTaskRet extends RecursiveTask    
    new ForkJoinPool().execute(Task)  
    
    ForkJionPool

    4. 队列的种类

    队列可分为阻塞和非阻塞队列,也可分为有界、无界队列、同步移交。有界是指,队列放入的元素个数有限。无界是指,队列放入的元素没个数限制,只限制于物理设备

    按有界无界分

    无界队列:ConcurrentLinkedQueue、PriorityBlockingQueue、DelayQueue、LinkedTransferQueue

    有界队列:ArrayBlockingQueue、LinkedBlockingQueue(默认Integer.MAX_VALUE近似无限,但它构造时可传入值变为有界)

    按阻塞非阻塞分

    阻塞队列:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedTransferQueue

    非阻塞队列:ConcurrentLinkedQueue

    4.1 ConcurrentLinkedQueue

    底层是单向链表,每个节点是一个Node。

    PS:当调试时,必须关闭idea的配置,因为idea默认开启了toString预览特性,在debug模式下,ConcurrentLinkedQueue的对象也会被调用toString方法的,在队列的toString方法中会获取队列的迭代器,而创建迭代器时会调用first方法,first方法里就会cas修改head属性

    Enable 'toString()' object view
    Enable alternative view for Collections classes
    

    下面观察元素入队过程


    ConcurrentLinkedQueue入队.png

    观察上面的加入元素的快照图,tail并不是都指向最后一个节点,是先经历tail.next赋值为加入元素,下一次tail节点才指向尾节点
    源码中的offer()其实只是做了2步

    1. 定位尾节点
    2. 使用CAS将入队节点设置成尾节点的next节点,如不成功则重试

    下面观察出队流程


    ConcurrentLinkedQueue出队.png

    入队/出队head、tail不是一致指向头和尾节点的目的:减少cas的次数,提高入队出队的效率

    4.2 LinkedBlockingQueue

    底层是单向链表,每个节点是一个Node。并且使用了两个ReentrantLock,分别代表用于入队和出队的锁,并且用了两个Condition,用于挂起和唤醒线程

    下面观察入队操作


    LinkedBlockingQueue入队.png

    在元素入队的时候,需要先获取put lock(Reentranlock)

    • 队列已满,利用Condition阻塞等待
    • 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没元素,放完以后要唤醒消费线程进行消费

    下面观察出队操作


    LinkedBlockingQueue出队.png

    在元素出队时,需要先获取take lock(Reentranlock)

    • 队列为空,阻塞等待。
    • 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素

    4.3 ArrayBlockingQueue

    底层是一个环形数组,使用单个ReentrantLock,两个Conditon队列,用在出队、入队操作上

    利用常用的数组实现队列,那么我们维护一个尾指针,即可对入队达到O(1),但是对于出队操作,都均要移动元素,达到了O(n),我们可以用环形逻辑解决,出入队都是O(1)

    ArrayBlockingQueue.png

    我们观察一下入队操作


    ArrayBlockingQueue入队.png

    在元素入队时,需要获取lock(Reentranlock)

    • 将加入的元素放到putIndex位置
    • putIndex+1,当到了环形的尽头时,重新置为0
    • 唤醒在Condition等待获取元素的线程

    我们观察一下出队操作


    ArrayBlockingQueue出队.png

    在元素出队时,需要获取lock(Reentranlock)

    • 当队列中没元素,直接返回null,不操作takeIndex了
    • 通过takeIndex获取到该位置的元素,并把该位置置为null
    • takeIndex+1,到达列表长度时设置为0
    • 唤醒在Condition等待元素放入队列的线程

    4.4 PriorityBlockingQueue

    底层是用数组+二叉堆维护元素的优先级,并且使用了一个ReentrantLock,只有一个Conditon,因为它无界的,可以无限向里面加入元素,但是获取时,当没元素就会被阻塞,所以只有一个Condition

    1. 初始化时,默认队列容量11,比较器为null,即使用元素的compareTo,即队列的元素必须实现Comparable接口
    2. 当元素入队时,需要先使用ReentranLock上锁
    • 如果当前元素个数>=队列容量,则扩容
    • 建堆排序元素
    • 元素数量+1
    • 唤醒Conditon中的等待的线程
    1. 当触发扩容时
    • 当capacity<64时,扩容至2capacity+2
    • 当capacity>64时,扩容至capacity(1+50%)
    • 最大扩容至Integer.MAX_VALUE - 8
    • 扩容后,将原本的元素复制到新数组中
      当需要扩容时,让一个线程去做扩容操作,其他线程自旋等待
    1. 建堆排序,最终结果会把最高或者最低优先级放到根顶位置
    /**
    * k为当前元素数量
    * x为加入的元素
    * array是数组
    */
    private static <T> void siftUpComparable(int k, T x, Object[] array) {
            Comparable<? super T> key = (Comparable<? super T>) x;
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                Object e = array[parent];
                if (key.compareTo((T) e) >= 0)
                    break;
                array[k] = e;
                k = parent;
            }
            array[k] = key;
        }
    

    假如初始容量为2,按顺序加入B、C、D、A,会经历如下


    PriorityBlockingQueue建堆过程.png
    1. 对于出队操作,操作前使用ReentranLock上锁,也会涉及重建堆过程
    private E dequeue() {
            int n = size - 1;
            if (n < 0)
                return null;
            else {
                Object[] array = queue;
                E result = (E) array[0];
                E x = (E) array[n];
                array[n] = null;
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftDownComparable(0, x, array, n);
                else
                    siftDownUsingComparator(0, x, array, n, cmp);
                size = n;
                return result;
            }
        }
    
    /**
    * k=0
    * x为数组中的尾元素
    * array是数组
    * n=size-1
    */
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                                   int n) {
            if (n > 0) {
                Comparable<? super T> key = (Comparable<? super T>)x;
                int half = n >>> 1;           // loop while a non-leaf
                while (k < half) {
                    int child = (k << 1) + 1; // assume left child is least
                    Object c = array[child];
                    int right = child + 1;
                    if (right < n &&
                        ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                        c = array[child = right];
                    if (key.compareTo((T) c) <= 0)
                        break;
                    array[k] = c;
                    k = child;
                }
                array[k] = key;
            }
        }
    

    续上面的图,会经历如下


    PriorityBlockingQueue重建堆过程.png

    4.5 DelayQueue

    底层用PriorityBlockingQueue,放入此队列的元素必须实现Delayed,而接口Delayed extends Comparable接口,即任务具备过期时间+排序能力。一样使用ReentrantLock和Condition

    1. 对于入队操作,操作前使用ReentranLock上锁,向PriorityBlockingQueue加入元素,并建堆
    2. 对于出队操作,操作前使用ReentranLock上锁,在PriorityBlockingQueue获取根元素即可,但是要判断失效时间是否为0
    • 当=0,取出元素
    • 当!=0, 这里有一个类型为Thread leader标志位,看是否有线程在取值,如果为null,证明没线程取值,直接等待delay时间后获取即可;如!=null,直接阻塞当前线程

    4.6 LinkedTransferQueue

    底层是单链表+松弛阈值

    • 与SynchronousQueue相比,LinkedTransferQueue多了一个可以存储的队列
    • 与LinkedBlockingQueue相比,LinkedTransferQueue多了直接传递元素,少了用锁来同步
    LinkedTransferQueue入队出队.png
    1. 所有的入队出队方法都调用xfer(E e, boolean haveData, int how, long nanos)
    • e:如果是入队操作,那么就是实际入队的值,如果是出队操作,为null
    • haveData:如果是入队操作,true,如果是出队操作,false
    • how:执行类型,有立即返回的NOW,有异步的ASYNC,有阻塞的SYNC, 有带超时的 TIMED
    • nanos:只有在执行类型TIMED才有作用
    1. 节点分为数据节点和请求节点
    2. 当线程A、B、C都调用offer,它们分别携带值1、2、3,它们通过CAS形成链表1->2->3,并且线程都阻塞
    3. 当线程D调用take,那么就会从第一个节点开始匹配,匹配到值1,获取1并设置原节点为null,并唤醒A线程,A线程唤醒后,将null替换为this
    4. 当线程E调用take,那么还是从第一个节点开始匹配,因为第一个节点已经匹配过,找第二个节点,获取2并设置原节点为null,并唤醒B线程,第一个节点的next指向自身变为垃圾,等到GC回收。如此往复,但是不是每一个匹配过的节点都会将next指向自身变为垃圾的,具体要看松弛阈值
    5. 对于transfer操作,将指定元素e传递给消费者线程,如果有消费者线程正在阻塞等待,则调用transfer方法的线程会直接将元素传递给它;如果没有消费者线程等待获取元素,则调用transfer方法的线程会将元素插入到队尾,然后阻塞等待,直到出现一个消费者线程获取元素.
    6. 当生产者线程调用tryTransfer方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和transfer方法的区别就是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法必须等到消费者消费后才返回。
    7. 对于take操作,会从队首取出一个元素,如果队列为空,则线程会阻塞

    为了节省 CAS 操作的开销,LinkedTransferQueue使用了松弛(slack)操作:
    在结点被匹配(被删除)之后,不会立即更新队列的head、tail,而是当 head、tail结点与最近一个未匹配的结点之间的距离超过“松弛阀值”后才会更新(默认为 2)。这个“松弛阀值”一般为1到3,如果太大会增加沿链表查找未匹配结点的时间,太小会增加 CAS 的开销

    4.7 LinkedTransferQueue和SynchronousQueue区别

    SynchronousQueue:线程A使用put将数据添加到队列,如果没有其他线程使用take去获取数据,那么线程A阻塞,直到数据被其他线程获取,同理 如果线程B从队列中获取数据为空,被阻塞,等待线程添加数据。即握手传递数据

    LinkedTransferQueue:LinkedTransferQueue使用put,tryTransfer和transfer可添加多条数据, LinkedTransferQueue具有SynchronousQueue的功能,而且LinkedTransferQueue比SynchronousQueue灵活,可选择put和tryTransfer进行非阻塞操作,也可以用transfer进行阻塞操作。

    • put就是用ASYNC方式执行,不阻塞,一直自旋.
    • transfer用SYNC方式执行,会阻塞,直到有消费线程后唤醒.
    • tryTransfer用NOW方式执行,直接检测是否有消费线程,有直接递交数据,没直接返回
    public void put(E e) {
       xfer(e, true, ASYNC, 0);
    }
    
    public void transfer(E e) throws InterruptedException {
       if (xfer(e, true, SYNC, 0) != null) {
           Thread.interrupted(); // failure possible only due to interrupt
           throw new InterruptedException();
       }
    }
    
    public boolean tryTransfer(E e) {
       return xfer(e, true, NOW, 0) == null;
    }
    

    5. 合理的线程数

    1. N * U * (1+ W/C)
    2. 压测

    N-CPU核数 U-CPU使用率,取值范围0-1 W/C-等待时间与计算时间比值

    参考

    https://blog.csdn.net/qq_38293564/article/details/80798310

    https://benjaminwhx.com

    https://www.cnblogs.com/yuexiaoyun/p/12203101.html

    https://www.cnblogs.com/myseries/p/10944211.html

    相关文章

      网友评论

          本文标题:多线程-线程池

          本文链接:https://www.haomeiwen.com/subject/zqhziktx.html