美文网首页
一篇讲明白FeatureTask

一篇讲明白FeatureTask

作者: 一个追寻者的故事 | 来源:发表于2020-09-19 16:02 被阅读0次

    FeatureTask 我之前是真没见过,也没用过。不过不是吃饱撑了没事研究新的类,而是在 Android AsyncTask 的实现中使用到了 FeatureTaskAsyncTask 的实现基本上就是在 FeatureTask基础上套了个壳。所以想理解AsyncTask 必须先理解FeatureTask。那为什么不和AsyncTask一起讲,是因为FeatureTask 是 jdk concurrent 包中为了解决某一类问题(后面会讲)而设计的。另外拆开来讲,有助于更好的理解每块的功能。 好的,废话少说,系好安全带,准备发车了。


    文章目录:

    一、前言
    二、正文

    一、前言

    正式开始之前,为了更好的理解FeatureTask,有必要先了解一下这两个类:LockSupportsun.misc.Unsafe。原因很简单,因为FeatureTask中使用了它们。

    1.1、 LockSupport
    public class LockSupport {
        // 当前线程放弃线程调度,直到获得许可。
        // 如果获得了许可,就会立刻返回。否则线程当前线程放弃线程调度,进入休眠状态。
        // 如下几种情况会被唤醒,从而继续执行:
        // 1.其它线程执行unpark唤醒当前线程
        // 2.其它线程执行 Thread#interrupt 打断当前线程。
        public static void park() 
        // 使线程获取许可,从而继续执行。如果之前线程时blocking,那么它将编程非blocking的。
        public static void unpark(Thread thread) 
    }
    

    功能类似 wait/notify,但是有一些区别:
    1、park/unpark 不要求获取对象的锁。
    2、park 不会释放线程持有的锁。
    3、假如park 时线程处于blocking状态,Thread#interrupt之后不会抛出Exception

    小例子:

    class TestThread(name: String) : Thread(name) {
        override fun run() {
            println("$name: running")
            LockSupport.park()
            if (Thread.interrupted()) {
                println("$name: 被中断了")
            }
            println("$name: 继续执行了")
    
        }
    }
    fun main() {
        val t1 = TestThread("T1")
        val t2 = TestThread("T2")
        t1.start()
        t2.start()
    
        TimeUnit.SECONDS.sleep(5)
        //会使t1获得许可,从而继续执行。
        LockSupport.unpark(t1)      
        // t2被中断,t2处于blocking状态会被唤醒。
        t2.interrupt()
    
        t1.join()
        t2.join()
    }
    

    结果:

    T1: running
    T2: running
    T1: 继续执行了
    T2: 被中断了
    T2: 继续执行了
    

    如果先执行 unpark,再执行park,park执行时会认为获得了许可,立即返回。

    1.2、 sun.misc.Unsafe

    sun.misc.Unsafe 这个类名字 Unsafe - 不安全,它提供了一些可以直接绕过 jvm安全检查的一些机制(例如直接分配内存、回收内存),如果对它的实现不是特别清楚的话,用起来应该是危险的,因此仅开放给了JDK使用,当然非要用也不是不能,利用反射可以使用。下面找几个典型的方法:

    public final class Unsafe {
    // CAS 
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    
    // memory
    public native long allocateMemory(long var1);
    public native void freeMemory(long var1);
    
    public native long objectFieldOffset(Field var1);
    public native long staticFieldOffset(Field var1);
    ...
    }
    

    提供了CPU硬件指令级别的原子操作:compare - and - swap 的支持,我们熟悉的 Atomic 原子类的操作就是基于此实现的。
    还有一些内存相关的操作:分配内存、释放内存
    我们重点关注一下:objectFieldOffset用于获取非静态属性Field在对象实例中的偏移量,然后利用偏移量可以通过 CAS 更新对象实例中的属性,即可实现多线程同步机制,比使用 synchronized 加锁效率要高。使用 CAS 有一种场景:只让第一次修改对象实例中的属性生效,在多线程情况下可以使用synchronized来保证,也可以使用 CAS,我们来看一个例子:

    class Bean {
        // state属性
        private volatile int state;  
        private static final int NEW          = 111;
        private static final int FINISHED     = 112;
        private static final int CANCELED     = 113;
        public Bean(){
            this.state = NEW;
        }
        public void doSomething(){
            U.compareAndSwapInt(this, STATE, NEW, FINISHED);
        }
        public void doSomething2(){
            U.compareAndSwapInt(this, STATE, NEW, CANCELED);
        }
        public int getState(){
            return state;
        }
        private static sun.misc.Unsafe U;
        private static final long STATE;       // state 的偏移地址
        static {
            try {
                // 利用反射获取 Unsafe实例
                Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                U = (sun.misc.Unsafe) f.get(null);
                
                // 获取 state 属性的偏移量
                STATE = U.objectFieldOffset(Bean.class.getDeclaredField("state"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }
    
    public static void main(String[] args) {
            Bean b = new Bean();
            b.doSomething();
            b.doSomething2();
            System.out.println("b#state: " + b.getState());
    }
    

    结果:

    b#state: 112
    

    上述例子中 doSomething 、 doSomething2 只有一个方法修改 state 属性生效,即便在多线程情况下,也能保证同步。

    更多关于 sun.misc.Unsafe 的用法自行搜索

    二、正文

    2.1 FeatureTask 是什么,解决什么问题

    我们知道线程的设计是基于 Thread-Runnable 模式的。 即没有办法直接从线程处理的异步任务中返回一个结果。因为Runnable的设计是这样的:

    public interface Runnable {
        public abstract void run();
    }
    

    run方法没有参数、没有返回值。

    FeatureTask 就是一个基于现有的 Thread-Runnable 模式,实现了一个可以获取异步任务返回值的机制。

    FeatureTask类图

    FeatureTask 是一个实现了 RunnableFeature 接口的 具体实现类。
    RunnableFeature 接口 = Runnable 接口 + Feature接口 「没有多出一毛钱」

    也就是说 FeatureTask 首先是一个 Runnable(无参、无返回值)
    再看一下 Feature 的定义就知道了 FeatureTask 是怎么实现的 获取异步任务返回值的。

    // V 代表了返回值类型
    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        // 获取异步任务处理结果返回值,会一直阻塞调用者。直到异步任务处理结束,或者异步出现异常,又或者调用get()的线程被打断。
        V get() throws InterruptedException, ExecutionException;
        // 增加了超时机制
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Feature 就代表了 异步任务处理 返回结果的。 定义了任务是否已经完成、取消任务 和 获取任务返回值的行为。 即 Feature就是为了解决异步任务返回值而定义的

    也即是说 FeatureTask即是一个普通的Runnable,可以被线程执行。又可以从它身上获取到异步任务处理的结果。

    到此是不是有一瞬间的灵感,大概知道怎么使用 FeatureTask了。类似如下这种:

    // 注意这是示意代码,不能真的运行。
    
    val task = FeatureTask()    // 定义一个Runnable
    val thread = Thread(task)  
    thread.start()  // 开启一个线程
    val result = task.get()  // 等待异步任务处理完成后给出返回结果,此步骤阻塞。
    

    如上示意代码的整体逻辑是没问题的。 接下来就是我该如何把我的异步任务交给 FeatureTask 去执行呢(因为我们知道开启的线程肯定会去执行 FeatureTask#run ),难道在 FeatureTask 构造的时候再传一个 Runnable进去,此Runnable用来封装我们的异步任务代码片段?可是这个Runnable还是没有返回值呀,FeatureTask 也拿不到异步任务返回值,又怎么返回给我们呢。这里引入另一个类:Callable

    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    CallableRunnable 比起来的区别是:1、有返回值类型 2、方法可以抛出异常。

    没错。Callable 就是用来替代 Runnable 让我们封装 有返回值的 异步任务代码片段的。我们先看一个 FeatureTask使用的代码示例:

    data class ResultData(var flag: String = "")
    
    class MyCall : Callable<ResultData>{
        override fun call(): ResultData {
            println("this is myCall running, threadName: ${Thread.currentThread().name}")
    
            // do something wasting time
    
            return ResultData("callable")
        }
    }
    
    fun main() {
        val executors = Executors.newCachedThreadPool()
        val task = FutureTask<ResultData>(MyCall())
        executors.execute(task)
        // 获取异步任务返回值, maybe blocking
        val result = task.get()
        println("the call result is : $result")
    }
    

    结果:

    this is myCall running, threadName: pool-1-thread-1
    the call result is : ResultData(flag=callable)
    

    至此,对FeatureTask的使用应该有了一个基本的认识。

    2.2 FeatureTask 实现原理

    原理这块基本上围绕着这个问题进行:
    1、如何实现获取异步任务的结果。
    2、如果异步任务执行过程中出现异常,会怎么处理。
    3、如果在异步任务执行的过程中被取消了,会怎么处理。
    4、get 超时机制如何实现

    如果想要实现类似 FeatueTask的功能,上边几个问题是绕不开的。上边几个问题弄清楚了,原理也就清楚了。

    2.2.1 如何实现获取异步任务的结果

    首先FeatureTask 内部有五种状态:

    • NEW 初始化状态
    • COMPLETING 处理完异步任务等待结束的状态(短暂)
    • NORMAL 正常处理完异步任务后的状态
    • EXCEPTIONAL 异步任务异常后的状态
    • CANCELLED 用户取消异步任务执行后的状态
    • INTERRUPTING 异步任务被打断中的状态(短暂)
    • INTERRUPTED 异步任务被打断后的状态

    内部状态的转化有一下几种:

    NEW -> COMPLETING -> NORMAL
    NEW -> COMPLETING -> EXCEPTIONAL
    NEW -> CANCELLED
    NEW -> INTERRUPTING -> INTERRUPTED
    
    // 代码仅包括核心的逻辑,完整逻辑看源码
    public class FutureTask<V>{
        ...
        private Callable<V> callable;  // 最终运行的异步任务
        private Object outcome;  // 执行完成后用于返回上层的异步任务处理结果
        public FutureTask(Callable<V> callable)   //构造函数
    
        //非完整逻辑,状态判断等逻辑都删除了
        public void run() {
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        // 异步任务执行出现了未捕获的异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);    // 异步任务正常执行完成
                }
            } finally {
               ...
            }
        }
    
        //报告异步任务执行成功
        protected void set(V v) {
            // 尝试修改内部状态
            if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                outcome = v;  //存入异步任务的结果
                U.putOrderedInt(this, STATE, NORMAL); // 最终的状态
                finishCompletion();
            }
        }
        // 报告异步任务执行出现异常
         protected void setException(Throwable t) {
            // 尝试修改内部状态
            if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
                outcome = t;    // Exception 存入结果
                U.putOrderedInt(this, STATE, EXCEPTIONAL); // 最终状态
                finishCompletion();
            }
        }
        
        // 用户主动取消异步任务(不一定会成功,因为CAS保证了一旦状态从NEW改变了,就再也无法接受其它的情况了)
        public boolean cancel(boolean mayInterruptIfRunning) {
            // 如果起始状态不是NEW,则直接取消失败,直接返回。
            if (!(state == NEW &&
                  U.compazreAndSwapInt(this, STATE, NEW,
                      mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
    
    
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        U.putOrderedInt(this, STATE, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();
            }
            return true;
        }
    
        private void finishCompletion() {
            ...
            done();
            callable = null;        // to reduce footprint
        }
         
        // 异步任务结束后的回调,无论是正常执行结束、还是异步任务出现了Exception、还是用户cancel掉了异步任务,最终都会执行done。这几种情况是互斥的,通过CAS保证了,只能有一种实现。
        protected void done() { }
        ...
    }
    

    上边的代码已经削减了很多,只留下了处理这个话题最核心的代码。

    Callable异步任务的执行,最终依赖的是Runnable#run方法。即在 FeatureTask#run中调用Callable#call的调用,在子线程中完成了一次异步任务的执行。

    异步任务的执行有几种情况:
    1、正常处理完成 通过set方法报告结果
    2、有未捕获的异常 通过setException方法报告结果
    3、用户取消异步任务 通过cancel方法报告结果

    上述几种情况的处理是基于CAS来实现原子操作,也即是说只有一种情况会最终执行。无论走哪一种情况,最终都会报告结果:finishCompletion,也就是说最终都会执行 done 方法, done 这相当于一个回调,即最后一步通知用户的回调。所以基于此,我们可以在done方法里去取异步任务处理的结果,好处是因为异步任务已经结束,最终结果的获取就不会blocking了。

    改进一下上边的 FeatureTask的使用:

        val executors = Executors.newCachedThreadPool()
        val task = object : FutureTask<ResultData>(MyCall()){
            override fun done() {
                println("this is call done, threadName: ${Thread.currentThread().name}")
                val result = get()
                println("the call result is : $result")
            }
        }
    
        executors.execute(task)
    

    执行结果:

    this is myCall running, threadName: pool-1-thread-1
    this is call done, threadName: pool-1-thread-1
    the call result is : ResultData(flag=callable)
    

    好了,下一步看一下,FeatureTask是以何种方式把异步任务处理结果报告给使用者呢,答案就在 get() 方法中:

      public V get() throws InterruptedException, ExecutionException {
            ...
            return report(s);
        }
    
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    我们看到了调用 ge() 方法的时候,如果异步任务处理正常(NORMAL),则直接返回异步任务的结果;如果用户取消了异步任务(CANCELLED、INTERRUPTED),则直接抛出异常 CancellationException ; 如果异步任务有未捕获的异常(EXCEPTIONAL),则会抛出 ExecutionException 异常。最终把主动权交给上层用户。

    一个常见的处理模型:

    try {
        get()
       // do something
    } catch (InterruptedException e) {
        // do something                       
    } catch (ExecutionException e) {                    
        // do something
    } catch (CancellationException e) {          
        // do something         
    }
    

    至此整个流程就串起来了,细节的东西需要大家实际去看代码。

    最后 get() 还可能会抛出 InterruptedException,这是处理哪一种情况呢?因为我们知道 get() 会阻塞调用者所在的线程,假如被其它线程执行 Thread#interrupt 打断了所阻塞的线程,get()方法就会抛出 InterruptedException,然后具体被打断后的处理逻辑交给用户去处理。

    2.2.2 get(long timeout, TimeUnit unit) 超时机制如何实现

    get(long timeout, TimeUnit unit)提供了超时机制,在指定时间内还没有返回的话,就会抛出异常 TimeoutException,然后把控制权交给使用者。『 和get()相比,会多抛出一个 TimeoutException

    每一个FeatureTask 对象都维护了一个因调用此 FeatureTaskget 而进入等待状态的线程的单链表。元素的节点是:

        static final class WaitNode {
            volatile Thread thread;    // 当前节点的线程
            volatile WaitNode next;    // 下一个节点
            WaitNode() { thread = Thread.currentThread(); }
        }
    

    如果get指定了超时时间,会使用 LockSupport#parkNanos将当前线程进入阻塞状态,超过时间后回去检查异步任务运行状态,如果异步任务还没有完成,则会抛出异常 TimeoutException

    如果get 没指定超时时间,会使用LockSupport.park 无限期阻塞当前线程,直到任务处理完成调用 LockSupport.unpark 恢复运行。

    我们知道 finishCompletion 无论如何都会调用,不管异步任务正常处理、还是有未捕获的异常,或者被用户取消了都会调用。之前讲解流程的时候,忽略了这个方法中关于 等待节点的处理。

        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
           ...
        }
    

    会遍历自己维护的 WaitNode 单链表,依次调用对应节点的 LockSupport.unpark(t); 让之前阻塞的线程都恢复运行。这么以来,所有阻塞的线程在这里都能得到异步任务的处理的结果。

    对于超时机制的处理,这里讲的只是基本的处理。更细节的东西还是要参考代码去学习。

    很有意思的一个事情是,和超时相关的话题这已经是第二次了。 第一次是讲 okio中的超时机制(这是一个典型的 生产者/消费者 模型的实践)。 第二次是讲 获取FeatureTask 异步任务处理结果的超时机制。 其实无非就是利用 wait/notify 或者 park/unpark 去处理多线程之间的关系。这两部分的超时机制设计的都很精彩,有时间建议大家去看看。

    相关文章

      网友评论

          本文标题:一篇讲明白FeatureTask

          本文链接:https://www.haomeiwen.com/subject/pfxryktx.html