关于Thread和Runnable的区别:
1)从对象编程来说:Thread是继承;Runnable是组合,会比继承耦合性低,更加灵活。
2)从对象共享角度:Runnable实例可以由过个线程实例共享,会产生并发问题。
3)对象创建成本:Thread在创建的时候JVM就会为其分配调用栈空间,内核线程等资源;而Runnable是普通的类,作为参数传给Thread,所以Runnable创建成本相对较低。
关于用户线程(User)和守护线程(Daemon):
用户线程会阻止JVM正常停止;
守护线程不会影响JVM正常停止,所以守护线程通常用于执行一些重要性不高的任务。
关于Callable和Runnable的区别:
Callable方式需要FutureTask实现类的支持,用于接收运算结果。
FutureTask是Future接口的实现类,且FutureTask也可用于闭锁的操作,因为get() 会阻塞当前线程直到Callable返回结果。
FutureTask
FutureTask提供了支持cancel的异步计算方式,它实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable和Future接口。
FutureTask可以用来包装一个Callable和Runnable对象,且因为实现了Runnable,所以FutureTask可以被提交到线程池处理。
源码分析
因为FutureTask实现了Future接口,所以它会实现Future接口的相关方法,比如说get(), cancel()等等。
成员变量以及构造函数
有三个volatile成员变量,会通过UNSAFE类来进行CAS操作。
另外定义了state变量对应的7中状态。
callable变量意味着FutureTask会将Runnable也封装成Callable来处理。
outcome则是返回这或者异常的信息
private volatile int state;
private static final int NEW = 0;//初始是NEW
private static final int COMPLETING = 1;//在set()和setException()里面先CAS更新成COMPLETING,加锁操作
private static final int NORMAL = 2;//set()方法执行成功从COMPLETING到NORMAL
private static final int EXCEPTIONAL = 3;//setException()方法执行成功从COMPLETING到EXCEPTIONAL
private static final int CANCELLED = 4;//cancel(false)的时候
private static final int INTERRUPTING = 5;//cancel(true)的时候
private static final int INTERRUPTED = 6;//cancel(true)最终状态
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
构造函数
FutureTask内部有Callable类型的成员变量,所以Runnable会通过一个适配器RunnableAdapter,转换成Callable,内部还是调用的Runnable的run()方法。
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
//接受Runnable的构造函数
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
//适配器将Runnable转换成Callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//Adapter类,Callable的call方法调用Runnable的run方法。
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
run()
run()方法主要是调用Callable的call方法,如果有异常则通过setException(Throwable t)设置异常;如果没有异常,则通过set(V result)方法,来设置返回值。且这两个set方法最终都会调用finishCompletion()来unpark唤醒WaitNode节点里的等待线程去获得结果。
run()方法涉及到的方法有setException(), set(), finishCompletion(), handlePossibleCancellationInterrupt()。
涉及到的state有COMPLETING, EXCEPTIONAL, NORMAL
且在call()方法返回之前,都是NEW状态,这样cancel()才会有机会。
且run方法最好检查INTERRUPTING状态,会在handlePossibleCancellationInterrupt自旋直到cancel()方法结束,state变成INTERRUPTED状态(下面代码里分析)。
public void run() {
//如果state不是NEW或者CAS更新runner成员变量失败,则直接return。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//直接调用Callable的call方法并拿到result
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果call()方法抛出异常,ran=false
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
//如果抛出异常,则调用该方法,更新state状态到COMPLETING,
//将Throwable赋值给outcome变量,再更新state到EXCEPTIONAL.
//最后finishCompletion()唤醒WaitNode中等待线程节点
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
//因为上面更新到COMPLETING成功,这边更新EXCEPTIONAL则不需要CAS操作,而是putOrderedInt内存操作
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
//先通过CAS获得更新资格,将waiter变量更新为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
//拿到q对应的线程,更新为null,并唤醒该线程
q.thread = null;
LockSupport.unpark(t);
}
//然后操作q的后继节点
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//done()方法由子类实现
done();
callable = null; // to reduce footprint
}
//如果没有异常,更新state到COMPLETING先,然后将result设置到outcome,更新state到NORMAL
//最后finishCompletion唤醒等待线程
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
//final的时候会检查一下state是不是被中断,如果一直在中断过程中,则当前线程yield让出CPU
//这边自旋等待调用中断的线程执行完毕,因为此时run方法已经结束,runner也被重置为null
// 所以别的线程可能有机会提交这个FutureTask,而执行run方法,这时候cancel可能被应用到不同的Task上。
// 所以这里要自旋直到cancel()方法执行结束
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
cancel()
cancel的时候有几种情况:
1.任务还没开始,直接返回false
2.任务已经开始:
2.1调用cancel(false),就是最后调用finishCompletion()来唤醒等待线程
2.2调用cancel(true),则通过runner获得当前运行线程,调用运行线程的Interrupt()方法来设置目标线程的中断标记位为true
这里涉及到的state状态有INTERRUPTING, CANCELLED, INTERRUPTED。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
//如果参数是true,则更新为INTERRUPTING状态,否则CANCELLED状态
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();//通过设置中断标志位来控制目标线程的生命周期
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
get()
如果state的状态还没到COMPLETING,就在awaitDone()里面通过LockSupport.park()挂起当前线程,直到run()方法执行结束唤醒等待线程。
异常:
1.get()方法响应异常中断,会抛出InterruptedException。
2.且如果是get(long timeout, TimeUnit unit)方法,还会抛出TimeoutException。
3.另外如果Callable的call方法没有捕捉异常而抛出异常,则get()方法会抛出ExecutionException。FutureTask的run()方法在执行Callable的call()方法时,会将call抛出的异常捕获包装成ExecutionException,这意味着call()方法出现异常,不会直接导致执行线程运行结束(相比正常的Thread的run()方法抛出异常可能导致执行线程提前终止生命周期)。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//如果state还没到COMPLETING,则开始挂起当前线程
s = awaitDone(false, 0L);
//run()方法结束之后会唤醒当前线程,去拿到执行结果。
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//判断当前线程是否被中断
if (Thread.interrupted()) {
//如果被中断,则需要把waitNode的节点都垃圾回收掉,抛出InterruptedException
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果已经结束,则将waitNode的thread设置为null,并返回state值
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//COMPLETING表示set开始,快结束了,则让执行线程让出CPU等待一下
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//这里s就还没到COMPLETING,如果waitNode是null,则创建WaitNode
else if (q == null)
q = new WaitNode();
//如果q不为null,则需要添加到WaitNode的next后继节点
else if (!queued)
//这个UNSAFE操作,先更新q的next=waiter,再将qCAS更新到waiters变量
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//如果是有等待时间的,则在等待时间过后,removeWaiter()来处理等待的node
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
//处理最终结果
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
//如果state是NORMAL,则直接返回outcome值
return (V)x;
if (s >= CANCELLED)
//则就是cancel()使得线程被Interrupt了或者cancel了,抛出CancellationException
throw new CancellationException();
//否则就是抛出异常了
throw new ExecutionException((Throwable)x);
}
因为是无线循环,所以里面的q = new WaitNode()和UNSAFE更新操作在park之前都会做一边,即park之前会将当前线程封装到WaitNode链表里。
网友评论