美文网首页Java
Netty源码_Future和Promise详解

Netty源码_Future和Promise详解

作者: wo883721 | 来源:发表于2021-10-24 17:05 被阅读0次

我们知道 netty 里面基本上都是异步操作,那么如何在异常操作完成通知使用者,获取异常操作的结果呢?

jdk 中提供了 Future 接口,来获取异常操作的结果,我们先从这个 Future 接口开始讲起。

一. java.util.concurrent.Future 类

package java.util.concurrent;

public interface Future<V> {

    /**
     * 取消当前的Future。会唤醒所有等待结果值的线程,抛出CancellationException异常
     * @param mayInterruptIfRunning 是否中断 计算结果值的那个线程
     * @return 返回true表示取消成功
     */
    boolean cancel(boolean mayInterruptIfRunning);

    // 当前的Future是否被取消,返回true表示已取消。
    boolean isCancelled();

    // 当前Future是否已结束。包括运行完成、抛出异常以及取消,都表示当前Future已结束
    boolean isDone();

    // 获取Future的结果值。如果当前Future还没有结束,那么当前线程就等待,
    // 直到Future运行结束,那么会唤醒等待结果值的线程的。
    V get() throws InterruptedException, ExecutionException;

    // 获取Future的结果值。与get()相比较多了允许设置超时时间。
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

这个接口的方法都写好注释了,很容易理解。
我们思考一下,jdk 为什么给这个接口定义这五个方法呢?

首先我们考虑一下,对于一个异步任务,使用者关心那些方面:

  • 获取异步操作结果,如果异步操作还未完成,那么就阻塞当前线程,直到异步操作完成,唤醒当前等待结果值的线程,得到结果。如果获取过程发生异常,那么也唤醒当前等待结果值的线程,抛出异常。这就是 V get() 方法.
  • 为了防止异步操作太耗时,等待结果线程一直被阻塞?提供两种方式:
    • 获取结果时,可以设置超时时间,即 V get(long timeout, TimeUnit unit) 方法。
    • 可以手动取消这个异步操作,即 boolean cancel(boolean mayInterruptIfRunning) 方法。

具体可以参考我的 Java线程池_Future与Callable原理分析 这一章,有更详细的分析。

但是 java.util.concurrent.Future 接口有一个非常大的缺陷,它只能通过线程阻塞的方式等待异步任务完成。

可能有人觉得这个有什么大问题么?因为想获取异步操作结果的线程,只能阻塞等待,那么在异步操作完成这段时间里,这个线程不能做其他事情,全部时间都浪费在等待结果值上了。
根本原因是java.util.concurrent.Future 接口没有提供一种通知的方式,当异步操作完成时,直接将结果值回传给想获取值的线程,而不需要它们等待啊。

因此针对这种情况,netty 提供了自己的 Future 接口。

二. io.netty.util.concurrent.Future 类

2.1 Future 类源码

public interface Future<V> extends java.util.concurrent.Future<V> {

    /**
     * 只有当异步操作成功后,这个方法才会返回 true。
     * 否则不管异步操作失败完成,还是被取消完成,这个方法都是返回false
     */
    boolean isSuccess();

    /**
     * 只有当这个异步操作可以被取消,即可以调用 {@link #cancel(boolean)} 方法,这个方法才返回 true
     */
    boolean isCancellable();

    /**
     * 只有当异步操作失败或者异步操作被取消,这个方法才会返回,异步操作失败的异常原因。
     * 注意:如果异步任务是被取消的,那么这个方法返回 CancellationException 异常。
     * 只有当异步操作成功完成或者还未完成,这个方法才是返回 null
     */
    Throwable cause();

    /**
     * 添加一个回调监听方法,当异步操作完成时,就会调用这个监听方法。
     * 异步任务完成,即包括运行完成、抛出异常以及取消
     */
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * 添加多个回调监听方法,当异步操作完成时,就会调用这个监听方法。
     * 异步任务完成,即包括运行完成、抛出异常以及取消
     */
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * 移除一个回调监听方法
     */
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    /**
     * 移除多个回调监听方法
     */
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    /**
     * 当前线程等待,直到这个异步操作完成。但是如果是异常完成,即 cause 方法有返回值,那么就抛出这个异常。
     */
    Future<V> sync() throws InterruptedException;

