美文网首页Java 杂谈java
java Future,FutureTask,Callable

java Future,FutureTask,Callable

作者: Cc_7397 | 来源:发表于2019-01-14 00:23 被阅读30次

    Future

    Future的作用就是在主线程之外另外起一个线程去做另一件事,让java同时可以并行处理。并且提供方法随时去查看Future的是否完成,并取得任务的结果。本篇文章会详细讲解流程和每个方法。

    先写一个小例子:
    这个例子中要做3件事,其中job2和job3各耗时2000毫秒,而Future可以让他们同时进行

      long a = System.currentTimeMillis();
      System.out.println("do job1");
      Callable<String> callable = () -> { //子线程任务,实现Callable<T> 接口,返回值为T
            Thread.sleep(2000);
            System.out.println("do job2");
            return "job2 is done";
       };
      FutureTask<String> task = new FutureTask<>(callable); //new 一个FutureTask 将任务引用传递进去
      new Thread(task).start();  //启动Future任务
      Thread.sleep(2000);  
      System.out.println("do job3");
      while (!task.isDone()) {  //循环判断Future任务是否完成了
      Thread.onSpinWait(); //相当于Thread.sleep(); 不过效率更高
      }
      System.out.println("job2结果 => " + task.get() + "时间:" + (System.currentTimeMillis() - a));
    

    结果

    do job1
    do job2
    do job3
    job2结果 => job2 is done时间:2006
    

    可以看到Future确实让任务并行处理了。
    我们这里使用了isDone()方法查询计算是否完成了,下面细节讲解运行原理和它提供的方法。

    Future提供的方法

    FutureTask实现了Future<V>接口,该接口提供了以下方法:

    • boolean cancel(boolean mayInterruptIfRunning);//取消计算
    • boolean isCancelled(); //判断计算是否被取消。
    • boolean isDone(); //判断计算是否结束。
    • V get();//获取结果,如果计算没结束会一直堵塞到结束。
    • V get(long timeout, TimeUnit unit) ;//获取结果,如果计算没结束最多等待一定时间。

    运行原理和流程

    1.首先先看下FutureTask 有哪些属性呢

    • 运行状态的判断:
        private volatile int state; //存储状态的变量 使用volatile
        private static final int NEW          = 0; //代表是刚刚创建没进行如何操作
        private static final int COMPLETING   = 1; //正在计算中
        private static final int NORMAL       = 2; //正常
        private static final int EXCEPTIONAL  = 3; //发生了异常
        private static final int CANCELLED    = 4; //被取消了
        private static final int INTERRUPTING = 5; //中断中
        private static final int INTERRUPTED  = 6; //以被中断
    
    • 其他属性
     /** The underlying callable; nulled out after running */
        private Callable<V> callable; //我们提交的任务
        /** The result to return or exception to throw from get() */
        private Object outcome; //存储结果 或者异常
        /** The thread running the callable; CASed during run() */
        private volatile Thread runner; //记录运行的线程
        /** Treiber stack of waiting threads */
        private volatile WaitNode waiters; //等待结果的队列
    

    2.下面讲解运行原理和流程
    首先我们创建了一个Callable 记录我们任务的方法,然后new FutureTask 会把Callable赋值给callable属性,将state 设置为NEW也就是0。然后启动线程调用run方法

       public void run() {
            if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) { // callable 不为空,状态为new才继续计算
                    V result;
                    boolean ran;//flag记录是否计算成功
                    try {
                        result = c.call();//调用我们的方法
                        ran = true;//完成了 把flag设置为ture
                    } catch (Throwable ex) {  //有异常 结果设置为空 flag设为false
                        result = null;
                        ran = false;
                        setException(ex);  //处理异常的方法
                    }
                    if (ran)  //计算成功
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;  
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING) //如果有其它线程在中断任务会等到中断结束
                    handlePossibleCancellationInterrupt(s);  
            }
        }
    

    先讲解下CAS 这里面好多CAS操作

    RUNNER.compareAndSet(this, null, Thread.currentThread())这里VarHandle 来提供CAS操作,我这里是jdk11,你们的版本可能是用unsafe来操作的。他们的作用是一样的,就是先获取某个属性相对对象的内存地址偏移值,然后用CAS方式修改属性的值,因为一个对象属性在内存中的排列是固定的,这样只要有对象的地址和属性的偏移值就能定位属性在内存的地址。这里的RUNNER 是这样来的,可以看到定位了FutureTask这个类的runner属性,类型是Thread

     MethodHandles.Lookup l = MethodHandles.lookup();
     RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
    

    RUNNER.compareAndSet(A,B,C)方法:修改A对象的RUNNER所代表的属性,如果当前值是B就修改为C,如果失败会一直重试直到,当前属性不是B了或者修改成功。
    使用unsafe也是一样的,只不过不会去先取得VarHandle这个句柄,而是在调用时将属性偏移值一起传递进去。

    if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread())) 
    

    所以意思计算如果状态不是new或者将runner属性赋值为当前线程时失败

    继续

    发生异常

    后面就是调用我们自己写的方法了,如果发送了异常我们看下

     protected void setException(Throwable t) {
            if (STATE.compareAndSet(this, NEW, COMPLETING)) {
                outcome = t;
                STATE.setRelease(this, EXCEPTIONAL); // final state
                finishCompletion();//唤起所有等待线程
            }
        }
    

    就是先发状态设置为计算中,然后记录下异常,在把状态设置为EXCEPTIONAL异常,注意setRelease方法表示设置值后这个值不能再被修改。

    计算完成

       protected void set(V v) {
            if (STATE.compareAndSet(this, NEW, COMPLETING)) {
                outcome = v;
                STATE.setRelease(this, NORMAL); // final state
                finishCompletion();
            }
        }
    

    其实和发送异常是一样的,只是记录状态不一样
    都会调finishCompletion()方法

    private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) { //方法内变量  记录链表 
                if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();//空方法 ,给用户扩展用
    
            callable = null;        // to reduce footprint
        }
    

    这里的WaitNode 是一个链表,记录使用等待结果的线程。上面说的get方法会堵塞直到计算完成,被堵塞的线程就会存储到这个链表。

            for (WaitNode q; (q = waiters) != null;) { //方法内变量  记录链表 
                if (WAITERS.weakCompareAndSet(this, q, null)) {//将属性的设置为空
            //这里可能会有新的队列
    

    这里会一直循环,防止将waiters属性赋值为空之后又有新的线程加入到队列中
    然后循环链表把线程唤起使用 LockSupport.unpark(t),这个方法点进去可以发现其实用的是前面提到的Unsafe,juc的包很多都是用Unsafe实现的。

    下面讲解各个方法

    isDone

     public boolean isDone() {
            return state != NEW;
        }
    

    非常简单。。。

    get

        /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
    
        /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    两个重载方法,一个传入了最大等待时间
    如果状态的<= COMPLETING 也就是NEW 和COMPLETING 就进入等待的方法

     private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            // The code below is very delicate, to achieve these goals:
            // - call nanoTime exactly once for each call to park
            // - if nanos <= 0L, return promptly without allocation or nanoTime
            // - if nanos == Long.MIN_VALUE, don't underflow
            // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
            //   and we suffer a spurious wakeup, we will do no worse than
            //   to park-spin for a while
            long startTime = 0L;    // Special value 0L means not yet parked
            WaitNode q = null;  //记录当前线程 
            boolean queued = false;  //记录是否把当前线程加入到了队列
            for (;;) {
                int s = state;
                if (s > COMPLETING) { //大于计算中,代表完成||中断||取消 直接返回 
                    if (q != null)
                        q.thread = null; //当前线程设置为空,finishCompletion() 会判断 链表队列里的线程是否为空 
                    return s;
                }
                else if (s == COMPLETING) //如果在计算中 放开cpu资源 
                    // We may have already promised (via isDone) that we are done
                    // so never return empty-handed or throw InterruptedException
                    Thread.yield();
                else if (Thread.interrupted()) { //如果当前线程被中断 
                    removeWaiter(q); //从等待队列中移除当前线程对象
                    throw new InterruptedException(); 
                }
                else if (q == null) {  //这里只有第一次循环会进入
                    if (timed && nanos <= 0L) //如果是等待一段时间,并且时间小于等于0 直接退出了 
                        return s;
                    q = new WaitNode(); //new 一个链表元素  这里new的时候会把当前线程对象放进这个元素里面 
                }
                else if (!queued) //如果没加入 FutureTask的链表里就加入 
                    queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); //把waiters 属性设置成q的下一个 替换 waiters 属性为q
                else if (timed) { //设置了等待时间才会进入
                    final long parkNanos; //记录时间
                    if (startTime == 0L) { // first time  第一次进入记录开始时间
                        startTime = System.nanoTime();
                        if (startTime == 0L)
                            startTime = 1L;
                        parkNanos = nanos;
                    } else { //第二次进入
                        long elapsed = System.nanoTime() - startTime; //加上 刚才使用的时间
                        if (elapsed >= nanos) {//如果已经超过等待时间直接退出了
                            removeWaiter(q);
                            return state;
                        }
                        parkNanos = nanos - elapsed; //计算还要等待的时间
                    }
                    // nanoTime may be slow; recheck before parking
                    if (state < COMPLETING)//重新check 下状态
                        LockSupport.parkNanos(this, parkNanos); //挂起线程剩下的时间  这里是毫秒
                }
                else
                    LockSupport.park(this); // 不设置时间,挂起线程 
            }
        }
    

    梳理下流程,
    1.先判断状态
    2.把线程加入等待队列

    1. 有等待时间就 park 那么多时间
    2. 没等待时间就直接 park
    3. 其它线程唤醒 重新判断状态
      这个方法会返回这个状态,计算等待结束后的状态。可能是完成,异常等。然后会掉report方法
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    正常就返回结果,否则会抛个异常出去。使用我们自己写的方法出现了异常最终会在这里抛出来。

    isCancelled

      public boolean isCancelled() {
            return state >= CANCELLED;
        }
    

    也很简单了

    cancel

    要注意一点,java是不能直接强制停掉线程的。这里取消是使用中断的方式 interrupt ,然后把状态修改为INTERRUPTING或者CANCELLED
    中断一个线程并不会停掉线程,只是告诉线程你要停掉了,被中断的线程可以使用Thread.interrupted() 来查看我是不是被中断了,另外线程在堵塞时如果被中断会从堵塞中恢复抛出一个中断异常InterruptedException。

       public boolean cancel(boolean mayInterruptIfRunning) {
            if (!(state == NEW && STATE.compareAndSet
                  (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                return false;
            try {    // in case call to interrupt throws exception
                if (mayInterruptIfRunning) {
                    try {
                        Thread t = runner;
                        if (t != null)
                            t.interrupt();
                    } finally { // final state
                        STATE.setRelease(this, INTERRUPTED);
                    }
                }
            } finally {
                finishCompletion();//唤醒等待队列
            }
            return true;
        }
    
    

    取消只会在状态为NEW时才有效,但是在任务计算完之前 状态都是NEW
    run方法的状态判断

     if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread())) //如果状态不是new或者将runner属性赋值为当前线程时失败就直接返回,这里表明了我们的任务只能启动一次。
                return;
    

    所以你提交的任务并不会在计算时停掉,如果在计算的run方法的状态判断前那么不会计算。
    如果在判断后,那么计算还是会进行,但结果不会存储,并且在get时会抛这个异常。

         if (s >= CANCELLED)
                throw new CancellationException();
    

    这个方法的参数mayInterruptIfRunning说一下。

    • ture 代表会中断线程,状态会改为INTERRUPTING 并且中断interrupt() 中断后会把状态改成INTERRUPTED
    • flase 则只是会 把状态修改为CANCELLED

    最后

    其实里面的方法不是很难,重点是在并发时的处理和设计,比如get的同时取消,多个线程同时get等,多个方法通过共享变量来通信和协调。这个jdk的类,所以可以当作一个标准优秀的案例来参考和学习。这是作者第一次写文章,如果有不对的地方大家一定要提出来。

    相关文章

      网友评论

        本文标题:java Future,FutureTask,Callable

        本文链接:https://www.haomeiwen.com/subject/lmwldqtx.html