Future
这个类代表了一个异步操作的结果。方法用于检查计算是否完成,等待计算完成,并检索计算结果。只有当计算完成时,才可以使用方法{@code get}检索结果,如果有必要,可以进行阻塞,直到计算完成为止。取消由{@code cancel}方法执行。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用{@code Future},但不提供可用的结果,则可以声明形式{@code Future<?作为底层任务的结果返回{@code null}。
package com.arlley.concurrent.api;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* 这个类代表了一个异步操作的结果。
* 方法用于检查计算是否完成,等待计算完成,并检索计算结果。
* 只有当计算完成时,才可以使用方法{@code get}检索结果,如果有必要,可以进行阻塞,直到计算完成为止。
* 取消由{@code cancel}方法执行。
* 还提供了其他方法来确定任务是正常完成还是被取消。
* 一旦计算完成,就不能取消计算。
* 如果为了可取消性而使用{@code Future},但不提供可用的结果,
* 则可以声明形式{@code Future<?>作为底层任务的结果返回{@code null}。
*/
public interface Future<V> {
/**
* 尝试取消当前任务,如果当前任务已经完成,或者已经被取消,或者因为其他原因不能取消,那么这个尝试是失败的。
* 如果成功的取消了,当调用取消的时候,当前任务没有开始执行,这个任务将永远不会被执行。
* 如果当前任务正在执行,那么mayInterruptIfRunning参数将会决定着是否当前执行任务的线程在尝试取消任务的时候被中断。
*
* 当这个方法返回的时候 {@link #isDone} 将会总是返回{@code true}
* 如果这个方法返回{@code true}的时候 {@link #isCancelled}也将总是返回{@code true}
*
* @param mayInterruptIfRunning {@code true} 如果被取消的任务需要被中断 否则正在执行的任务一直会被执行。
* @return {@code false} 当前任务不能被取消比如当前任务已经被完成 否则{@code true}
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 返回{@code true} 如果当前线程在正常完成之前被取消。
* @return 返回{@code true} 如果当前线程在正常完成之前被取消。
*/
boolean isCancelled();
/**
*如果任务已经完成了 就会返回{@code true}
*
* 正常的结束,异常,或者取消,这个方法都会返回{@code true}
* @return 如果任务已经完成了 就会返回{@code true}
*/
boolean isDone();
/**
* 等待需要完成的计算,然后返回他的结果。
* @return 计算的结果。
* @throws CancellationException 如果当前任务呗取消
* @throws InterruptedException 如果当前任务呗中断
* @throws ExecutionException 如果当前任务抛出执行异常
*/
V get() throws InterruptedException, ExecutionException;
/**
*在给定的时间内等待需要完成的计算,然后返回他的结果。
* @param timeout 超时时间
* @param unit 给定的时间单位
* @return 计算结果
* @throws InterruptedException 如果当前任务被中断
* @throws ExecutionException 如果当前任务抛出异常
* @throws TimeoutException 如果超时。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
package com.arlley.concurrent.api;
/**
*一个可能返回结果或者抛出异常的任务。实现者实现一个简单的无参方法 叫做call
*
* Callable接口和Runnable一样,这样的类都是的实例都是被另一个线程执行的。一个Runnable不能返回结果和抛出异常
*
* Executors类提供了实用的方法,从其他的一般的形式转化为Callable的形式。
* @param <V>
*/
@FunctionalInterface
public interface Callable<V> {
/**
*计算返回一个结果,如果不能返回,抛出异常。
* @return 计算结果
* @throws Exception 如果不能返回结果 则抛出异常。
*/
V call() throws Exception;
}
package com.arlley.concurrent.api;
import com.sun.source.tree.NewArrayTree;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
/**
* 一个可以取消的异步计算。这个类给{@link Future}的简单实现,提供了启动和取消计算,
* 查询当前计算是否已经完成。反馈计算的结果。并且当计算被完成的时候结果才会反馈。(如果计算还没有完成,get就会被阻塞)
* 一旦计算完成,当前任务就不会被重启或者取消,除非被调用{@link #runAndReset}方法。
*
* futureTask可以用来包裹{@link java.util.concurrent.Callable} 和{@link Runnable} 类。
* 因为 futureTask实现了Runnable方法 所以可以被提交到{@link java.util.concurrent.Executors}
*
* 除了作为一个独立的类之外,这个类还提供了{@code protected}功能,这在创建定制的任务类时可能很有用。
* @param <V> 返回的结果类型
*/
public class FutureTask<V> implements RunnableFuture{
/**
* 此任务的运行状态,最初是NEW。
* 运行状态仅在set、setException和cancel方法中转换为终端状态。
* 在完成期间,状态可以取一个暂态值,运行中(在设置结果时)或中断中(仅在中断运行程序以满足取消(true)时)。
* 从这些中间状态到最终状态的转换使用更便宜的有序/延迟写操作,因为值是惟一的,不能进一步修改。可能的状态转换:
* NEW->COMPLETING->NORMAL
* NEW->COMPLETING->EXCEPTIONAL
* NEW- >CANCELLED
* NEW>INTERRUPTING->INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/**要运行的Callable 运行结束之后会被置为null*/
private Callable<V> callable;
/**运行的结果*/
private Object outcome;
/**执行callable的线程*/
private volatile Thread runner;
private volatile WaitNode waiters;
/**
*
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/**
*创建一个执行callable的futureTask。
* @param callable
*/
public FutureTask(Callable<V> callable){
if(Objects.isNull(callable)){
throw new NullPointerException();
}
this.callable = callable;
this.state = NEW;
}
/**
* 创建一个执行给定的runnable的futureTask,返回给定的结果。
* @param runnable
* @param result
*/
public FutureTask(Runnable runnable, V result){
// TODO 实现Executors工具类
}
@Override
public void run() {
if(state != NEW || !RUNNER.compareAndSet(this,null, Thread.currentThread())){
return;
}
try{
Callable<V> callable = this.callable;
boolean ran = false;
V result = null;
try {
result = callable.call();
ran = true;
} catch (Exception e) {
ran = false;
setException(e);
}
if(ran){
set(result);
}
}finally {
runner = null;
int s = state;
if(s >= INTERRUPTING){
handlePossibleCancellationInterrupt(s);
}
}
}
/**
* Executes the computation without setting its result, and then
* resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
*
* @return {@code true} if successfully run and reset
*/
protected boolean runAndReset() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
private void handlePossibleCancellationInterrupt(int s) {
if(s == INTERRUPTING){
while (s == INTERRUPTING){
Thread.yield();
}
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// 判断是否是NEW 并且cas操作可以取消 如果失败 那么取消失败。
if(!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning?INTERRUPTING:CANCELLED))){
return false;
}
try{
if(mayInterruptIfRunning) {
try {
Thread thread = runner;
if (Objects.nonNull(thread)) {
thread.interrupt();
}
}finally {
STATE.setRelease(this, INTERRUPTED);
}
}
}finally {
finishCompletion();
}
return true;
}
/**
*
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
protected void done() {
}
@Override
public boolean isCancelled() {
return this.state >= CANCELLED;
}
@Override
public boolean isDone() {
return this.state != NEW;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
int s = state;
if(s <= COMPLETING){
s = awaitDone(false, 0L);
}
return report(s);
}
private int awaitDone(boolean timed, long nanos) throws InterruptedException{
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
} else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
} else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
} else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
} else
LockSupport.park(this);
}
}
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
// VarHandle mechanics 实现原子操作的变量句柄
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(java.util.concurrent.FutureTask.class, "state", int.class);
RUNNER = l.findVarHandle(java.util.concurrent.FutureTask.class, "runner", Thread.class);
WAITERS = l.findVarHandle(java.util.concurrent.FutureTask.class, "waiters", WaitNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
}
网友评论