    /**
     * 与 sync 方法作用相同,只不过不会抛出中断异常
     */
    Future<V> syncUninterruptibly();

    /**
     *  当前线程等待,直到这个异步操作完成。
     *  注:这个方法和 sync 方法的区别时,如果异步操作出现异常,并不会抛出这个异常
     */
    Future<V> await() throws InterruptedException;

    /**
     * 当前线程等待,直到这个异步操作完成。
     */
    Future<V> awaitUninterruptibly();

    /**
     * 当前线程等待,直到这个异步操作完成或者等待时间超时。
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * 当前线程等待,直到这个异步操作完成或者等待时间超时。
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * 当前线程等待,直到这个异步操作完成或者等待时间超时。
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * 当前线程等待,直到这个异步操作完成或者等待时间超时。
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * 不会阻塞当前线程,直接返回值。如果异步操作还未完成,那么返回值为 null
     */
    V getNow();

    /**
     * 手动取消当前异步操作
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

我们可以看到 NettyFuturejdk 中的 Future 多了很多方法:

  1. isSuccess()cause() 方法,能够知道当前 Future 到底是成功了,还是失败了。而 jdk 中的 Future 只有 isDone() 表示当前 Future 已完成了。而当前 Future 是否失败,只能通过 get() 抛出异常才知道。
  2. boolean isCancellable() 表示当前这个异步操作能不能取消。如果返回 false 表示这个异步操作不能被取消。
  3. addListenerremoveListener 系列方法,这个就是比 jdk 中的 Future 多的核心方法,注册或移除回调GenericFutureListener,当异步操作Future 完成后,会回调GenericFutureListener传递结果值。

    因此nettyFuture就不需要一直等待异步操作的完成了,只需要注册回调GenericFutureListener就可以了。

  4. syncawait 系列方法,提供同步获取结果值的方法,即当前线程阻塞,一直等到 Future 完成。

    其实 sync 系列方法方法就是调用 await 系列方法实现的。只不过当异步操作发生异常时,sync 系列方法会重新抛出这个异常。

  5. getNow 方法:不会阻塞当前线程,直接返回值。如果异步操作还未完成,那么返回值为 null

2.2 AbstractFuture

public abstract class AbstractFuture<V> implements Future<V> {

    @Override
    public V get() throws InterruptedException, ExecutionException {
        // 通过 await() 方法,等待异步操作 Future 执行完成
        await();

        // 先判断异步操作过程中是否产生异常
        Throwable cause = cause();
        // 没有异常,直接返回结果值
        if (cause == null) {
            return getNow();
        }
        // 判断是取消异步操作异常还是其他异常
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        // 抛出执行异常
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // 通过 await(timeout, unit) 方法,超时等待异步操作 Future 执行完成
        if (await(timeout, unit)) {
            // 先判断异步操作过程中是否产生异常
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            // 判断是取消异步操作异常还是其他异常
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            // 抛出执行异常
            throw new ExecutionException(cause);
        }
        // 抛出超时异常
        throw new TimeoutException();
    }
}

AbstractFuture主要是为了实现 java.util.concurrent.Future 中的两个方法,这样子类就不用继续实现了。

当然它们都是通过 await 系列方法实现对应功能的。

2.3 重要的子接口

2.3.1 Promise 接口

public interface Promise<V> extends Future<V> {

    /**
     * 手动设置异步操作成功完成,并唤醒那些等待异步操作结果值的线程。
     * 如果这个异步操作已完成,那么调用这个方法会抛出 {@link IllegalStateException} 异常
     */
    Promise<V> setSuccess(V result);

    /**
     * 尝试设置异步操作成功完成,并唤醒那些等待异步操作结果值的线程。
     * 如果返回 true,表示设置成功,如果返回false,说明这个异步操作已经完成了。
     */
    boolean trySuccess(V result);

    /**
     * 手动设置异步操作失败完成,并唤醒那些等待异步操作结果值的线程。
     * 如果这个异步操作已完成,那么调用这个方法会抛出 {@link IllegalStateException} 异常
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * 尝试设置异步操作失败完成,并唤醒那些等待异步操作结果值的线程。
     * 如果返回 true,表示设置成功,如果返回false,说明这个异步操作已经完成了。
     */
    boolean tryFailure(Throwable cause);

