本文将主要讲解 J.U.C 中的 Future 框架,并分析结合源码分析其内部结构逻辑;
一、Future 框架概述
JDK 中的 Future 框架实际就是 Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例;
1. 应用实例
FutureTask future =newFutureTask<>(() -> { log.info("异步任务执行..."); Thread.sleep(2000); log.info("过了很久很久...");return"异步任务完成";});log.info("启动异步任务...");newThread(future).start();log.info("继续其他任务...");Thread.sleep(1000);log.info("获取异步任务结果:{}", future.get());
打印:
[15:38:03,231INFO ] [main] - 启动异步任务...
[15:38:03,231INFO ] [main] - 继续其他任务...
[15:38:03,231INFO ] [Thread-0] - 异步任务执行...
[15:38:05,232INFO ] [Thread-0] - 过了很久很久...
[15:38:05,236INFO ] [main] - 获取异步任务结果:异步任务完成
如上面代码所示,首先我们将要执行的任务包装成 Callable,这里如果不需要返回值也可以使用 Runnable;然后构建 FutureTask 由一个线程启动,最后使用 Future.get() 获取异步任务结果;
2. Future 运行逻辑
对于 Future 模式的流程图如下:
对比上面的实例代码,大家可能会发现有些不一样,因为在 FutureTask 同时继承了 Runnable 和 Future 接口,所以再提交任务后没有返回Future,而是直接使用自身调用 get;下面我们就对源码进行实际分析;
二、源码分析
1. FutureTask 主体结构
publicinterface RunnableFuture<V> extends Runnable, Future<V> {}publicclass FutureTask<V> implements RunnableFuture<V> {privatevolatileintstate;// 任务运行状态privateCallable callable;// 异步任务privateObject outcome;// 返回结果privatevolatileThread runner;// 异步任务执行线程privatevolatileWaitNode waiters;// 等待异步结果的线程栈(通过Treiber stack算法实现)public FutureTask(Callable<V> callable) {// 需要返回值if(callable ==null)thrownewNullPointerException();this.callable = callable;this.state = NEW;// ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;// ensure visibility of callable} ...}
另外在代码中还可以看见有很多地方都是用了 CAS 来更新变量,而 JDK1.6 中甚至使用了 AQS 来实现;其原因就是同一个 FutureTask 可以多个线程同时提交,也可以多个线程同时获取; 所以代码中有很多的状态变量:
// FutureTask.state 取值privatestaticfinalintNEW =0;// 初始化到结果返回前privatestaticfinalintCOMPLETING =1;// 结果赋值privatestaticfinalintNORMAL =2;// 执行完毕privatestaticfinalintEXCEPTIONAL =3;// 执行异常privatestaticfinalintCANCELLED =4;// 任务取消privatestaticfinalintINTERRUPTING =5;// 设置中断状态privatestaticfinalintINTERRUPTED =6;// 任务中断
同时源码的注释中也详细给出了可能出现的状态转换:
NEW -> COMPLETING -> NORMAL // 任务正常执行
NEW -> COMPLETING -> EXCEPTION // 任务执行异常
NEW ->CANCELLED // 任务取消
NEW -> INITERRUPTING -> INTERRUPTED // 任务中断
注意这里的 COMPLETING 状态是一个很微妙的状态,正因为有他的存在才能实现无锁赋值;大家先留意这个状态,然后在代码中应该能体会到;另外这里还有一个变量需要注意,WaitNode ;使用 Treiber stack 算法实现的无锁栈;其原理说明可以参考下面第三节;
2. 任务执行
public void run() {if(state != NEW ||// 确保任务执行完成后,不再重复执行!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))// 确保只有一个线程执行return;try{ Callable c = callable;if(c !=null&& state == NEW) { V result;booleanran;try{ result = c.call(); ran =true; }catch(Throwable ex) { result =null; ran =false; setException(ex);// 设置异常结果}if(ran) set(result);// 设置结果} }finally{ runner =null;ints = state;if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);// 确保中断状态已经设置}}
// 设置异步任务结果protected void set(V v) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final statefinishCompletion();// 唤醒等待线程}}
protected void setException(Throwable t) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final statefinishCompletion(); }}
3. 任务取消
public boolean cancel(boolean mayInterruptIfRunning) {if(!(state == NEW &&// 只有在任务执行阶段才能取消UNSAFE.compareAndSwapInt(this, stateOffset, NEW,// 设置取消状态mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))returnfalse;try{// in case call to interrupt throws exceptionif(mayInterruptIfRunning) {try{ Thread t = runner;if(t !=null) t.interrupt(); }finally{// final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } }finally{ finishCompletion(); }returntrue;}
注意 cancel(false) 也就是仅取消,并没有打断;异步任务会继续执行,只是这里首先设置了 FutureTask.state = CANCELLED ,所以最后在设置结果的时候会失败,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING) ;
4. 获取结果
public V get() throws InterruptedException, ExecutionException {ints = state;if(s <= COMPLETING) s = awaitDone(false,0L);// 阻塞等待returnreport(s);}private V report(int s) throws ExecutionException {// 根据最后的状态返回结果Object x = outcome;if(s == NORMAL)return(V)x;if(s >= CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}
private int awaitDone(boolean timed, long nanos) throws InterruptedException {finallongdeadline = timed ? System.nanoTime() + nanos :0L; WaitNode q =null;booleanqueued =false;for(;;) {if(Thread.interrupted()) { removeWaiter(q);// 移除等待节点thrownewInterruptedException(); }ints = state;if(s > COMPLETING) {// 任务已完成if(q !=null) q.thread =null;returns; }elseif(s == COMPLETING)// 正在赋值,直接先出让线程Thread.yield();elseif(q ==null)// 任务还未完成需要等待q =newWaitNode();elseif(!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);// 使用 Treiber stack 算法elseif(timed) { nanos = deadline - System.nanoTime();if(nanos <=0L) { removeWaiter(q);returnstate; } LockSupport.parkNanos(this, nanos); }elseLockSupport.park(this); }}
三、Treiber stack
在《Java 并发编程实战》中讲了, 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性 。
@ThreadSafepublicclass ConcurrentStack <E> { AtomicReference> top =newAtomicReference<>();privatestaticclass Node <E> {publicfinalE item;publicNode next;public Node(E item) {this.item = item; } }public void push(E item) { Node newHead =newNode<>(item); Node oldHead;do{ oldHead = top.get(); newHead.next = oldHead; }while(!top.compareAndSet(oldHead, newHead)); }public E pop() { Node oldHead; Node newHead;do{ oldHead = top.get();if(oldHead ==null)returnnull; newHead = oldHead.next; }while(!top.compareAndSet(oldHead, newHead));returnoldHead.item; }}
总结
总体来讲源码比较简单,因为其本身只是一个 Future 模式的实现
但是其中的状态量的设置,还有里面很多无锁的处理方式,才是 FutureTask 带给我们的精华!
网友评论