美文网首页
(四)channelPromise在netty中的应用

(四)channelPromise在netty中的应用

作者: guessguess | 来源:发表于2021-03-24 17:51 被阅读0次

    之前在讲channel是如何完成注册以及连接到服务端的时候,有这么一段代码。

    public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            1.完成初始化以及注册
            final ChannelFuture regFuture = initAndRegister();
            2.获取回调
            final Channel channel = regFuture.channel();
            3.如果完成了就直接连接服务端
            if (regFuture.isDone()) {
                if (!regFuture.isSuccess()) {
                    return regFuture;
                }
                连接服务端的代码
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                4.如果暂时没有完成注册
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                通过监听器,在完成注册之后,执行需要进行的操作。
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            promise.setFailure(cause);
                        } else {
                            promise.registered();
                            doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    }
    

    上面的代码其实蛮简单的,但是其实也涉及到不少东西,比如ChannlFutrue,ChannelFuture,Promise。
    下面先来说说基本的概念。

    Future

    public interface Future<V> extends java.util.concurrent.Future<V> {
        boolean isCancellable();
        Throwable cause();
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> sync() throws InterruptedException;
        Future<V> syncUninterruptibly();
        Future<V> await() throws InterruptedException;
        Future<V> awaitUninterruptibly();
        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
        boolean await(long timeoutMillis) throws InterruptedException;
        boolean awaitUninterruptibly(long timeout, TimeUnit unit);
        boolean awaitUninterruptibly(long timeoutMillis);
        V getNow();
        @Override
        boolean cancel(boolean mayInterruptIfRunning);
    }
    
    从源码上看,这个接口其实是对java提供的Future进行拓展。
    加了许多的新方法。
    但这里感觉最重要的,就是对于监听器的增删改。GenericFutureListener。
    说明了这个接口的基本功能。回调功能,以及监听器的管理(曾删改)
    下面来说说GenericFutureListener
    

    GenericFutureListener

    public interface EventListener {
    }
    public interface GenericFutureListener<F extends Future<?>> extends EventListener {
        void operationComplete(F future) throws Exception;
    }
    
    监听器里面的泛型是netty自己定义的Future而不是java本身提供的,这点需要注意。
    这个监听器的功能,就是对future进行监听。
    

    ChannelFuture

    public interface ChannelFuture extends Future<Void> {
        Channel channel();
        @Override
        ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelFuture sync() throws InterruptedException;
        @Override
        ChannelFuture syncUninterruptibly();
        @Override
        ChannelFuture await() throws InterruptedException;
        @Override
        ChannelFuture awaitUninterruptibly();
        boolean isVoid();
    }
    
    ChannelFuture,这个名字很简单粗暴,其实就是将Future与Channel进行了绑定。
    同时覆写了Futrue的方法,返回类型都变为ChannelFuture 
    

    Promise

    public interface Promise<V> extends Future<V> {
        Promise<V> setSuccess(V result);
        boolean trySuccess(V result);
        Promise<V> setFailure(Throwable cause);
        boolean tryFailure(Throwable cause);
        boolean setUncancellable();
        @Override
        Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        @Override
        Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        @Override
        Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        @Override
        Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        @Override
        Promise<V> await() throws InterruptedException;
        @Override
        Promise<V> awaitUninterruptibly();
        @Override
        Promise<V> sync() throws InterruptedException;
        @Override
        Promise<V> syncUninterruptibly();
    }
    
    Promise是对Future的拓展,比较重要的点,还是对回调监听器的管理。
    新添加的功能就是,判断future的状态,同时决定是否要唤醒监听器,或者是否要抛异常。
    

    ChannelPromise

    public interface ChannelPromise extends ChannelFuture, Promise<Void> {
        @Override
        Channel channel();
        @Override
        ChannelPromise setSuccess(Void result);
        ChannelPromise setSuccess();
        boolean trySuccess();
        @Override
        ChannelPromise setFailure(Throwable cause);
        @Override
        ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
        @Override
        ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
        @Override
        ChannelPromise sync() throws InterruptedException;
        @Override
        ChannelPromise syncUninterruptibly();
        @Override
        ChannelPromise await() throws InterruptedException;
        @Override
        ChannelPromise awaitUninterruptibly();
        ChannelPromise unvoid();
    }
    
    从结构上来看,大家就知道这个接口具体提供什么功能了。
    Promise, 就是对状态的判断,以及根据状态是否对监听器进行唤醒。
    Future,回调功能,同时增加监听器的增删改
    ChannelFuture,将Future与Channel绑定。
    ChannelPromise = Channel + future + Promise
    
    ChannelPromise结构图

    说完上面的结构,下面的话就讲一下,netty是如何使用ChannelPromise的。
    下面还是通过上面那个例子来说明一下吧。

    public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            1.完成初始化以及注册=======================先说说如何获取回调
            final ChannelFuture regFuture = initAndRegister();
            2.获取通道
            final Channel channel = regFuture.channel();
            3.如果完成了就直接连接服务端
            if (regFuture.isDone()) {
                if (!regFuture.isSuccess()) {
                    return regFuture;
                }
                连接服务端的代码
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                4.如果暂时没有完成注册
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                通过监听器,在完成注册之后,执行需要进行的操作。
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            promise.setFailure(cause);
                        } else {
                            promise.registered();
                            doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    }
    

    从channel注册代码可以看到,netty中的注册流程大概是如下:
    1.完成初始化以及注册,返回回调
    2.从回调中获取通道,以供连接服务端使用
    3.通过回调判断是否已经完成注册,从而觉得是进行连接,还是添加监听器。

    ChannelFuture的是如何完成监听的

    主要的代码位于io.netty.bootstrap.AbstractBootstrap.initAndRegister()

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    channel.unsafe().closeForcibly();
                }
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            最重要的还是这里,通过executor去注册通道。
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    

    一路debug,最终定位到生成ChannelFuture的位置

    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        @Override
        public ChannelFuture register(Channel channel) {
            将当前eventLoop以及channel传入,生成DefaultChannelPromise
            eventLoop是用于执行任务
            return register(new DefaultChannelPromise(channel, this));
        }
    
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            unsafe其实是具体针对channel相关操作的实现,比如注册。
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    }
    
    
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected abstract class AbstractUnsafe implements Unsafe {
            private void register0(ChannelPromise promise) {
                try {
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    注册的操作
                    doRegister();
                    neverRegistered = false;
                    registered = true;
                    pipeline.invokeHandlerAddedIfNeeded();
                    注册完成后,设置为成功,这里就是重点需要看的。。。。。。
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
          }
            设置成功的方法,这里面看是Promise接口的实现,因为本身promise用的是DefaultChannelPromise
            protected final void safeSetSuccess(ChannelPromise promise) {
                这里最重要的还是promise.trySuccess()方法。
                if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
                    logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
                }
            }
    }
    
    DefaultChannelPromise中的实现
    public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
        @Override
        public boolean trySuccess() {
            trySuccess方法是在父类DefaultPromise中实现的。
            return trySuccess(null);
        }
    }
    
    DefaultPromise中的实现
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
        private boolean setSuccess0(V result) {
            return setValue0(result == null ? SUCCESS : result);
        }
    
        @Override
        public boolean trySuccess(V result) {
            这个方法必然是返回true
            if (setSuccess0(result)) {
                唤醒监听器
                notifyListeners();
                return true;
            }
            return false;
        }
        
        唤醒监听器
        private void notifyListenersNow() {
            Object listeners;
            synchronized (this) {
                如果正在唤醒监听器,或者监听器为空则直接返回
                if (notifyingListeners || this.listeners == null) {
                    return;
                }
                notifyingListeners = true;
                将监听器列表赋值给局部变量
                listeners = this.listeners;
                this.listeners = null;
            }
            for (;;) {
                if (listeners instanceof DefaultFutureListeners) {
                    唤醒监听器
                    notifyListeners0((DefaultFutureListeners) listeners);
                } else {
                    唤醒监听器
                    notifyListener0(this, (GenericFutureListener<?>) listeners);
                }
                synchronized (this) {
                    唤醒监听器之后,将监听器置空,说明其实监听器只会被执行一次。
                    if (this.listeners == null) {
                        notifyingListeners = false;
                        return;
                    }
                    listeners = this.listeners;
                    this.listeners = null;
                }
            }
        }
    
        private void notifyListeners0(DefaultFutureListeners listeners) {
            GenericFutureListener<?>[] a = listeners.listeners();
            int size = listeners.size();
            for (int i = 0; i < size; i ++) {
                notifyListener0(this, a[i]);
            }
        }
    
        @SuppressWarnings({ "unchecked", "rawtypes" })
        private static void notifyListener0(Future future, GenericFutureListener l) {
            try {
                看我们一开始举的例子里面,就是GenericFutureListener接口的实现。
                l.operationComplete(future);
            } catch (Throwable t) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }
    

    这里其实为什么要用监听器呢?
    因为很去执行注册,连接服务端的时候都是异步的(通过eventLoop去执行),获取到回调结果的时候,也许已经完成了,也许还没有开始,如果已经完成了的话,就没必要添加监听器了,如果没有完成了的话,添加监听器即可。
    在完成操作后,就可以调用监听器去执行相应的操作了。

    相关文章

      网友评论

          本文标题:(四)channelPromise在netty中的应用

          本文链接:https://www.haomeiwen.com/subject/thkhhltx.html