    /**
     * 强制设置这个异步操作不能被取消。
     * 如果返回true,表示设置成功。 如果返回false,表示这个异步操作已经被取消了
     */
    boolean setUncancellable();

    // 剩下的方法 与 Future 一样,复写方法只不过是为了更换返回值 Promise。
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

这个是 Future 最重要的一个子接口,它对比Future,多了一个非常神奇的功能,Promise 可以主动设置异步操作结果。而 Future 只能等待异步操作完成,或者手动取消这个异步任务。

  • 通过 setSuccess(V result)trySuccess(V result) 来手动设置异步操作成功完成。
  • 通过 setFailure(Throwable cause)tryFailure(Throwable cause) 来手动设置异步操作失败完成。
  • 通过 setUncancellable() 方法强制设置这个异步操作不能被取消。
  • 最后在 Future 接口中,所有返回值是 Future 方法都要被复写,返回值变成 Promise

2.3.2 ChannelFuture 接口

public interface ChannelFuture extends Future<Void> {

    /**
     * 返回一个通道,与此通道相关联的I/O操作在该通道中发生。
     * @return
     */
    Channel channel();

    // 剩下的方法 与 Future 一样,复写方法只不过是为了更换返回值 ChannelFuture。
    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

    /**
     * 如果ChannelFuture是一个void future,则返回true,因此不允许调用以下任何方法:
     * <ul>
     *     <li>{@link #addListener(GenericFutureListener)}</li>
     *     <li>{@link #addListeners(GenericFutureListener[])}</li>
     *     <li>{@link #await()}</li>
     *     <li>{@link #await(long, TimeUnit)} ()}</li>
     *     <li>{@link #await(long)} ()}</li>
     *     <li>{@link #awaitUninterruptibly()}</li>
     *     <li>{@link #sync()}</li>
     *     <li>{@link #syncUninterruptibly()}</li>
     * </ul>
     */
    boolean isVoid();
}

这个ChannelFuture 接口也是比较重要的子接口,它将 FutureChannel联系到一起了。当然它还有什么 ChannelPromiseChannelProgressivePromise 这些衍生接口啊,这不过都多了几个方法。

2.3.3 ChannelGroupFuture 接口

public interface ChannelGroupFuture extends Future<Void>, Iterable<ChannelFuture> {

    /**
     * 返回与此future关联的ChannelGroup。
     */
    ChannelGroup group();

    /**
     * 返回与指定通道相关联的单个I/O操作的ChannelFuture。
     *
     * @return 如果找到匹配的ChannelFuture。否则无效。
     */
    ChannelFuture find(Channel channel);

    /**
     * 当且仅当与此future关联的所有I/O操作都成功而没有任何失败时返回true。
     */
    @Override
    boolean isSuccess();

    @Override
    ChannelGroupException cause();

    /**
     * 当且仅当与此未来相关的I/O操作部分成功而部分失败时返回true。
     */
    boolean isPartialSuccess();

    /**
     * 当且仅当与此未来相关的I/O操作部分失败而部分成功时返回true。
     */
    boolean isPartialFailure();

    @Override
    ChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelGroupFuture await() throws InterruptedException;

    @Override
    ChannelGroupFuture awaitUninterruptibly();

    @Override
    ChannelGroupFuture syncUninterruptibly();

    @Override
    ChannelGroupFuture sync() throws InterruptedException;

    /**
     * 返回枚举与此future关联的所有ChannelFutures的迭代器。
     * 请注意,返回的Iterator是不可修改的,
     * 这意味着ChannelFuture不能从这个future中删除。
     */
    @Override
    Iterator<ChannelFuture> iterator();
}

这个ChannelGroupFuture 接口也算重要的子接口,它将 FutureChannelGroup联系到一起了。

2.4 小结

Future 和它几个比较重要的子接口都讲解完了,下面我们将介绍它最重要的一个实现类 DefaultPromise , FuturePromise 的基本功能都是在这个里面完成的。

三. DefaultPromise 类

3.1 重要静态属性

