美文网首页
Java多线程编程:FutureTask异步任务详解

Java多线程编程:FutureTask异步任务详解

作者: 江溢jonny | 来源:发表于2018-03-17 18:49 被阅读0次

    简书江溢Jonny,转载请注明原创出处,谢谢!

    本文内容将基于JDK1.7的源码进行讨论,并且在文章的结尾,笔者将会给出一些经验之谈,希望能给学习者带来些帮助。

    关注我的公众号,获得更多干货~


    举个例子

    我们以一个例子开始开始本文内容。

    有一个作家,他准备开始写作,写作时间大约1个小时,作家想“那就在写作的时候顺便煮一些食物”,写作完刚好吃一点热食物。煮食物的时间我们假设是2个小时,那么煮食物的这个过程就是一个“异步任务”,我们把它用代码实现出来:

    public static class Food implements Callable<String>{
    
        public String call() {
            System.out.println("hot food starts");
            try {
                // 煮食物ing
                Thread.sleep(20000l);
            } catch (Exception e) {
                // ignore
            }
            System.out.println("hot food ends");
            return "food is ok";
        }
    }
    
    public static void main(String[] args) {
        System.out.println("writing starts");
        FutureTask<String> futureTask = new FutureTask<String>(new Food());
        // 使用新线程
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            // 写作ing
            Thread.sleep(20000l);
        } catch (Exception e) {
            // ignore
        }
        System.out.println("writing ends");
    
        try {
            String result = futureTask.get();
            System.out.println(result);
        } catch (Exception e) {
            // ignore
        }
    }
    

    为什么要异步

    有些时候,为了快速响应,或者节省任务执行时间,有些任务是可以并行执行的。
    举个例子,我们正在执行某个计算的时候,需要通过http请求获得某个远程服务的结果,而计算过程也是一个耗时操作,可以在计算开始前先发起一个异步任务做http请求,在需要使用到远程服务结果的位置,查看当前异步任务是否已经执行完成,可以做到两件事情同步进行,缩短了任务执行时间。

    异步任务

    再举个例子,在一个客户端程序里面,包含了“提交”和“取消”两个功能,在应用点击“提交”开始执行以后,将立马发起一个异步线程,开始执行任务,但是此时客户端用户仍然可以随便操作,并不会就此卡住,在任务正常执行完可以在窗口显示执行结果。当任务执行完前,用户点击“取消”以后,异步任务将被取消,后台线程就停止了。

    FutureTask源码分析

    FutureTask实现了Future的接口,它的计算实际上是通过Callable接口来实现的,相当于一种可以生成结果的Runnable。
    那我们一起来看看在JDK 1.7里面,是怎么实现这个异步任务的。

    状态码

    FutureTask任务执行的核心在内部类Sync类中,在Sync类的内部,包含了以下几种状态码:
    READY:FutureTask任务创建成功以后,初始状态码;
    RUNNING:任务开始启动以后的状态码;
    RAN:无论是任务执行成功还是任务执行过程中抛了异常,都将走入到该状态码;
    CANCELLED:任务执行过程中被调用innerCancel取消后,进入该状态码;

    状态码时序图

    其他信息

    除此之外,Sync还包含了其他信息:
    执行结果:在Sync类中用result字段表示任务执行结果;
    异常:用该字段表示任务执行过程中抛出的异常信息;

    Sync数据结构定义大致如下:

    // 状态码定义在AbstractQueuedSynchronizer中
    private final class Sync extends AbstractQueuedSynchronizer {
            // 执行结果
            private V result; 
            // 异常信息
            private Throwable exception;
    }
    

    创建任务

    源码如下:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable); 
    }
    

    参数是一个Callable类型的接口,这个接口不同于Runnable,是有返回值的,定义如下:

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    然后我们一起来看看整个FutureTask任务的执行过程。

    任务执行

    任务开始执行后,程序源码如下:

    void innerRun() {
        // 把任务状态从READY改成RUNNING
        if (!compareAndSetState(READY, RUNNING))
            return;
    
        runner = Thread.currentThread();
        if (getState() == RUNNING) {
            V result;
            try {
                // 异步任务开始启动
                result = callable.call();
            } catch (Throwable ex) {
                // 这里调用了innerSetException方法
                setException(ex); 
                return;
            }
            // 任务执行成功后,设置任务执行结果
            set(result);
        } else {
            releaseShared(0); // cancel
        }
    }
    

    我们再来看看,在任务执行成功以后,set方法都做了什么事情:

    protected void set(V v) {
        sync.innerSet(v);
    }
    
    class Sync {
        ...
        void innerSet(V v) {
            for (;;) {
                // 获得当前任务状态
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    releaseShared(0);
                    return;
                }
    
                // 把任务状态设置为"RAN"(已完成)
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    // 这实际上是需要开发者实现的钩子方法
                    done(); 
                    return;
                }
            }
        }
    }
    

    任务取消

    任务在执行的过程中,可以选择取消执行,比如,一个查询同时从多个网址查询搜索结果,然而产品的需求是只需要返回其中一个搜索结果,因此,当有任务已经完成了搜索结果,那么其他查询线程就无需继续执行了,因此可以发起cancel的操作,减少网络消耗。

    boolean innerCancel(boolean mayInterruptIfRunning) {
        for (;;) {
            int s = getState();
            // 任务可能此时已被取消,或者已经执行完成
            if (ranOrCancelled(s))
                return false;
            if (compareAndSetState(s, CANCELLED))
                break;
        }
        if (mayInterruptIfRunning) {
            Thread r = runner;
            if (r != null)
                // 如果任务还在执行,尝试中断
                r.interrupt();
        }
        releaseShared(0);
        done();
        return true;
    }
    

    结果获取

    我们再看看获取任务执行结果的get两个方法,一个是不带阻塞时间的get()方法和另外一个带了阻塞时长的get(long timeout, TimeUnit unit)方法。这两个方法对应源码如下:

    // 不带阻塞时长
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
    
    // 带阻塞时长
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
    

    再看看两个方法对应的sync.innerGet方法:

    V innerGet() throws InterruptedException, ExecutionException {
        // 阻塞式等待,但该方法可以被中断
        acquireSharedInterruptibly(0);
        // 如果此时任务已经被取消了,那么将抛一个异常出来
        if (getState() == CANCELLED)
            throw new CancellationException();
        // 任务执行过程中有异常,重新抛出异常
        // 该异常是在innerSetException方法中设置的
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }
    
    V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
        if (!tryAcquireSharedNanos(0, nanosTimeout))
            throw new TimeoutException();
        // 如果此时任务已经被取消了,那么将抛一个异常出来
        if (getState() == CANCELLED)
            throw new CancellationException();
        // 任务执行过程中有异常,重新抛出异常
        // 该异常是在innerSetException方法中设置的
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }
    
    

    以上就是FutureTask的源码解读,不过FutureTask内容比较简单,不包含AQS源码只有大约400行。接下来我来简单讲讲在使用过程中的一些经验。

    经验之谈

    搭配线程使用

    FutureTask仅仅是一个任务执行框架,在执行过程中并没有创建一个新的线程,在本文最初的实例中,我仅仅是创建了一个新的Thread类,并启动该Thread类,当然你们也可以搭配ThreadPoolExecutor线程池使用。说到这里,ExecutorService类正是如此使用的:

    public abstract class AbstractExecutorService implements ExecutorService {
        ...
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            // 创建一个新的FutureTask
            RunnableFuture<T> ftask = newTaskFor(task, result);
            // 执行该FutureTask
            execute(ftask);
            return ftask;
        }
        ...
    }
    

    做好线程中断策略

    你们不要以为,FutureTask提供了cancel方法,任务就一定能被取消,而实际上,底层还是依赖Thread提供的interrupt方法,因此,为了实现cancel功能,需要线程能够主动响应中断。
    换句话说,如果任务不检查中断取消标志,可能任务永远也不会结束。
    所以对中断的一个正确理解是:它不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时候中断自己。当然,JDK中有一些库函数可以主动响应这些中断,如Thread.sleep和BlockingQueue.put方法等。

    以上就是全部内容了,如果你喜欢,欢迎关注我的公众号~
    这是给我不断写作的最大鼓励,谢谢~


    相关文章

      网友评论

          本文标题:Java多线程编程:FutureTask异步任务详解

          本文链接:https://www.haomeiwen.com/subject/fowrqftx.html