美文网首页异步编程NIO(Netty, Vert.x, Akka和Node.js)
聊聊reactor-netty的PoolResources的两种

聊聊reactor-netty的PoolResources的两种

作者: go4it | 来源:发表于2018-04-09 22:58 被阅读0次

    本文主要研究下reactor-netty的PoolResources的两种模式elastic及fixed。

    LoopResources与PoolResources

    TcpResources是个工具类,可以用来创建loopResources和poolResources。

    loopResources

    主要是创建NioEventLoopGroup,以及该group下面的workerCount个NioEventLoop(这里涉及两个参数,一个是worker thread count,一个是selector thread count)

    • DEFAULT_IO_WORKER_COUNT:如果环境变量有设置reactor.ipc.netty.workerCount,则用该值;没有设置则取Math.max(Runtime.getRuntime().availableProcessors(), 4)))
    • DEFAULT_IO_SELECT_COUNT:如果环境变量有设置reactor.ipc.netty.selectCount,则用该值;没有设置则取-1,表示没有selector thread
    • DEFAULT_MAX_PENDING_TASKS: 指定NioEventLoop的taskQueue的大小,Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE))
    • NioEventLoop继承了SingleThreadEventLoop,而SingleThreadEventLoop则继承SingleThreadEventExecutor,而其代理的executor是ThreadPerTaskExecutor,rejectHandler是RejectedExecutionHandlers.reject(),默认的taskQueue是LinkedBlockingQueue,其大小为Integer.MAX_VALUE

    poolResources

    主要是创建channelPools,类型是ConcurrentMap<SocketAddress, Pool>,这里主要研究下它的两种模式elastic及fixed

    DefaultPoolResources

    reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

    它实现了netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/ChannelPool.java的接口,重点看如下的几个方法:

            @Override
            public Future<Channel> acquire() {
                return acquire(defaultGroup.next().newPromise());
            }
    
            @Override
            public Future<Channel> acquire(Promise<Channel> promise) {
                return pool.acquire(promise).addListener(this);
            }
    
            @Override
            public Future<Void> release(Channel channel) {
                return pool.release(channel);
            }
    
            @Override
            public Future<Void> release(Channel channel, Promise<Void> promise) {
                return pool.release(channel, promise);
            }
    
            @Override
            public void close() {
                if(compareAndSet(false, true)) {
                    pool.close();
                }
            }
    

    这里的几个接口基本是委托为具体的pool来进行操作,其实现主要有SimpleChannelPool及FixedChannelPool。

    PoolResources.elastic(SimpleChannelPool)

    reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

        /**
         * Create an uncapped {@link PoolResources} to provide automatically for {@link
         * ChannelPool}.
         * <p>An elastic {@link PoolResources} will never wait before opening a new
         * connection. The reuse window is limited but it cannot starve an undetermined volume
         * of clients using it.
         *
         * @param name the channel pool map name
         *
         * @return a new {@link PoolResources} to provide automatically for {@link
         * ChannelPool}
         */
        static PoolResources elastic(String name) {
            return new DefaultPoolResources(name, SimpleChannelPool::new);
        }
    

    这个是TcpClient.create过程中,默认使用的方法,默认使用的是SimpleChannelPool,创建的是DefaultPoolResources

    reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

        static <T extends TcpResources> T create(T previous,
                LoopResources loops,
                PoolResources pools,
                String name,
                BiFunction<LoopResources, PoolResources, T> onNew) {
            if (previous == null) {
                loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
                pools = pools == null ? PoolResources.elastic(name) : pools;
            }
            else {
                loops = loops == null ? previous.defaultLoops : loops;
                pools = pools == null ? previous.defaultPools : pools;
            }
            return onNew.apply(loops, pools);
        }
    

    SimpleChannelPool

    netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

    /**
     * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
     * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
     *
     * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
     *
     */
    public class SimpleChannelPool implements ChannelPool {
    
        @Override
        public final Future<Channel> acquire() {
            return acquire(bootstrap.config().group().next().<Channel>newPromise());
        }
    
        @Override
        public Future<Channel> acquire(final Promise<Channel> promise) {
            checkNotNull(promise, "promise");
            return acquireHealthyFromPoolOrNew(promise);
        }
    
        /**
         * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
         * @param promise the promise to provide acquire result.
         * @return future for acquiring a channel.
         */
        private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
            try {
                final Channel ch = pollChannel();
                if (ch == null) {
                    // No Channel left in the pool bootstrap a new Channel
                    Bootstrap bs = bootstrap.clone();
                    bs.attr(POOL_KEY, this);
                    ChannelFuture f = connectChannel(bs);
                    if (f.isDone()) {
                        notifyConnect(f, promise);
                    } else {
                        f.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                notifyConnect(future, promise);
                            }
                        });
                    }
                    return promise;
                }
                EventLoop loop = ch.eventLoop();
                if (loop.inEventLoop()) {
                    doHealthCheck(ch, promise);
                } else {
                    loop.execute(new Runnable() {
                        @Override
                        public void run() {
                            doHealthCheck(ch, promise);
                        }
                    });
                }
            } catch (Throwable cause) {
                promise.tryFailure(cause);
            }
            return promise;
        }
    
        @Override
        public final Future<Void> release(Channel channel) {
            return release(channel, channel.eventLoop().<Void>newPromise());
        }
    
        @Override
        public Future<Void> release(final Channel channel, final Promise<Void> promise) {
            checkNotNull(channel, "channel");
            checkNotNull(promise, "promise");
            try {
                EventLoop loop = channel.eventLoop();
                if (loop.inEventLoop()) {
                    doReleaseChannel(channel, promise);
                } else {
                    loop.execute(new Runnable() {
                        @Override
                        public void run() {
                            doReleaseChannel(channel, promise);
                        }
                    });
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
            return promise;
        }
    
        @Override
        public void close() {
            for (;;) {
                Channel channel = pollChannel();
                if (channel == null) {
                    break;
                }
                channel.close();
            }
        }
    
        //......
    }    
    

    这个连接池的实现如果没有连接则会创建一个(没有限制),取出连接(连接池使用一个LIFO的Deque来维护Channel)的时候会检测连接的有效性。

    PoolResources.fixed(FixedChannelPool)

    reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

        /**
         * Default max connection, if -1 will never wait to acquire before opening new
         * connection in an unbounded fashion. Fallback to
         * available number of processors.
         */
        int DEFAULT_POOL_MAX_CONNECTION =
                Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections",
                "" + Math.max(Runtime.getRuntime()
                            .availableProcessors(), 8) * 2));
    
        /**
         * Default acquisition timeout before error. If -1 will never wait to
         * acquire before opening new
         * connection in an unbounded fashion. Fallback to
         * available number of processors.
         */
        long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
                "reactor.ipc.netty.pool.acquireTimeout",
                "" + 45000));
    
        /**
         * Create a capped {@link PoolResources} to provide automatically for {@link
         * ChannelPool}.
         * <p>A Fixed {@link PoolResources} will open up to the given max number of
         * processors observed by this jvm (minimum 4).
         * Further connections will be pending acquisition indefinitely.
         *
         * @param name the channel pool map name
         *
         * @return a new {@link PoolResources} to provide automatically for {@link
         * ChannelPool}
         */
        static PoolResources fixed(String name) {
            return fixed(name, DEFAULT_POOL_MAX_CONNECTION);
        }
    
        /**
         * Create a capped {@link PoolResources} to provide automatically for {@link
         * ChannelPool}.
         * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
         * Further connections will be pending acquisition indefinitely.
         *
         * @param name the channel pool map name
         * @param maxConnections the maximum number of connections before starting pending
         * acquisition on existing ones
         *
         * @return a new {@link PoolResources} to provide automatically for {@link
         * ChannelPool}
         */
        static PoolResources fixed(String name, int maxConnections) {
            return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT);
        }
    
        /**
         * Create a capped {@link PoolResources} to provide automatically for {@link
         * ChannelPool}.
         * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
         * Further connections will be pending acquisition indefinitely.
         *
         * @param name the channel pool map name
         * @param maxConnections the maximum number of connections before starting pending
         * @param acquireTimeout the maximum time in millis to wait for aquiring
         *
         * @return a new {@link PoolResources} to provide automatically for {@link
         * ChannelPool}
         */
        static PoolResources fixed(String name, int maxConnections, long acquireTimeout) {
            if (maxConnections == -1) {
                return elastic(name);
            }
            if (maxConnections <= 0) {
                throw new IllegalArgumentException("Max Connections value must be strictly " + "positive");
            }
            if (acquireTimeout != -1L && acquireTimeout < 0) {
                throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive");
            }
            return new DefaultPoolResources(name,
                    (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap,
                            handler,
                            checker,
                            FixedChannelPool.AcquireTimeoutAction.FAIL,
                            acquireTimeout,
                            maxConnections,
                            Integer.MAX_VALUE
                            ));
        }
    

    最后调用的fixed方法有三个参数,一个是name,一个是maxConnections,一个是acquireTimeout。可以看到这里创建的是FixedChannelPool。

    FixedChannelPool

    netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/FixedChannelPool.java

    /**
     * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
     * number of concurrent connections.
     */
    public class FixedChannelPool extends SimpleChannelPool {
    
        @Override
        public Future<Channel> acquire(final Promise<Channel> promise) {
            try {
                if (executor.inEventLoop()) {
                    acquire0(promise);
                } else {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            acquire0(promise);
                        }
                    });
                }
            } catch (Throwable cause) {
                promise.setFailure(cause);
            }
            return promise;
        }
    
        private void acquire0(final Promise<Channel> promise) {
            assert executor.inEventLoop();
    
            if (closed) {
                promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
                return;
            }
            if (acquiredChannelCount < maxConnections) {
                assert acquiredChannelCount >= 0;
    
                // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
                // EventLoop
                Promise<Channel> p = executor.newPromise();
                AcquireListener l = new AcquireListener(promise);
                l.acquired();
                p.addListener(l);
                super.acquire(p);
            } else {
                if (pendingAcquireCount >= maxPendingAcquires) {
                    promise.setFailure(FULL_EXCEPTION);
                } else {
                    AcquireTask task = new AcquireTask(promise);
                    if (pendingAcquireQueue.offer(task)) {
                        ++pendingAcquireCount;
    
                        if (timeoutTask != null) {
                            task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                        }
                    } else {
                        promise.setFailure(FULL_EXCEPTION);
                    }
                }
    
                assert pendingAcquireCount > 0;
            }
        }
    
        @Override
        public Future<Void> release(final Channel channel, final Promise<Void> promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            final Promise<Void> p = executor.newPromise();
            super.release(channel, p.addListener(new FutureListener<Void>() {
    
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    assert executor.inEventLoop();
    
                    if (closed) {
                        // Since the pool is closed, we have no choice but to close the channel
                        channel.close();
                        promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
                        return;
                    }
    
                    if (future.isSuccess()) {
                        decrementAndRunTaskQueue();
                        promise.setSuccess(null);
                    } else {
                        Throwable cause = future.cause();
                        // Check if the exception was not because of we passed the Channel to the wrong pool.
                        if (!(cause instanceof IllegalArgumentException)) {
                            decrementAndRunTaskQueue();
                        }
                        promise.setFailure(future.cause());
                    }
                }
            }));
            return promise;
        }
    
        @Override
        public void close() {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    if (!closed) {
                        closed = true;
                        for (;;) {
                            AcquireTask task = pendingAcquireQueue.poll();
                            if (task == null) {
                                break;
                            }
                            ScheduledFuture<?> f = task.timeoutFuture;
                            if (f != null) {
                                f.cancel(false);
                            }
                            task.promise.setFailure(new ClosedChannelException());
                        }
                        acquiredChannelCount = 0;
                        pendingAcquireCount = 0;
                        FixedChannelPool.super.close();
                    }
                }
            });
        }
        //......
    }
    

    这里的acquire,如果当前线程不是在eventLoop中,则放入队列中等待执行acquire0,这里可能撑爆eventLoop的taskQueue,不过其队列大小的值取Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)),默认是Integer.MAX_VALUE。

    FixedChannelPool继承了SimpleChannelPool,并重写了acquire、release、close方法。它对获取连接进行了限制,主要有如下几个参数:

    • maxConnections
      该值先从系统变量reactor.ipc.netty.pool.maxConnections取(如果设置为-1,表示无限制,回到elastic模式),如果没有设置,则取Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2,即核数与8的最大值的2倍。

    • acquireTimeout
      该值先从系统变量reactor.ipc.netty.pool.acquireTimeout取(如果设置为-1,表示立即执行不等待),如果没有设置,则为45000毫秒

    • maxPendingAcquires
      这里设置的是Integer.MAX_VALUE

    • AcquireTimeoutAction
      这里设置为FixedChannelPool.AcquireTimeoutAction.FAIL,即timeoutTask为

                    timeoutTask = new TimeoutTask() {
                        @Override
                        public void onTimeout(AcquireTask task) {
                            // Fail the promise as we timed out.
                            task.promise.setFailure(TIMEOUT_EXCEPTION);
                        }
                    };
    

    如果当前连接超过maxConnections,则进入pendingAcquireQueue等待获取连接,而在进入pendingAcquireQueue之前,如果当前等待数量超过了maxPendingAcquires,则返回FULL_EXCEPTION(Too many outstanding acquire operations),这里设置的是Integer.MAX_VALUE,所以不会有这个异常。进入pendingAcquireQueue之后,还有一个acquireTimeout参数,即进入pendingAcquireQueue等待acquireTimeout时间,如果还没有获取到连接则返回TIMEOUT_EXCEPTION(Acquire operation took longer then configured maximum time)。

    小结

    默认TcpClient创建的PoolResources使用的是elastic模式,即连接池的实现是SimpleChannelPool,默认使用一个LIFO的Deque来维护Channel,如果从连接池取不到连接则会创建新的连接,上限应该是系统设置的能够打开的文件资源数量,超过则报SocketException: Too many open files。PoolResources还提供了FixedChannelPool实现,使用的是fixed模式,即限定了连接池最大连接数及最大等待超时,避免连接创建数量过多撑爆内存或者报SocketException: Too many open files异常。

    注意,对于fixed模式,如果reactor.ipc.netty.pool.maxConnections设置为-1,则回退到elastic模式。

    doc

    相关文章

      网友评论

        本文标题:聊聊reactor-netty的PoolResources的两种

        本文链接:https://www.haomeiwen.com/subject/javwhftx.html