  // 通过 cas 的方式,更改 result 变量
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    // 如果异步操作Future 没有结果值,那么就设置成SUCCESS, 和初始值 null 做区别
    private static final Object SUCCESS = new Object();
    // 将异步操作Future 设置成不可取消时,result的值就设置成 UNCANCELLABLE, 表示此时不可取消
    private static final Object UNCANCELLABLE = new Object();
    // 异步操作Future被取消后,result的值就设置成 CANCELLATION_CAUSE_HOLDER, 表示异步操作已经取消了啊
    private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
            StacklessCancellationException.newInstance(DefaultPromise.class, "cancel(...)"));

3.2 重要成员属性


    // 异步操作Future的结果值
    private volatile Object result;
    // 与这个异步操作Future 关联的 事件执行器executor,
    // 监控异步任务结果值的回调都必须在这个事件执行器executor中执行。
    private final EventExecutor executor;
    /**
     * 一个或多个侦听器。
     * 一个侦听器是 GenericFutureListener;
     * 多个侦听器是 DefaultFutureListeners。
     * 如果为 `null`, 则意味着要么没有添加监听器,要么所有监听器都得到了通知。
     */
    private Object listeners;

    /**
     * 表示当前阻塞等待结果值的线程数量
     * 线程同步。我们需要持有同步锁才能调用Java底层的wait()/notifyAll()。
     */
    private short waiters;

    /**
     * 表示当前是否正在进行通知
     * 线程同步,如果执行器发生变化,我们必须防止并发通知冲突,而是按照FIFO进行侦听器通知。
     */
    private boolean notifyingListeners;
  1. result 表示异步操作 Future的结果值,它是最重要的属性,也代表当前异步操作 Future 的状态。

    • result == null 表示异步操作 Future还没有完成。
    • result == UNCANCELLABLE 异步操作 Future也没有完成,但是这个异步操作 Future不可被取消。
    • result instanceof CauseHolder 表示异步操作 Future 发生了异常。也分为两种:result.cause instanceof CancellationException 表示异步操作取消完成,其他的就表示异步操作异常完成。
    • 除了上述三种情况,就表示异步操作 Future成功完成。
  2. executor: 与这个异步操作Future关联的 事件执行器executor,监控异步任务结果值的回调都必须在这个事件执行器executor中执行。

  3. waiters 表示正在同步阻塞等待异步操作结果的等待者数量,一般都是调用 await 系列方法产生的。

  4. listenersnotifyingListeners 和异步任务结果监听器通知有关,防止并发通知冲突。

3.3 重要方法

3.3.1 设置异步操作完成

  1. 成功完成
    private boolean setSuccess0(V result) {
        // 通过 setValue0 方法设置完成
        // 如果 result == null,就使用 SUCCESS,和 null 进行区别
        return setValue0(result == null ? SUCCESS : result);
    }
    @Override
    public Promise<V> setSuccess(V result) {
        // 通过 setSuccess0 尝试设置成功,如果成功就返回,失败就抛出异常
        if (setSuccess0(result)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }
    
    @Override
    public boolean trySuccess(V result) {
        // 通过 setSuccess0 尝试设置成功
        return setSuccess0(result);
    }
    

    最终都是通过 setValue0() 方法进行异步操作完成设置。

  2. 失败完成
     /**
      * 检查是否有等待者waiters,如果有,通知他们。
      */
     private synchronized boolean checkNotifyWaiters() {
         if (waiters > 0) {
             // 唤醒那些正在阻塞等待异步操作结果的线程
             notifyAll();
         }
         // 异步操作监听器是否有
         return listeners != null;
     }
    
     private boolean setFailure0(Throwable cause) {
         // 通过 setValue0 方法设置完成,
         // 创建 CauseHolder 对象包装 cause,
         // 这样只要是 CauseHolder 实例,就表示失败了啊
         return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
     }
     @Override
     public Promise<V> setFailure(Throwable cause) {
         // 通过 setFailure0 尝试设置失败,
         // 如果设置成功就返回,设置失败就抛出异常
         if (setFailure0(cause)) {
             return this;
         }
         throw new IllegalStateException("complete already: " + this, cause);
     }
    
     @Override
     public boolean tryFailure(Throwable cause) {
         // 通过 setFailure0 尝试设置失败
         return setFailure0(cause);
     }
    

    最终都是通过 setValue0() 方法进行异步操作完成设置。

  3. 设置完成
     private boolean setValue0(Object objResult) {
         // 未完成情况有两种,result == null 或 result == UNCANCELLABLE
         // 使用 CAS 来更新,如果失败,说明异步操作被其他线程完成了,那么直接返回 false
         // 如果成功,那么表示异步操作完成了,就要通知监听器 和 阻塞等待异步任务结果的线程
         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
             // 这里需要通知两个:
             // 1. 同步阻塞等待异步任务结果的线程,调用 await 系列方法
             // 2. 监听异步任务完成的监听器
             if (checkNotifyWaiters()) {
                 // 通知监听器,异步任务已经完成了
                 notifyListeners();
             }
             return true;
         }
         return false;
     }
    

