美文网首页Java 杂谈高性能/高并发
并发系列之 Future 框架详解

并发系列之 Future 框架详解

作者: java小牛 | 来源:发表于2019-04-08 14:59 被阅读1次

    本文将主要讲解 J.U.C 中的 Future 框架,并分析结合源码分析其内部结构逻辑;

    一、Future 框架概述

    JDK 中的 Future 框架实际就是 Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例;

    1. 应用实例

    FutureTask future =newFutureTask<>(() -> {  log.info("异步任务执行...");  Thread.sleep(2000);  log.info("过了很久很久...");return"异步任务完成";});log.info("启动异步任务...");newThread(future).start();log.info("继续其他任务...");Thread.sleep(1000);log.info("获取异步任务结果:{}", future.get());

    打印:

    [15:38:03,231INFO ] [main]    - 启动异步任务...

    [15:38:03,231INFO ] [main]    - 继续其他任务...

    [15:38:03,231INFO ] [Thread-0] - 异步任务执行...

    [15:38:05,232INFO ] [Thread-0] - 过了很久很久...

    [15:38:05,236INFO ] [main]    - 获取异步任务结果:异步任务完成

    如上面代码所示,首先我们将要执行的任务包装成 Callable,这里如果不需要返回值也可以使用 Runnable;然后构建 FutureTask 由一个线程启动,最后使用 Future.get() 获取异步任务结果;

    2. Future 运行逻辑

    对于 Future 模式的流程图如下:

    对比上面的实例代码,大家可能会发现有些不一样,因为在 FutureTask 同时继承了 Runnable 和 Future 接口,所以再提交任务后没有返回Future,而是直接使用自身调用 get;下面我们就对源码进行实际分析;

    二、源码分析

    1. FutureTask 主体结构

    publicinterface RunnableFuture<V> extends Runnable, Future<V> {}publicclass FutureTask<V> implements RunnableFuture<V> {privatevolatileintstate;// 任务运行状态privateCallable callable;// 异步任务privateObject outcome;// 返回结果privatevolatileThread runner;// 异步任务执行线程privatevolatileWaitNode waiters;// 等待异步结果的线程栈(通过Treiber stack算法实现)public FutureTask(Callable<V> callable) {// 需要返回值if(callable ==null)thrownewNullPointerException();this.callable = callable;this.state = NEW;// ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;// ensure visibility of callable}  ...}

    另外在代码中还可以看见有很多地方都是用了 CAS 来更新变量,而 JDK1.6 中甚至使用了 AQS 来实现;其原因就是同一个 FutureTask 可以多个线程同时提交,也可以多个线程同时获取; 所以代码中有很多的状态变量:

    // FutureTask.state 取值privatestaticfinalintNEW          =0;// 初始化到结果返回前privatestaticfinalintCOMPLETING  =1;// 结果赋值privatestaticfinalintNORMAL      =2;// 执行完毕privatestaticfinalintEXCEPTIONAL  =3;// 执行异常privatestaticfinalintCANCELLED    =4;// 任务取消privatestaticfinalintINTERRUPTING =5;// 设置中断状态privatestaticfinalintINTERRUPTED  =6;// 任务中断

    同时源码的注释中也详细给出了可能出现的状态转换:

    NEW -> COMPLETING -> NORMAL // 任务正常执行

    NEW -> COMPLETING -> EXCEPTION // 任务执行异常

    NEW ->CANCELLED // 任务取消

    NEW -> INITERRUPTING -> INTERRUPTED // 任务中断

    注意这里的 COMPLETING 状态是一个很微妙的状态,正因为有他的存在才能实现无锁赋值;大家先留意这个状态,然后在代码中应该能体会到;另外这里还有一个变量需要注意,WaitNode ;使用 Treiber stack 算法实现的无锁栈;其原理说明可以参考下面第三节;

    2. 任务执行

    public void run() {if(state != NEW ||// 确保任务执行完成后,不再重复执行!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))// 确保只有一个线程执行return;try{    Callable c = callable;if(c !=null&& state == NEW) {      V result;booleanran;try{        result = c.call();        ran =true;      }catch(Throwable ex) {        result =null;        ran =false;        setException(ex);// 设置异常结果}if(ran) set(result);// 设置结果}  }finally{    runner =null;ints = state;if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);// 确保中断状态已经设置}}

    // 设置异步任务结果protected void set(V v) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = v;    UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final statefinishCompletion();// 唤醒等待线程}}

    protected void setException(Throwable t) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = t;    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final statefinishCompletion();  }}

    3. 任务取消

    public boolean cancel(boolean mayInterruptIfRunning) {if(!(state == NEW &&// 只有在任务执行阶段才能取消UNSAFE.compareAndSwapInt(this, stateOffset, NEW,// 设置取消状态mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))returnfalse;try{// in case call to interrupt throws exceptionif(mayInterruptIfRunning) {try{        Thread t = runner;if(t !=null)          t.interrupt();      }finally{// final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);      }    }  }finally{    finishCompletion();  }returntrue;}

    注意 cancel(false) 也就是仅取消,并没有打断;异步任务会继续执行,只是这里首先设置了 FutureTask.state = CANCELLED ,所以最后在设置结果的时候会失败,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING) ;

    4. 获取结果

    public V get() throws InterruptedException, ExecutionException {ints = state;if(s <= COMPLETING)    s = awaitDone(false,0L);// 阻塞等待returnreport(s);}private V report(int s) throws ExecutionException {// 根据最后的状态返回结果Object x = outcome;if(s == NORMAL)return(V)x;if(s >= CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}

    private int awaitDone(boolean timed, long nanos)  throws InterruptedException {finallongdeadline = timed ? System.nanoTime() + nanos :0L;  WaitNode q =null;booleanqueued =false;for(;;) {if(Thread.interrupted()) {      removeWaiter(q);// 移除等待节点thrownewInterruptedException();    }ints = state;if(s > COMPLETING) {// 任务已完成if(q !=null)        q.thread =null;returns;    }elseif(s == COMPLETING)// 正在赋值,直接先出让线程Thread.yield();elseif(q ==null)// 任务还未完成需要等待q =newWaitNode();elseif(!queued)      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                        q.next = waiters, q);// 使用 Treiber stack 算法elseif(timed) {      nanos = deadline - System.nanoTime();if(nanos <=0L) {        removeWaiter(q);returnstate;      }      LockSupport.parkNanos(this, nanos);    }elseLockSupport.park(this);  }}

    三、Treiber stack

    在《Java 并发编程实战》中讲了, 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性 。

    @ThreadSafepublicclass ConcurrentStack <E> {  AtomicReference> top =newAtomicReference<>();privatestaticclass Node <E> {publicfinalE item;publicNode next;public Node(E item) {this.item = item;    }  }public void push(E item) {    Node newHead =newNode<>(item);    Node oldHead;do{      oldHead = top.get();      newHead.next = oldHead;    }while(!top.compareAndSet(oldHead, newHead));  }public E pop() {    Node oldHead;    Node newHead;do{      oldHead = top.get();if(oldHead ==null)returnnull;      newHead = oldHead.next;    }while(!top.compareAndSet(oldHead, newHead));returnoldHead.item;  }}

    总结

    总体来讲源码比较简单,因为其本身只是一个 Future 模式的实现

    但是其中的状态量的设置,还有里面很多无锁的处理方式,才是 FutureTask 带给我们的精华!

    相关文章

      网友评论

        本文标题:并发系列之 Future 框架详解

        本文链接:https://www.haomeiwen.com/subject/tvouiqtx.html