美文网首页Android开发Android开发Java知识
Callable创建Thread的源码解析

Callable创建Thread的源码解析

作者: 巴黎没有摩天轮Li | 来源:发表于2020-08-30 11:18 被阅读0次

    前言

    Java线程的Thread创建共有三种方式,两种类别。第一种类别是没有返回参数的,另一种是具备获取线程返回结果的。第一种类别有两种创建方式,继承Thread类、实现Runnable接口。第二种类别是使用Callable接口通过中间人FutureTask类处理使用。这里我只说第二种的方式创建线程。

    使用方式

     int i = Runtime.getRuntime().availableProcessors();
     // 创建线程池
     ThreadPoolExecutor executor = new ThreadPoolExecutor(i * 2 + 1, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());
     FutureTask<String> stringFutureTask = new FutureTask<>(() -> {
         for (int j = 0; j < 5; j++) {
             System.out.println(j);
             Thread.sleep(1000);
         }
         return "执行完成";
     });
      // 1
     executor.submit(stringFutureTask);
     try {
          // 阻塞主线程 等待子线程处理完毕后释放
         System.out.println(stringFutureTask.get());
     } catch (ExecutionException e) {
         e.printStackTrace();
     }
    System.out.println("主线程");
    

    打印结果

    0
    1
    2
    3
    4
    执行完成
    主线程
    

    源码分析

    先简单看下代码1处线程池的submit()方法,看是实现ExecutorService这个接口

    public interface ExecutorService extends Executor {
        // 具备返回值的 Callable入参方法
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        // 非具备返回值的Callable方法
        Future<?> submit(Runnable task);
    }
    

    由此可见,入参Callable接口的方法是带有泛型返回值的方法,而入参为Runnable接口的没有返回值,第二个方法的第二个入参result没什么太大用处,暂且不谈。

    Callable

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    嗯.. 带返回值的方法。
    Ok, 我们来看下FutureTask到底是什么鬼。

    FutureTask

    public class FutureTask<V> implements RunnableFuture<V> {}
    

    实现了RunnableFuture,看看RunnableFuture接口是什么。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    喔?接口多继承了Runnable与Future接口,原来FutureTask类本身就实现了Runnable接口了哇。我好想明白了什么...线程池在调用execute的时候,一定调用了FutureTask中的run()方法。
    再来看看Future接口都有什么秘密。

    public interface Future<V> {
        // 取消FutureTask任务 当任务已经完成了就无法取消了
        boolean cancel(boolean mayInterruptIfRunning);
        // 是否任务已经取消
        boolean isCancelled();
        // 是否任务执行完成
        boolean isDone();
        // 获取任务返回参数 无限阻塞所在线程
        V get() throws InterruptedException, ExecutionException;
        // 带有阻塞超时时间的获取任务返回参数 建议使用这个方法
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    噢噢,原来是FutureTask的一些调度方法啊。FutureTask骨架是明朗了,FutureTask实现了Runnable接口,并也实现了调度Future接口。我们回到FutureTask类。

    public class FutureTask<V> implements RunnableFuture<V> {
        private volatile int state;
        private static final int NEW          = 0;// 线程任务的创建
        private static final int COMPLETING   = 1;// 任务执行中 
        private static final int NORMAL       = 2;// 任务执行完毕
        private static final int EXCEPTIONAL  = 3;// 任务执行异常
        private static final int CANCELLED    = 4;// 任务取消成功
        private static final int INTERRUPTING = 5;// 任务被打断中
        private static final int INTERRUPTED  = 6;// 任务被打断成功
    
        private Callable<V> callable; // 构造传入的Callable接口
      
        private Object outcome;// 异步线程返回结果
       
        private volatile Thread runner;// 当前任务所运行的线程
        
        private volatile WaitNode waiters;// 记录当调用get方法时,阻塞的线程
    
        // 记录任务所在的线程
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    
        // 构造1 没有什么可说的 直接将Callable接口赋值
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW; 
        }
    
        // 构造2 
        public FutureTask(Runnable runnable, V result) {
            // 1
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;  
        }
    
          // 执行任务 run 方法
          public void run() {
            // 状态是创建过程的话,通过CAS 将当前线程赋值给成员变量runner 若失败就直接return
            if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
                return;
            try {
                // 临时变量接一下
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    // 返回值
                    V result;
                    boolean ran;
                    try {
                        // 调用call方法执行
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        // 捕捉异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    // 接收成功后将result值赋值给成员变量outcome CAS方式
                    if (ran)
                        set(result);
                }
            } finally {
                runner = null;
                int s = state;
                
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    

    我们看下构造方法2 中的代码1处,发掘这里使用了适配器模式,若使用的是构造方法二的初始化方式,则将Runnable转化成了Callable。

    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        // result 默认为null
        return new RunnableAdapter<Object>(task, null);
    }
    
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        // result 是我们初始化传入的,其实并无太大作用
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
    

    ok, 看到这里我们知晓了,原来FutureTask为了能够实现Callable方法具备返回值,整合了Runnable与Callable,这样的话本质上还是执行Runnable方法。

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 这里若当前任务还在执行中,并且过了超时时间,直接抛出Timeout异常。
        if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    

    核心awaitDone()方法

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 等待线程队列
            WaitNode q = null;
            boolean queued = false;
            // 开辟个死循环
            for (;;) {
                // 若当前线程已经被打断了
                if (Thread.interrupted()) {
                    // 从等待线程队列中移除 并抛出异常
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                // 获取当前状态
                int s = state;
                // 若执行完成
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    // 返回当前状态
                    return s;
                }
                // 若当前任务正在执行 
                else if (s == COMPLETING) // cannot time out yet
                    // 1 既然正在执行,就暂时不一直for循环占有CPU资源,将其让出 并同时和其他线程竞争
                    Thread.yield();
                else if (q == null)
                    // 这里只会执行一次
                    q = new WaitNode();
                else if (!queued)
                    // 如果是排队 就将等待队列next 指向当前的等待线程
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
                else if (timed) {
                    // 计算等待超时时间 截止时间减去当前时间
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        // 超时了
                        removeWaiter(q);
                        return state;
                    }
                    // 没有过超时时间就让线程等待
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

    cancel()方法

    public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        // 直接将当前线程打断
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    

    最终report()

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    

    将成员变量outcome返回,这个outcome赋值是在run()方法调用完c.call()方法后进行的。

    总结

    FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call() 代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。

    通过 FutureTask 把 Runnnable、Callable、Future 都串起来了,使 FutureTask 具有三者的功能,统一了 Runnnable 和 Callable,更方便使用。

    相关文章

      网友评论

        本文标题:Callable创建Thread的源码解析

        本文链接:https://www.haomeiwen.com/subject/iqkjjktx.html