    当异步操作完成后,即需要通知异步任务完成的监听器,还需要唤醒同步阻塞等待异步任务结果的线程。

3.3.2 异步操作状态

  1. 是否完成

     private static boolean isDone0(Object result) {
         // result 不是 null 和 UNCANCELLABLE,就表示这个异步操作已经完成
         return result != null && result != UNCANCELLABLE;
     }
    
     @Override
     public boolean isDone() {
         return isDone0(result);
     }
    
  2. 是否成功完成

     @Override
     public boolean isSuccess() {
         Object result = this.result;
         // 成功完成,首先必须已经完成,即 result != null && result != UNCANCELLABLE
         // 不能是失败完成,即 !(result instanceof CauseHolder)
         return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
     }
    
  3. 是否可取消

    @Override
     public boolean isCancellable() {
         // 只有 result == null 时,这个异步操作才能取消
         return result == null;
     }
    
  4. 是否已取消

     private static boolean isCancelled0(Object result) {
         // 结果值 result 必须 CauseHolder,且 cause 是 CancellationException 异常
         return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
     }
    
  5. 获取结果值的异常

     @Override
     public Throwable cause() {
         return cause0(result);
     }
    
     private Throwable cause0(Object result) {
         // 如果 result 不是 CauseHolder 实例,就没有异常,直接返回
         if (!(result instanceof CauseHolder)) {
             return null;
         }
         if (result == CANCELLATION_CAUSE_HOLDER) {
             CancellationException ce = new LeanCancellationException();
             if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                 return ce;
             }
             result = this.result;
         }
         // 返回包裹的异常
         return ((CauseHolder) result).cause;
     }
    

3.3.3 同步等待结果

