美文网首页
《线程池系列七》-Guava ListenableFuture

《线程池系列七》-Guava ListenableFuture

作者: 逍遥无极 | 来源:发表于2017-11-07 13:16 被阅读0次

该篇文章与线程池的关系不是很大,由于它与FutureTask的实现非常的相似,因此放在了线程池系列。在学习ListenableFuture时一定要对比着FutureTask的相关知识点学习,了解两者的共同点、区别、适应场景。本节将详细讲解ListenableFure的默认实现AbstractFuture类,并结合FutureTask展开一些列的问题讨论。有关FutureTask的知识,请阅读我的另一篇文章《线程池系列一》-FutureTask原理讲解与源码剖析。本节标题与《线程池系列六》-Guava ListenableFutureTask非常相似,但是这两篇文章完全不同,ListenableFutureTask是基于FutureTask的实现,而本文主要讲解的是ListenableFuture的默认实现AbstractFuture类的使用

ListenableFuture<V>

ListenableFuture<V>接口继承Future<V>接口。Future接口是JDK定义的接口,其定义了与任务操作相关的方法,例如任务的取消:cancel(),判断任务的状态:isCancelled()、isDone(),获取任务结果:get()等。
ListenableFuture<V>是Guava(google提供的一个java开发包)定义的接口,其在Future接口的基础上添加了addListener()方法,源码如下:

void addListener(Runnable listener, Executor executor);

该方法主要用于给Future任务添加监听任务,需要主要的是:

  • listener任务必须指定执行该任务的线程池executor(该executor一定不要与执行Future任务的线程池是同一个,否则会出现死锁情况)
  • 监听任务执行的时机为Future任务执行完成(包括正确执行完成和任务抛异常终止)或者被取消。

AbstractFuture<V>内部类Sync

AbstractFuture的实现与FutureTask的实现非常相似,FutureTask使用unsafe包的CAS实现,而AbstractFuture使用的是AQS(AbstractQueuedSynchronizer,jdk锁实现的模板类,本文不做讲解),AbstractFuture将99%的操作全部都交于内部类Sync实现,下面讲解一下Sync类的实现

  • 任务的状态信息

Sync将任务状态分为5种,分别为running、completing、completed、cancelled、interrupted。其与FutureTask任务的状态对比如下图所示:

FutureTask-AbstractFuture状态对比图.png

从图中可以看出,AbstractFure相比与FutureTask少了两种状态Exceptional和interrupting状态,其中Exceptional状态,在AbstractFure中通过添加一个Throwable类型的结果来实现(如果Throwable对象的值不为null,则说明是exceptional),而interrupting状态再FutureTask中的作用就不是很大,在AstractFurue中并没有设计该状态。

  • 成员变量

与FutureTask不同,FutureTask中只有一个Object对象用来存放Future执行的结果,可以是正常结果,也可以是异常。在AbstractFuture中,将两种结果分开,正常结果放在value中,异常结果放在exception中,源码如下:

private V value;
private Throwable exception;

这也是为什么不设置exceptional状态就能区分正常和异常结果的原因。

  • 锁方法的重写

该锁使用的是共享锁来实现,主要涉及两个方法:

  1. tryAcquireShared(int ignored) 该方法是在获取锁时调用,多个线程可以同时获取锁
  2. tryReleaseShared(int finalState) 该方法是在释放锁时调用

在锁的时候时,我们一般都是先尝试获取锁,然后处理临界资源,处理完成后释放锁。而在AbstractFuture中并不是这种常规的使用方式,其实现是必须有一个线程先调用releaseShared(int arg)释放锁,其他线程才能调用acquireShared(int arg)获取锁,否则,所有的获取锁线程都将会堵塞
源码如下:

protected int tryAcquireShared(int ignored) {
    if (isDone()) {
        return 1;
    }
    return -1;
}
    
@Override
protected boolean tryReleaseShared(int finalState) {
    setState(finalState);
    return true;
}

从源码中可以看出,调用tryAcquireShared()方法就是判断任务有没有完成(isDone()方法),任务完成成功获取锁,任务没有完成则等待。
那么问题就是任务什么时候完成,任务完成都会调用tryReleaseShared()方法,该方法用于任务完成或者取消时设置最终状态。

  • 成员方法

一. 阻塞与非阻塞的get()方法
get()方法或间接调用获取锁操作,如果成功获取锁,则返回对应的结果,如果获取不到锁,则返回对应的异常

V get(long nanos) throws TimeoutException, CancellationException,
        ExecutionException, InterruptedException {

    //间接调用tryAcquireShared(arg)方法,支持中断,最长等待nanos时间
    if (!tryAcquireSharedNanos(-1, nanos)) {
        throw new TimeoutException("Timeout waiting for task.");
    }

    return getValue();
}
    
V get() throws CancellationException, ExecutionException,
        InterruptedException {

    //间接调用tryAcquireShared(arg)方法,支持中断,直到获取锁返回
    acquireSharedInterruptibly(-1);
    return getValue();
}

从源码中可以看出,两种get()方法都会间接的调用tryAcquireShared(arg)方法,且都支持中断。在成功获取锁之后,两者都会调用getValue()方法,其源码如下:

private V getValue() throws CancellationException, ExecutionException {
    int state = getState();
    switch (state) {
        case COMPLETED:
            if (exception != null) {
                throw new ExecutionException(exception);
            } else {
                return value;
            }

        case CANCELLED:
        case INTERRUPTED:
            throw cancellationExceptionWithCause(
                    "Task was cancelled.", exception);

        default:
            throw new IllegalStateException(
                    "Error, synchronizer in invalid state: " + state);
    }
}

