美文网首页netty
Netty中的任务队列(添加元素篇)

Netty中的任务队列(添加元素篇)

作者: 书唐瑞 | 来源:发表于2021-01-06 22:08 被阅读0次

    此篇文章讲解一下Netty中的任务队列.这里说的任务队列是Netty中的IO线程对应的任务队列.

    在Netty中NioEventLoopGroup这个类相当于线程池,而由它创建的每个NioEventLoop相当于池中的线程,因为每个NioEventLoop都是和唯一的一个线程绑定的,而这个线程只负责IO相关的工作,因此称作IO线程.

    在创建NioEventLoop的时候会创建一个与之关联的任务队列(Queue taskQueue).这个任务队列用于'装载'其他非IO线程向IO线程提交的任务,比如业务线程(即非IO线程)需要向对端写数据,那么业务线程会把写数据这个操作封装成一个任务'丢到'任务队列中,由IO线程将数据写到网络中.

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ...
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // 业务线程将写操作封装成Task
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            // 提交任务到IO线程对应的任务队列中
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg, boolean lazy) {
        // 将任务提交到IO线程的任务队列中
        executor.execute(runnable);    
    }
    

    首先看下这个taskQueue是由谁实现的

    // 实例化NioEventLoop
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
                   
        // newTaskQueue(queueFactory)会实现具体的任务队列
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
    
    private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory
        if (queueFactory == null) {
            // 流程会走到这里
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }
    
    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        // 流程会继续调用 PlatformDependent.<Runnable>newMpscQueue()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
    
    public static <T> Queue<T> newMpscQueue() {
        return Mpsc.newMpscQueue();
    }
    
    static <T> Queue<T> newMpscQueue() {
        // 流程会执行第一个new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
        // MPSC_CHUNK_SIZE=1024
        return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
            : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
    }
    

    上面说了这么多,只是想说明taskQueue具体的实现

    Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024)
    

    接下来具体分析下MpscUnboundedArrayQueue

    org.jctools.queues.MpscUnboundedArrayQueue
    

    在Netty之前的版本中,taskQueue是Netty自身实现它的.但是后面版本就将这个taskQueue的实现'交由'JCTools下的类来实现了.

    <dependency>
        <groupId>org.jctools</groupId>
        <artifactId>jctools-core</artifactId>
        <version>3.1.0</version>
    </dependency>
    

    在Netty中,多个Netty客户端连接Netty服务端的时候,Netty服务端中的一个IO线程会负责处理多个客户端.

    在这里插入图片描述

    如上图所示,IO线程-1负责处理Netty客户端-1和Netty客户端-2的读写请求.当多个业务线程需要向对端写数据的时候,会把写操作封装成任务'丢到'IO线程-1的任务队列中.

    Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程'生产'的任务.属于单消费者多生产者模式.通过类的名称MpscUnboundedArrayQueue可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的.

    MpscUnboundedArrayQueue的底层使用数组的形式存储元素.

    // MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
    public BaseMpscLinkedArrayQueue(final int initialCapacity) {
        // 转成2^n
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        long mask = (p2capacity - 1) << 1;
        // 存储元素的底层数组
        E[] buffer = allocateRefArray(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        soProducerLimit(mask);
    }
    

    从全局的角度看下MpscUnboundedArrayQueue的底层结构,这里假设initialCapacity=4


    在这里插入图片描述

    虽然设置的初始容量大小=4,但是当存放的元素大于4的时候,就会新创建一个与之前同等大小的数组,然后'挂接'到之前的数组. 当再次'装载'不了新放入的元素时候,会再次新创建一个数组'挂接'到之前的数组,以此类推.最后形成一个数组+链表的结构.

    结合源码分析下.以下假设初始容量大小initialCapacity=4

    // MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
    public BaseMpscLinkedArrayQueue(final int initialCapacity) {
        // p2capacity  = 4
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // 转成二进制mask=110
        long mask = (p2capacity - 1) << 1;
        // 存储元素的底层数组大小=4+1=5
        E[] buffer = allocateRefArray(p2capacity + 1);
        // 指向生产者的数组
        producerBuffer = buffer;
        producerMask = mask;
        // 指向消费者的数组
        consumerBuffer = buffer;
        consumerMask = mask;
        // producerLimit=mask=110
        soProducerLimit(mask);
    }
    

    构造方法中,虽然设置的初始容量=4,但是在初始化底层数组的时候,分配的大小=5.从上面的图中可以看出,上一个数组为了指向下一个数组,因此数组在设计的时候就必须要多出来一个元素,用于指向下一个数组.

    当提交元素的时候,代码如下所示,调用offer方法

    MpscUnboundedArrayQueue<Integer> queue = new MpscUnboundedArrayQueue<>(4);
    queue.offer(1);
    
    public boolean offer(final E e) {
        long mask;
        E[] buffer;
        long pIndex;
    
        while (true) {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // 表示正在扩容
            if ((pIndex & 1) == 1) {
                continue;
            }
            mask = this.producerMask;
            buffer = this.producerBuffer;
            // 如果提交的元素即将超过容量
            if (producerLimit <= pIndex) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    case CONTINUE_TO_P_INDEX_CAS:
                        break;
                    case RETRY:
                        continue;
                    case QUEUE_FULL:
                        return false;
                    case QUEUE_RESIZE:
                        // 扩容
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
    
            // +2
            if (casProducerIndex(pIndex, pIndex + 2)) {
                break;
            }
        }
        // 计算元素在数组中的偏移地址
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        soRefElement(buffer, offset, e);
        return true;
    }
    

    首先要明确一点的是,producerIndex(即代码中的pIndex)记录生产者添加元素指向的位置,而且这个位置并不是在数组中的实际下标.

    每添加一个元素,producerIndex就会+2.并不是+1.

    通过构造方法初始化时,producerLimit=110.
    当添加第一个元素之后,pIndex=010
    当添加第二个元素之后,pIndex=100
    当添加第三个元素之后,pIndex=110

    根据上面第16行代码producerLimit <= pIndex满足条件.进入下面的代码

    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
        final long cIndex = lvConsumerIndex();
        long bufferCapacity = getCurrentBufferCapacity(mask);
    
        if (cIndex + bufferCapacity > pIndex) {
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
                return RETRY;
            } else {
                return CONTINUE_TO_P_INDEX_CAS;
            }
        } else if (availableInQueue(pIndex, cIndex) <= 0) {
            return QUEUE_FULL;
    
        }
        // pIndex + 1
        else if (casProducerIndex(pIndex, pIndex + 1)) {
            // 需要扩容
            return QUEUE_RESIZE;
        } else {
            return RETRY;
        }
    }
    

    能走到上面的代码,说明此时容器马上满了,需要扩容了,会将pIndex+1.之后就会进入到扩容逻辑.

    resize(mask, buffer, pIndex, e, null);
    

    之前的pIndex=110,加1之后,变成pIndex=111.这个时候,其他生产者线程根据(pIndex & 1) == 1判断成立,说明有一个生产者线程正在扩容容器,当前生产者线程需要重试.

    也就是说根据最后一个字节,控制是否有生产者线程正在扩容.

    public boolean offer(final E e) {
        long mask;
        E[] buffer;
        long pIndex;
    
        while (true) {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // 表示有其他线程正在扩容
            if ((pIndex & 1) == 1) {
                // 重试
                continue;
            }
            ...
        }
        ...
    }
    

    扩容的线程会重新创建一个新的数组

    private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try {
            // 创建新数组
            newBuffer = allocateRefArray(newBufferLength);
        } catch (OutOfMemoryError oom) {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }
    
        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
    
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
    
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
        // 新老数组进行连接
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
    
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
    
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
    
        // +2之后,保证其他生产者线程可以继续增加元素了
        soProducerIndex(pIndex + 2);
    
        // 添加一个JUMP元素
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }
    

    在扩容的时候,会添加一个JUMP元素,这个元素是用来告诉消费者,当消费到这类元素的时候,需要跳到下一个数组继续消费.

    假设向容器中依次添加1-9这9个元素,它的结构如下
    消费者也会按照1-9进行消费.(即添加顺序和消费顺序一致)

    在这里插入图片描述

    在向容器中添加元素的时候,采用如下方式. 根据起始地址+偏移地址,提高添加元素的速度.

    static long modifiedCalcCircularRefElementOffset(long index, long mask) {
        return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1));
    }
    
    
    public static final long REF_ARRAY_BASE;
    public static final int REF_ELEMENT_SHIFT;
    
    static {
        // 数组中一个元素占用的大小
        final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale) {
            REF_ELEMENT_SHIFT = 2;
        } else if (8 == scale) {
            REF_ELEMENT_SHIFT = 3;
        } else {
            throw new IllegalStateException("Unknown pointer size: " + scale);
        }
        // 数组中第一个元素的偏移地址
        REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
    }
    

    此篇简单介绍下Netty中如何使用JCTools中的类在并发场景下提交元素,以及它的底层数据结构. 这种是与传统直接创建一个2倍的数组处理方式不同的.

    相关文章

      网友评论

        本文标题:Netty中的任务队列(添加元素篇)

        本文链接:https://www.haomeiwen.com/subject/shggoktx.html