前言
Java线程的Thread创建共有三种方式,两种类别。第一种类别是没有返回参数的,另一种是具备获取线程返回结果的。第一种类别有两种创建方式,继承Thread类、实现Runnable接口。第二种类别是使用Callable接口通过中间人FutureTask类处理使用。这里我只说第二种的方式创建线程。
使用方式
int i = Runtime.getRuntime().availableProcessors();
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(i * 2 + 1, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
FutureTask<String> stringFutureTask = new FutureTask<>(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(j);
Thread.sleep(1000);
}
return "执行完成";
});
// 1
executor.submit(stringFutureTask);
try {
// 阻塞主线程 等待子线程处理完毕后释放
System.out.println(stringFutureTask.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("主线程");
打印结果
0
1
2
3
4
执行完成
主线程
源码分析
先简单看下代码1处线程池的submit()方法,看是实现ExecutorService这个接口
public interface ExecutorService extends Executor {
// 具备返回值的 Callable入参方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
// 非具备返回值的Callable方法
Future<?> submit(Runnable task);
}
由此可见,入参Callable接口的方法是带有泛型返回值的方法,而入参为Runnable接口的没有返回值,第二个方法的第二个入参result没什么太大用处,暂且不谈。
Callable
public interface Callable<V> {
V call() throws Exception;
}
嗯.. 带返回值的方法。
Ok, 我们来看下FutureTask到底是什么鬼。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {}
实现了RunnableFuture,看看RunnableFuture接口是什么。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
喔?接口多继承了Runnable与Future接口,原来FutureTask类本身就实现了Runnable接口了哇。我好想明白了什么...线程池在调用execute的时候,一定调用了FutureTask中的run()方法。
再来看看Future接口都有什么秘密。
public interface Future<V> {
// 取消FutureTask任务 当任务已经完成了就无法取消了
boolean cancel(boolean mayInterruptIfRunning);
// 是否任务已经取消
boolean isCancelled();
// 是否任务执行完成
boolean isDone();
// 获取任务返回参数 无限阻塞所在线程
V get() throws InterruptedException, ExecutionException;
// 带有阻塞超时时间的获取任务返回参数 建议使用这个方法
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
噢噢,原来是FutureTask的一些调度方法啊。FutureTask骨架是明朗了,FutureTask实现了Runnable接口,并也实现了调度Future接口。我们回到FutureTask类。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;// 线程任务的创建
private static final int COMPLETING = 1;// 任务执行中
private static final int NORMAL = 2;// 任务执行完毕
private static final int EXCEPTIONAL = 3;// 任务执行异常
private static final int CANCELLED = 4;// 任务取消成功
private static final int INTERRUPTING = 5;// 任务被打断中
private static final int INTERRUPTED = 6;// 任务被打断成功
private Callable<V> callable; // 构造传入的Callable接口
private Object outcome;// 异步线程返回结果
private volatile Thread runner;// 当前任务所运行的线程
private volatile WaitNode waiters;// 记录当调用get方法时,阻塞的线程
// 记录任务所在的线程
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
// 构造1 没有什么可说的 直接将Callable接口赋值
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
// 构造2
public FutureTask(Runnable runnable, V result) {
// 1
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
// 执行任务 run 方法
public void run() {
// 状态是创建过程的话,通过CAS 将当前线程赋值给成员变量runner 若失败就直接return
if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
// 临时变量接一下
Callable<V> c = callable;
if (c != null && state == NEW) {
// 返回值
V result;
boolean ran;
try {
// 调用call方法执行
result = c.call();
ran = true;
} catch (Throwable ex) {
// 捕捉异常
result = null;
ran = false;
setException(ex);
}
// 接收成功后将result值赋值给成员变量outcome CAS方式
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
我们看下构造方法2 中的代码1处,发掘这里使用了适配器模式,若使用的是构造方法二的初始化方式,则将Runnable转化成了Callable。
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
// result 默认为null
return new RunnableAdapter<Object>(task, null);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
// result 是我们初始化传入的,其实并无太大作用
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
ok, 看到这里我们知晓了,原来FutureTask为了能够实现Callable方法具备返回值,整合了Runnable与Callable,这样的话本质上还是执行Runnable方法。
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 这里若当前任务还在执行中,并且过了超时时间,直接抛出Timeout异常。
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
核心awaitDone()方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 等待线程队列
WaitNode q = null;
boolean queued = false;
// 开辟个死循环
for (;;) {
// 若当前线程已经被打断了
if (Thread.interrupted()) {
// 从等待线程队列中移除 并抛出异常
removeWaiter(q);
throw new InterruptedException();
}
// 获取当前状态
int s = state;
// 若执行完成
if (s > COMPLETING) {
if (q != null)
q.thread = null;
// 返回当前状态
return s;
}
// 若当前任务正在执行
else if (s == COMPLETING) // cannot time out yet
// 1 既然正在执行,就暂时不一直for循环占有CPU资源,将其让出 并同时和其他线程竞争
Thread.yield();
else if (q == null)
// 这里只会执行一次
q = new WaitNode();
else if (!queued)
// 如果是排队 就将等待队列next 指向当前的等待线程
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
else if (timed) {
// 计算等待超时时间 截止时间减去当前时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 超时了
removeWaiter(q);
return state;
}
// 没有过超时时间就让线程等待
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
cancel()方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 直接将当前线程打断
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
最终report()
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
将成员变量outcome返回,这个outcome赋值是在run()方法调用完c.call()方法后进行的。
总结
FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call() 代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。
通过 FutureTask 把 Runnnable、Callable、Future 都串起来了,使 FutureTask 具有三者的功能,统一了 Runnnable 和 Callable,更方便使用。
网友评论