美文网首页
Callable实现子线程获取函数返回值

Callable实现子线程获取函数返回值

作者: Cris_Ma | 来源:发表于2019-05-28 14:15 被阅读0次

    Callable接口

    Java中的子线程通常是通过Thread或者Runnable的方式实现的,但是这种方式只能通过回调,或者共享变量等方式来传递数据,而Callable则是可以获取返回结果的一种子线程实现方式。

    Callable是一个接口,源码如下:

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    非常简单,只有一个方法,和一个泛型V,所以我们创建Callable对象的时候,也只需要指定返回类型并实现call方法就可以了。

    Future接口

    看完了Callable接口,会发现它非常简单,没有办法在子线程中直接通过它来获取到返回结果的,这时候就需要Future发挥作用了。源码如下:

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    • boolean cancel(boolean mayInterruptIfRunning) 在任务开始前取消,传入值表示任务开始后是否允许打断,返回值表示是否取消成功(任务已经开始不允许打断,已经运行结束,已经取消等等状态会返回失败)
    • boolean isCancelled() 是否已经被取消
    • boolean isDone() 是否完成任务
    • V get() 尝试获取返回结果,阻塞方法
    • V get(long timeout, TimeUnit unit) 同上,可以指定超时时间

    可以看到,Future实际上可以理解为Callable的管理类。

    在线程池中执行任务时,除了execute方法之外,还有一个submit方法:

        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    它返回的就是一个Future对象,可以通过它来回去Callable任务的执行结果:

    
    Callable<Integer> callable = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("Sub Thread is calculating...");
                    Thread.sleep(10000);
                    return 10;
                }
            };
            
    Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
        try {
                System.out.println("Main Thread start waiting result... ");
    
                int res = future.get();
                System.out.println("Main Thread get result: " + res);
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    //运行结果:
    Main Thread start waiting result... 
    Sub Thread is calculating...
    Main Thread get result: 10
    

    FutureTask

    如果不用Java提供的线程池,直接用Thread怎样在子线程中运行Callable呢? 这时候就要用到FutureTask类了。

    FutureTask实现了FutureRunnable接口,这就意味着,它既可以放在Thread中去运行,又能够对任务进行管理,下面是源码:

    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    

    可以看到,构造函数中传入了待运行的任务Callable对象或者Runnable对象和指定的返回结果,V result用来指定运行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)来返回null。采用Callable构造方法创建的FutureTask对象,执行完毕返回的是实际运算结果,而Runnable 构造函数返回值是传入的result。

    task的状态

    FutureTask中持有的任务对象,有以下几种状态:

        private static final int NEW          = 0; //新建或运行中
        private static final int COMPLETING   = 1;//任务运行结束,正在处理一些后续操作
        private static final int NORMAL       = 2;//任务已经完成,COMPLETING的下一个状态
        private static final int EXCEPTIONAL  = 3;//任务抛出异常,COMPLETING的下一个状态
        private static final int CANCELLED    = 4;//任务被取消
        private static final int INTERRUPTING = 5;//收到打断指令,还没有执行interrupt
        private static final int INTERRUPTED  = 6;//收到打断指令,也执行了interrupt
    

    可能的状态变化主要有以下几种:

         * Possible state transitions:
         * NEW -> COMPLETING -> NORMAL
         * NEW -> COMPLETING -> EXCEPTIONAL
         * NEW -> CANCELLED
         * NEW -> INTERRUPTING -> INTERRUPTED
    

    task的执行过程

    FutureTask实现了Runnable接口,是可以直接放在Thread中执行的,实际上运行的就是它的run方法:

    public void run() {
        //r如果当前状态不是NEW,说明任务已经执行完成了,直接返回
        //如果当前状态是NEW,尝试用CAS方式将当前线程赋值给RUNNER,赋值前RUNNER的值应该是null,否则赋值失败
        //赋值失败表示已经有线程执行了run方法,直接返回
            if (state != NEW ||
                !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        //运行中抛出异常
                        setException(ex);
                    }
                    //ran为true,说明正常运行结束,得到了返回结果
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    执行结果其实是比较简单的,通过RUNNER来记录执行任务的线程,从而保证只有一个线程可以执行该任务。运行结束后有两个出口:

    • setException(ex); 运行中出错,抛出异常
    • set(result); 任务执行完毕,获取到返回值
        protected void setException(Throwable t) {
            if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                outcome = t;
                U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    
        protected void set(V v) {
            if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                outcome = v;
                U.putOrderedInt(this, STATE, NORMAL); // final state
                finishCompletion();
            }
        }
    
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            //唤醒等待线程
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    这两个方法实际上是一样的,都是将状态赋值为COMPLETING,然后保存结果(运行结果或错误信息),再执行finishCompletion方法,通知WAITERS里记录的等待线程继续执行,并清空WAITERS

    获取返回结果

    获取返回结果是通过get()方法:

        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING)
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                else if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
                else if (q == null) {
                    if (timed && nanos <= 0L)
                        return s;
                    q = new WaitNode();
                }
                else if (!queued)
                //cas机制,将新建节点q的next指向原来的节点workers,然后将workers更新为新建的节点。workers(WAITERS)实际上就是持有了所有等待线程的一个链表
                    queued = U.compareAndSwapObject(this, WAITERS,
                                                    q.next = waiters, q);
                else if (timed) {
                    final long parkNanos;
                    if (startTime == 0L) { // first time
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else {
                        long elapsed = System.nanoTime() - startTime;
                        if (elapsed >= nanos) {
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed;
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)
                        LockSupport.parkNanos(this, parkNanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    
        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    

    get()方法比较简单,先判断当前状态,如果状态 < COMPLETING,说明任务没有执行完毕,直接调用awaitDone方法。

    awaitDone方法可以接受两个参数,用来指定是否设置超时时间。它内部是一个无限for循环。下面是awaitDone方法的执行步骤(忽略超时设置):

    1. 进入awaitDone方法时,state一定是小于COMPLETING的,第一次会走else if (q == null)分支,创建一个WaitNode()对象用来保存当前线程

    2. 第二次循环q已经不是null了,如果任务仍然没有结束,会执行else if (!queued)分支,queued表示创建的WaitNode()是否已经添加到链表里,如果没有尝试添加,直到添加成功为止。

    3. 等待线程添加成功以后进入下一个循环,此时如果任务仍然没有结束,会走到else分支,挂起当前线程(阻塞)

    4. 此处阻塞的是等待结果的线程,也就是调用FutureTaskget()方法的线程,而不是执行任务的线程。阻塞线程用的是LockSupport.park(this)方法,唤醒的方法是LockSupport.unpark(),该方法在上边的finishCompletion()中出现了,也就是说,任务执行结束(运行完,抛出异常,被取消)时,等待的线程才会被唤醒,继续下一次循环。

    5. 任务结束以后,如果state是COMPLETING状态,说明一些清理任务还没有执行完,等待的线程会让出cpu,让其他线程优先执行

    6. 直到state 大于COMPLETING,说明FutureTask已经完全结束了,此时会会执行(s > COMPLETING)分支,把节点置空,并返回。

    awaitDone返回以后,说明任务已经执行完成了,会进入report方法:

        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    可以看到,如果是正常结束,或者抛出异常结束,会返回结果,而如果是被取消,则会抛出异常。

    总结

    1. FutureTask可以视为一个管理Callable任务的工具类,执行Callable任务的是FutureTaskrun方法,所以,可以通过 new Thread(futuretask)的方法来实现子线程执行任务

    2. 获取执行结果是通过FutureTaskget方法,调用该方法后,如果线程会被挂起,知道任务结束为止

    3. 获取结果的线程数量没有限定,可以是任意个线程

    4. 获取结果的线程被挂起以后,可以通过取消,超时等方法在任务执行完毕以前结束挂起状态。

    相关文章

      网友评论

          本文标题:Callable实现子线程获取函数返回值

          本文链接:https://www.haomeiwen.com/subject/gifdtctx.html