美文网首页
(四)channelPromise在netty中的应用

(四)channelPromise在netty中的应用

作者: guessguess | 来源:发表于2021-03-24 17:51 被阅读0次

之前在讲channel是如何完成注册以及连接到服务端的时候,有这么一段代码。

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        1.完成初始化以及注册
        final ChannelFuture regFuture = initAndRegister();
        2.获取回调
        final Channel channel = regFuture.channel();
        3.如果完成了就直接连接服务端
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            连接服务端的代码
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            4.如果暂时没有完成注册
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            通过监听器,在完成注册之后,执行需要进行的操作。
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

上面的代码其实蛮简单的,但是其实也涉及到不少东西,比如ChannlFutrue,ChannelFuture,Promise。
下面先来说说基本的概念。

Future

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isCancellable();
    Throwable cause();
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    Future<V> sync() throws InterruptedException;
    Future<V> syncUninterruptibly();
    Future<V> await() throws InterruptedException;
    Future<V> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    V getNow();
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

从源码上看,这个接口其实是对java提供的Future进行拓展。
加了许多的新方法。
但这里感觉最重要的,就是对于监听器的增删改。GenericFutureListener。
说明了这个接口的基本功能。回调功能,以及监听器的管理(曾删改)
下面来说说GenericFutureListener

GenericFutureListener

public interface EventListener {
}
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

监听器里面的泛型是netty自己定义的Future而不是java本身提供的,这点需要注意。
这个监听器的功能,就是对future进行监听。

ChannelFuture

public interface ChannelFuture extends Future<Void> {
    Channel channel();
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelFuture sync() throws InterruptedException;
    @Override
    ChannelFuture syncUninterruptibly();
    @Override
    ChannelFuture await() throws InterruptedException;
    @Override
    ChannelFuture awaitUninterruptibly();
    boolean isVoid();
}

ChannelFuture,这个名字很简单粗暴,其实就是将Future与Channel进行了绑定。
同时覆写了Futrue的方法,返回类型都变为ChannelFuture 

Promise

public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    @Override
    Promise<V> await() throws InterruptedException;
    @Override
    Promise<V> awaitUninterruptibly();
    @Override
    Promise<V> sync() throws InterruptedException;
    @Override
    Promise<V> syncUninterruptibly();
}

Promise是对Future的拓展,比较重要的点,还是对回调监听器的管理。
新添加的功能就是,判断future的状态,同时决定是否要唤醒监听器,或者是否要抛异常。

ChannelPromise

public interface ChannelPromise extends ChannelFuture, Promise<Void> {
    @Override
    Channel channel();
    @Override
    ChannelPromise setSuccess(Void result);
    ChannelPromise setSuccess();
    boolean trySuccess();
    @Override
    ChannelPromise setFailure(Throwable cause);
    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
    @Override
    ChannelPromise sync() throws InterruptedException;
    @Override
    ChannelPromise syncUninterruptibly();
    @Override
    ChannelPromise await() throws InterruptedException;
    @Override
    ChannelPromise awaitUninterruptibly();
    ChannelPromise unvoid();
}

从结构上来看,大家就知道这个接口具体提供什么功能了。
Promise, 就是对状态的判断,以及根据状态是否对监听器进行唤醒。
Future,回调功能,同时增加监听器的增删改
ChannelFuture,将Future与Channel绑定。
ChannelPromise = Channel + future + Promise
ChannelPromise结构图

说完上面的结构,下面的话就讲一下,netty是如何使用ChannelPromise的。
下面还是通过上面那个例子来说明一下吧。

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        1.完成初始化以及注册=======================先说说如何获取回调
        final ChannelFuture regFuture = initAndRegister();
        2.获取通道
        final Channel channel = regFuture.channel();
        3.如果完成了就直接连接服务端
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            连接服务端的代码
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            4.如果暂时没有完成注册
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            通过监听器,在完成注册之后,执行需要进行的操作。
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

从channel注册代码可以看到,netty中的注册流程大概是如下:
1.完成初始化以及注册,返回回调
2.从回调中获取通道,以供连接服务端使用
3.通过回调判断是否已经完成注册,从而觉得是进行连接,还是添加监听器。

ChannelFuture的是如何完成监听的

主要的代码位于io.netty.bootstrap.AbstractBootstrap.initAndRegister()

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        最重要的还是这里,通过executor去注册通道。
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

一路debug,最终定位到生成ChannelFuture的位置

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    @Override
    public ChannelFuture register(Channel channel) {
        将当前eventLoop以及channel传入,生成DefaultChannelPromise
        eventLoop是用于执行任务
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        unsafe其实是具体针对channel相关操作的实现,比如注册。
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}


public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                注册的操作
                doRegister();
                neverRegistered = false;
                registered = true;
                pipeline.invokeHandlerAddedIfNeeded();
                注册完成后,设置为成功,这里就是重点需要看的。。。。。。
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
      }
        设置成功的方法,这里面看是Promise接口的实现,因为本身promise用的是DefaultChannelPromise
        protected final void safeSetSuccess(ChannelPromise promise) {
            这里最重要的还是promise.trySuccess()方法。
            if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
                logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
            }
        }
}

DefaultChannelPromise中的实现
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    @Override
    public boolean trySuccess() {
        trySuccess方法是在父类DefaultPromise中实现的。
        return trySuccess(null);
    }
}

DefaultPromise中的实现
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    @Override
    public boolean trySuccess(V result) {
        这个方法必然是返回true
        if (setSuccess0(result)) {
            唤醒监听器
            notifyListeners();
            return true;
        }
        return false;
    }
    
    唤醒监听器
    private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            如果正在唤醒监听器,或者监听器为空则直接返回
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            将监听器列表赋值给局部变量
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) {
            if (listeners instanceof DefaultFutureListeners) {
                唤醒监听器
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                唤醒监听器
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                唤醒监听器之后,将监听器置空,说明其实监听器只会被执行一次。
                if (this.listeners == null) {
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            看我们一开始举的例子里面,就是GenericFutureListener接口的实现。
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}

这里其实为什么要用监听器呢?
因为很去执行注册,连接服务端的时候都是异步的(通过eventLoop去执行),获取到回调结果的时候,也许已经完成了,也许还没有开始,如果已经完成了的话,就没必要添加监听器了,如果没有完成了的话,添加监听器即可。
在完成操作后,就可以调用监听器去执行相应的操作了。

相关文章

网友评论

      本文标题:(四)channelPromise在netty中的应用

      本文链接:https://www.haomeiwen.com/subject/thkhhltx.html