该方法主要针对三种终止状态做处理:

  1. completed状态,该状态又分为正常结束和异常终止,通过判断exception是否为null进行区分,如果异常,则抛出异常,否则返回正常结束的结果
  2. cancelled和interrupted状态,统一处理抛出取消异常
  3. 其他情况,讲道理不会出现的状态,如果出现了抛出非法状态异常

二. 设值方法
该类方法都会间接调用 tryReleaseShared(int finalState)方法,使锁处于可获取状态,表示任务执行完成。其中包括set(@Nullable V v)设值正常值、setException(Throwable t)设值异常、cancel(boolean interrupt)取消任务,期源码如下:

boolean set(@Nullable V v) {
    return complete(v, null, COMPLETED);
}

boolean setException(Throwable t) {
    return complete(null, t, COMPLETED);
}

boolean cancel(boolean interrupt) {
    return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
}

从源码中可以看出三者都调用了complete(@Nullable V v, @Nullable Throwable t, int finalState)方法,下面对该方法进行详细的讲解,其核心逻辑如下:

  1. 将任务状态由running修改为completing状态
  2. 如果状态修改成功,则对结果value和异常exception赋值,异常的赋值主要看方法参数finalState,改状态如果为cancelled或者interrupted则为异常赋值,否则赋值为参数t的值(t可能为null),然后调用releaseShared(finalState)(该方法间接调用tryReleaseShared(arg))方法更新任务状态为最终状态。
  3. 如果状态修改失败,判断状态是否为completing状态,如果是则说明任务已经执行完成,只在赋值阶段,执行acquireShared(-1)获取锁操作,使自己阻塞至任务完成

源码如下:

private boolean complete(@Nullable V v, @Nullable Throwable t,
                         int finalState) {
    boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
    if (doCompletion) {
        // If this thread successfully transitioned to COMPLETING, set the value
        // and exception and then release to the final state.
        this.value = v;
        // Don't actually construct a CancellationException until necessary.
        this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
                ? new CancellationException("Future.cancel() was called.") : t;
        releaseShared(finalState);
    } else if (getState() == COMPLETING) {
        // If some other thread is currently completing the future, block until
        // they are done so we can guarantee completion.
        acquireShared(-1);
    }
    return doCompletion;
}

AbstractFuture的状态转换全部都在该方法中了。
三. 状态判断方法
主要判断任务状态state的值,state的值存放在AQS中,再次不过多讲解,源码如下:

boolean isDone() {
    return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
}
    
boolean isCancelled() {
    return (getState() & (CANCELLED | INTERRUPTED)) != 0;
}

boolean wasInterrupted() {
    return getState() == INTERRUPTED;
}

AbstractFuture<V>

  • 成员变量
  private final Sync<V> sync = new Sync<V>();
  private final ExecutionList executionList = new ExecutionList();

其中,sync已经讲解,ExecutionList主要用于执行listener,其实用可以参考我的另一篇博客《线程池系列六》-Guava ListenableFutureTask中有关于ExecutionList的详细讲解

  • 成员方法

大部分的成员方法都是直接调用sync的对应方法,并没有做过多的操作,只是简单的将方法暴露给外部使用而已。其中,get(long timeout, TimeUnit unit)、get()、isDone()、isCancelled()、wasInterrupted()都是直接调用的sync的方法。
除了上述方法外,还有赋值操作和取消操作的方法,由于该类方法设计到任务完成回调listener方法的关系,因此不是简单的调用sync的方法,其实现如下所示:

  protected boolean set(@Nullable V value) {
    boolean result = sync.set(value);
    if (result) {
      executionList.execute();
    }
    return result;
  }

如果设置成功,表示任务结束,则执行listener方法(executionList.execute())
setException(Throwable throwable)方法与set(@Nullable V value)操作一直,多了一个非空判断而已,不再讲解。
cancel() 如果取消成功,则执行listener回调,如果参数为真,则interruptTask();该方法为抽象方法,子类可以实现。cancel()的源码如下:

  public boolean cancel(boolean mayInterruptIfRunning) {
    if (!sync.cancel(mayInterruptIfRunning)) {
      return false;
    }
    executionList.execute();
    if (mayInterruptIfRunning) {
      interruptTask();
    }
    return true;
  }

AbstractFuture与FutureTask的区别

  1. AbstractFuture通过AQS实现,FutureTask通过unsafe CAS实现,本质是一样的
  2. AbstractFuture 有五种状态,两种任务结果value和exception,而FutureTask有七种状态,任务执行结果只有一个outcome
  3. AbstractFuture没有实现Runnable接口,不能作为任务放到线程池中执行,而FutureTask可以
  4. AbstractFuture有接口回调,FutureTask没有,但是留下了回调的接口,可以重写done()方法

AbstractFuture的使用场景

AbstractFuture是一个抽象类,我们需要自定义子类来使用AbstractFuture,又因此AbstractFuture并没有实现Runnable接口,因此其不适合和线程池配合使用(子类同时实现Runnable接口也是可以的)。它经常用来与AsyncHttpClient配合使用,使用异步HttpClient发起请求,在请求的回调中根据请求的返回结果执行AbstractFuture对象set()、setException()、cancel()操作。

欢迎扫描下方二维码,关注公众号,我们可以进行技术交流,共同成长

qrcode_for_gh_5580beb3cba1_430.jpg

相关文章

网友评论

      本文标题:《线程池系列七》-Guava ListenableFuture

      本文链接:https://www.haomeiwen.com/subject/luudmxtx.html