要同步等待异步操作结果,基本上都是靠 await 系列方法实现的。

  1. await() 没有超时时间等待方法

     protected void checkDeadLock() {
         EventExecutor e = executor();
         // 等待结果线程与异步操作线程是同一个线程
         if (e != null && e.inEventLoop()) {
             throw new BlockingOperationException(toString());
         }
     }
    
     @Override
     public Promise<V> await() throws InterruptedException {
         // 如果已经完成,那么不用等待了,直接返回
         if (isDone()) {
             return this;
         }
    
         // 如果其他线程已经调用了本线程中断方法,那么直接就直接抛出中断异常
         if (Thread.interrupted()) {
             throw new InterruptedException(toString());
         }
    
         // 检查有没有产生死锁情况,即在异步操作线程中调用await()方法等待操作完成。
         // 抛出 BlockingOperationException 异常
         checkDeadLock();
    
         // 获取同步锁
         synchronized (this) {
             // 只要异步操作没有完成,继续循环
             while (!isDone()) {
                 // 将 waiters 值加一,表示多一个等待者
                 incWaiters();
                 try {
                     // 等待,直到 checkNotifyWaiters 方法中调用 notifyAll() 方法唤醒
                     wait();
                 } finally {
                     // 已经完成,最后将waiters 值减一
                     decWaiters();
                 }
             }
         }
         return this;
     }
    

    利用同步锁 synchronizedwait() 方法,让当前线程等待,直到checkNotifyWaiters 方法中调用 notifyAll() 方法唤醒。

  2. await0(long timeoutNanos, boolean interruptable) 带超时时间等待方法

     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
         // 如果已经完成,那么不用等待了,直接返回 true
         if (isDone()) {
             return true;
         }
    
         // 超时时间已经到了,返回 isDone() 的值
         if (timeoutNanos <= 0) {
             return isDone();
         }
    
         // 需要抛出中断异常时,才进行判断
         if (interruptable && Thread.interrupted()) {
             throw new InterruptedException(toString());
         }
    
         // 检查有没有产生死锁情况,即在异步操作线程中调用await()方法等待操作完成。
         // 抛出 BlockingOperationException 异常
         checkDeadLock();
    
         // 从这里开始计算时间,而不是从方法的第一行开始计算,以避免和推迟 System.nanoTime()的性能成本。
         final long startTime = System.nanoTime();
         // 获取同步锁
         synchronized (this) {
             // 是否有中断异常
             boolean interrupted = false;
             try {
                 // 记录超时时间
                 long waitTime = timeoutNanos;
                 // 只有当异步操作没有完成 且 超时时间waitTime还大于0的时候,继续循环
                 while (!isDone() && waitTime > 0) {
                     // 将 waiters 值加一,表示多一个等待者
                     incWaiters();
                     try {
                         // 调用 wait 超时的等待方法进行等待。
                         // 直到超时时间到了或者checkNotifyWaiters 方法中调用 notifyAll() 方法唤醒
                         wait(waitTime / 1000000, (int) (waitTime % 1000000));
                     } catch (InterruptedException e) {
                         if (interruptable) {
                             throw e;
                         } else {
                             interrupted = true;
                         }
                     } finally {
                         // 已经完成,最后将waiters 值减一
                         decWaiters();
                     }
                     // 提前检查isDone(),避免以后计算耗时。
                     if (isDone()) {
                         return true;
                     }
                     // 在这里而不是在while条件中计算经过的时间,尽量避免在while的第一个循环中System.nanoTime()的性能代价。
                     waitTime = timeoutNanos - (System.nanoTime() - startTime);
                 }
                 return isDone();
             } finally {
                 if (interrupted) {
                     Thread.currentThread().interrupt();
                 }
             }
         }
     }
    

    await 方法实现基本逻辑差不多,只不过多了超时的判断。

     @Override
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
         return await0(unit.toNanos(timeout), true);
     }
    
     @Override
     public boolean await(long timeoutMillis) throws InterruptedException {
         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
     }
     @Override
     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
         try {
             return await0(unit.toNanos(timeout), false);
         } catch (InterruptedException e) {
             // Should not be raised at all.
             throw new InternalError();
         }
     }
    
     @Override
     public boolean awaitUninterruptibly(long timeoutMillis) {
         try {
             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
         } catch (InterruptedException e) {
             // Should not be raised at all.
             throw new InternalError();
         }
     }
    
  3. sync 系列方法,都是借助 await 实现的

     private void rethrowIfFailed() {
         Throwable cause = cause();
         if (cause == null) {
             return;
         }
    
         // 如果有异常,就抛出
         PlatformDependent.throwException(cause);
     }
    
     @Override
     public Promise<V> sync() throws InterruptedException {
         await();
         // 如果有异常,重新抛出异常
         rethrowIfFailed();
         return this;
     }
    
     @Override
     public Promise<V> syncUninterruptibly() {
         awaitUninterruptibly();
         // 如果有异常,重新抛出异常
         rethrowIfFailed();
         return this;
     }
    

