1.Runnable接口
The Runnable interface should be implemented by any class
whose instances are intended to be executed by a thread. The
class must define a method of no arguments called run.
This interface is designed to provide a common protocol for
objects that wish to execute code while they are active. For
example, Runnable is implemented by class Thread. Being active
simply means that a thread has been started and has not yet
been stopped.
In addition, Runnable provides the means for a class to be active
while not subclassing Thread. A class that implements Runnable
can run without subclassing Thread by instantiating a Thread
instance and passing itself in as the target. In most cases, the
Runnable interface should be used if you are only planning to
override the run() method and no other Thread methods. This is
important because classes should not be subclassed unless the
programmer intends on modifying or enhancing the fundamental
behavior of the class.
Runnable接口由如下类实现:该类需要被线程执行。该类需要定义一个无参方法run。
Runnable提供了一种不是继承Thread的方式。在大多数情况下,如果仅仅是覆写run方法而不是其他Thread方法,则应该使用Runnable接口。除非打断修改或增强类的基本行为,否则不应该对类进行子类化。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
2.Callable接口
A task that returns a result and may throw an exception.
Implementors define a single method with no arguments called
call.
The Callable interface is similar to Runnable, in that both are
designed for classes whose instances are potentially executed by
another thread. A Runnable, however, does not return a result
and cannot throw a checked exception.
The Executors class contains utility methods to convert from
other common forms to Callable classes.
返回结果或抛出异常的任务,实现者定义一个无参call方法。
与Runnable接口类似,该类实例可能由另一个线程执行。但是Runnable不能返回结果或抛出受查异常。
Executors类包含将其他常用形式转换为Callable类的实用方法。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
3.Future
A Future represents the result of an asynchronous computation.
Methods are provided to check if the computation is complete, to
wait for its completion, and to retrieve the result of the
computation. The result can only be retrieved using method get
when the computation has completed, blocking if necessary until
it is ready. Cancellation is performed by the cancel method.
Additional methods are provided to determine if the task
completed normally or was cancelled. Once a computation has
completed, the computation cannot be cancelled. If you would
like to use a Future for the sake of cancellability but not provide a
usable result, you can declare types of the form Future<?> and
return null as a result of the underlying task.
Future表示异步计算的结果。提供检查计算是否完成、等待计算完成以及取出计算结果的方法。只有在计算完成时,get方法才能取到结果,必要时会阻塞直到其计算完成。cacel方法执行取消动作。提供了其他方法检测任务是正常完成还是被取消。当计算完成后,其不能被取消。如果为了取消而不是提供有用结果而使用Future,可以声明Future<?>形式的类型,并返回null。
示例用法(如下的类都是虚构的):
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target)
throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask类是Future的一个实现,并实现了Runnable,因此可以由Executor执行。上述带submit结构可由下面的形式代替:
FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
如下为Future的完整形式:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
关于cancle方法的说明:
- 在任务已经完成、已经被取消或者由于其他原因不能被取消时,取消会失败。
- 任务还未启动时调用cacel,会成功,并且任务永远都不能运行
- 任务已经启动了,mayInterruptIfRunning 参数会决定执行该任务的线程是否会被中断,以中断该任务
- 该方法返回后,后续的isDone()调用总是返回true。如果该方法返回true,后续的isCancelled()总是返回true。
4.RunnableFuture
A Future that is Runnable. Successful execution of the run
method causes completion of the Future and allows access to its
results.
是Runnable的Future,成功执行run方法将使Future完成,并允许访问其他结果。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
5.FutureTask
A cancellable asynchronous computation. This class provides a
base implementation of Future, with methods to start and cancel
a computation, query to see if the computation is complete, and
retrieve the result of the computation. The result can only be
retrieved when the computation has completed; the get methods
will block if the computation has not yet completed. Once the
computation has completed, the computation cannot be restarted
or cancelled (unless the computation is invoked using
runAndReset()).
A FutureTask can be used to wrap a Callable or Runnable object.
Because FutureTask implements Runnable, a FutureTask can be
submitted to an Executor for execution.
In addition to serving as a standalone class, this class provides
protected functionality that may be useful when creating
customized task classes.
可取消的异步计算。此类提供了Future的基本实现,包括启动和取消计算的方法,查询计算是否完成的查询,以及检索计算结果。 只有在计算完成后才能检索结果; 如果计算尚未完成,get方法将阻塞。 计算完成后,无法重新启动或取消计算(除非使用runAndReset()调用计算)。
FutureTask可用于包装Callable或Runnable对象。 因为FutureTask实现了Runnable,所以可以将FutureTask提交给Executor执行。
除了作为独立类之外,此类还提供了在创建自定义任务类时可能有用的保护方法。
Revision notes: This differs from previous versions of this
class that relied on AbstractQueuedSynchronizer, mainly to
avoid surprising users about retaining interrupt status during
cancellation races. Sync control in the current design relies
on a "state" field updated via CAS to track completion, along
with a simple Treiber stack to hold waiting threads.
Style note: As usual, we bypass overhead of using
AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
修订说明:之前版本依赖于AQS,现版本主要是为了避免在取消争用期间保留中断状态。当前设计中的同步控制依赖于通过CAS更新state域来跟踪完成,以及用于保存等待线程简单的Treiber栈。
样式说明:像之前一样,绕过了使用AtomicXFieldUpdaters的开销,而是直接使用Unsafe内部方法。
5.1 状态的变化
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
任务的运行状态初始化为NEW。任务状态转为终止状态只会发生在方法set setException和cancel方法。在完成期间,state可能会一个短暂的COMPLETING或者INTERRUPTING。从这些中间状态到最终状态的转换使用更便宜的ordered/惰性写入,因为值是唯一的并且不能进一步修改。
可能的状态转换:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
5.2 构造器及域
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
如果不需要结果,考虑使用下面的形式:
Future<?> f = new FutureTask<Void>(runnable, null);
从构造器可以看出,任务的初始状态为NEW。
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
- callable代表执行的任务,运行后置为null;
- outcome代表返回结果或者要抛出的异常,从get()获取的
- runner代表运行callable的线程,在run()中CAS赋值
- waiters代表等待线程的Treiber栈
5.3 run
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- step1.线程状态不为NEW时返回;设置任务的runner为当前线程
- step2.运行任务result = c.call()
正常运行完成后,set(result)
发生异常,setException(ex) - step3.运行完成后,设置runner = null
重新读取state以防有漏掉的中断:s >= INTERRUPTING时,会调用handlePossibleCancellationInterrupt(s)
runner作为锁使用,防止其他线程并发调用run:
- run之前首先通过CAS尝试获得锁,并将runner置为当前线程
- run之后通过将runner = null释放锁
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
从上面可以看出,run有两条状态转移路线:
- 1)正常运行set(result)
NEW -> COMPLETING -> NORMAL - 2)发生异常
NEW -> COMPLETING -> EXCEPTIONAL
关于finishCompletion:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
- step1.将waiters置为null
- step2.逐个唤醒在栈中阻塞的线程LockSupport.unpark(t)
- step3.调用保护方法done(),子类可以实现已完成想要的功能
- step4.将callable置为null
5.4 get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
- step1.如果任务还没有完成,则调用awaitDone
- step2.任务完成,则report(s)
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
- step1.如果发生中断,则removeWaiter(q),并抛出异常
- step2.如果已经完成,则返回状态
- step3.状态为COMPLETING(表示已经执行完了,正在最后的设置期间),则调用Thread.yield(),因为当前线程是get,一般是另外的线程在执行任务,所以让执行任务的线程优先获得执行权限
- step4.状态为< COMPLETING,此时任务还在执行,则创建q = WaitNode()
- step5.如果创建的q没有入队,则入队,并置于队首,即入栈
- step6.最当前线程进行进行阻塞
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
5.5 cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- step1.不是NEW状态,直接返回false,表示此时任务已经执行完了
- step2.为NEW状态,如果 mayInterruptIfRunning 为false
NEW - > CANCELLED - step3.为NEW状态,如果 mayInterruptIfRunning 为true
NEW -> INTERRUPTING -> INTERRUPTED
并会对运行任务的runner进行中断
6.为什么FutureTask不再基于AQS
ThreadPoolExecutor executor = ...;
executor.submit(task1).cancel(true);
executor.submit(task2);
虽然中断的是task1,但可能task2得到中断信号。
JDK1.6的 FutureTask.Sync.innerCancel的代码:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null) //1
r.interrupt(); //2
}
releaseShared(0);
done();
return true;
}
按照如下的执行流程,task2得到中断信号:
- step1.主线程调用cancel(true)想取消task1,执行完1处检查后,停住
- step2.Thread1执行完task1,开始执行task2
- step3.主线程此时继续执行2处的r.interrupt(),那么task2将会被中断
看看新版本怎么处理这个中断遗留问题:
cancel代码:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
run最后的代码:
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
可知run最后对此进行了特别处理:
- 如果该任务已经被取消,且未被取消完成,则handlePossibleCancellationInterrupt(s)
- handlePossibleCancellationInterrupt其实很简单,就是在主线cancel()完成之前,在这里自旋,Thread.yield()让执行取消的主线程更容易获得执行机会。
这里根本就是在主线程执行取消指定任务时,让执行该取消任务的线程自旋等待 主线程中断操作 完成。
网友评论