3.3.4 监听器相关

  1. 添加监听器

     @Override
     public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
         checkNotNull(listener, "listener");
    
         // 使用同步锁,防止并发冲突
         synchronized (this) {
             // 添加监听器
             addListener0(listener);
         }
    
         // 如果已经完成,那么就要通知监听器
         if (isDone()) {
             notifyListeners();
         }
    
         return this;
     }
    
     @Override
     public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
         checkNotNull(listeners, "listeners");
    
         // 使用同步锁,防止并发冲突
         synchronized (this) {
             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                 if (listener == null) {
                     break;
                 }
                 // 添加监听器
                 addListener0(listener);
             }
         }
    
         // 如果已经完成,那么就要通知监听器
         if (isDone()) {
             notifyListeners();
         }
    
         return this;
     }
     private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
         // 当前 listeners 是 null, 直接将 listener 赋值给listeners
         if (listeners == null) {
             listeners = listener;
         } else if (listeners instanceof DefaultFutureListeners) {
             // 如果 listeners 已经是DefaultFutureListeners,调用它的 add 方法添加监听器
             ((DefaultFutureListeners) listeners).add(listener);
         } else {
             // 创建新的 DefaultFutureListeners 对象,包含两个监听器
             listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
         }
     }
    
  2. 移除监听器

     @Override
     public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
         checkNotNull(listener, "listener");
    
         synchronized (this) {
             removeListener0(listener);
         }
    
         return this;
     }
    
     @Override
     public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
         checkNotNull(listeners, "listeners");
    
         synchronized (this) {
             for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                 if (listener == null) {
                     break;
                 }
                 removeListener0(listener);
             }
         }
    
         return this;
     }
    
     private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
         if (listeners instanceof DefaultFutureListeners) {
             // 如果 listeners 是DefaultFutureListeners,调用它的remove 移除对应监听器
             ((DefaultFutureListeners) listeners).remove(listener);
         } else if (listeners == listener) {
             // 唯一一个监听器移除了,将listeners重新设置成 null
             listeners = null;
         }
     }
    
  3. 通知监听器

     private void notifyListeners() {
         // 获取事件执行器
         EventExecutor executor = executor();
         // 当前线程是不是事件执行器的线程
         if (executor.inEventLoop()) {
             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
             final int stackDepth = threadLocals.futureListenerStackDepth();
             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                 try {
                     // 通知监听器
                     notifyListenersNow();
                 } finally {
                     threadLocals.setFutureListenerStackDepth(stackDepth);
                 }
                 return;
             }
         }
    
         // 将通知监听器调用包装成一个 Runnable ,交给事件执行器,
         // 保证 notifyListenersNow 方法是在事件执行器线程中执行
         safeExecute(executor, new Runnable() {
             @Override
             public void run() {
                 notifyListenersNow();
             }
         });
     }
    

    保证通知 notifyListenersNow() 操作是在事件执行器的线程中调用。

     private void notifyListenersNow() {
         Object listeners;
         // 使用同步锁,防止并发冲突
         synchronized (this) {
             // 当监听器listeners为null或者 现在已经在通知了 notifyingListeners == true,直接返回
             if (notifyingListeners || this.listeners == null) {
                 return;
             }
             // 表示正在通知
             notifyingListeners = true;
             listeners = this.listeners;
             this.listeners = null;
         }
         // 这里使用死循环,是为了处理在通知过程中,又有新的监听器添加了,也要通知这些添加的监听器
         for (;;) {
             if (listeners instanceof DefaultFutureListeners) {
                 // 如果是 DefaultFutureListeners,那么遍历一下,每个监听器都要通知
                 notifyListeners0((DefaultFutureListeners) listeners);
             } else {
                 notifyListener0(this, (GenericFutureListener<?>) listeners);
             }
             // 使用同步锁,处理可能添加的新的监听器
             synchronized (this) {
                 // listeners 还是 null,说明没有新的监听器添加
                 if (this.listeners == null) {
                     // 将 notifyingListeners 设置成 false,表示已经完成通知了
                     // 因为这个内部方法不会抛出异常,所以设置 notifyingListeners = false不用放到 finally 的代码块中。
                     notifyingListeners = false;
                     return;
                 }
                 // 得到新添加的监听器,进行通知
                 listeners = this.listeners;
                 // 再将 this.listeners 设置为 null
                 this.listeners = null;
             }
         }
     }
    
     private void notifyListeners0(DefaultFutureListeners listeners) {
         GenericFutureListener<?>[] a = listeners.listeners();
         int size = listeners.size();
         for (int i = 0; i < size; i ++) {
             // 遍历 DefaultFutureListeners 中的监听器
             notifyListener0(this, a[i]);
         }
     }
    
     @SuppressWarnings({ "unchecked", "rawtypes" })
     private static void notifyListener0(Future future, GenericFutureListener l) {
         try {
             // 回调监听器的方法,进行通知
             l.operationComplete(future);
         } catch (Throwable t) {
             if (logger.isWarnEnabled()) {
                 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
             }
         }
     }
    

    notifyListenersNow() 方法中,为了防止并发冲突,使用同步锁;为了防止在通知过程中,新添加监听器不能通知,使用 for (;;) 死循环,保证新添加监听器也能被通知到。

相关文章

网友评论

    本文标题:Netty源码_Future和Promise详解

    本文链接:https://www.haomeiwen.com/subject/bemraltx.html