美文网首页
【java并发编程】全面理解Future模式的原理和使用(Fut

【java并发编程】全面理解Future模式的原理和使用(Fut

作者: DoubleBin | 来源:发表于2018-11-28 03:17 被阅读0次

    一、前言

        通常,java中创建多线程的两种方式:

    • 直接继承Thread;
    • 实现Runnable接口。

        考虑到一些逻辑需要一定的先后顺序,如果直接用这两种方式都会有共同的缺点:

    • 通常为阻塞式(通过join等待一个线程结束,但这样就失去了多线程的意义),或者通过wait、notify、notifyAll并结合状态变量等来进行并发设计,设计起来相当复杂;
    • 线程执行完成后难以获取线程执行结果(需要通过共享变量、线程间通信等方式来获取, 比较复杂)

        由此,我们想到了多线程开发中常见的Future模式。开发中经常有一些操作可能比较耗时,但又不想阻塞式的等待,这时可以先执行一些其它操作,等其它操作完成后再去获取耗时操作的结果,这就是Future模式的描述。对应于生活中例子比比皆是:比如,打开电饭煲烧米饭后继续炒菜,等炒菜完了去看下米饭有没有煲熟,过程中无需死等电饭煲把饭煲熟,只有在炒完菜后这个时间点,我们才尝试去看电饭煲煲饭的结果,这就是Future模式的一个生活原型。
        java从1.5开始,在并发包中提供了Future模式的设计,我们这要结合Callable、Future/FutureTask就能很容易的使用Future模式。

    二、Future模式的一个简单示例

         我们来看一个简单示例:

        public static void main(String[] args) throws InterruptedException, ExecutionException
        {
            
            ExecutorService executor = Executors.newCachedThreadPool();
            Future<Integer> future = executor.submit(new Callable<Integer>(){
    
                @Override
                public Integer call()
                    throws Exception
                {
                    int total = 0;
                    for(int i = 5001; i<=10000; i++){
                        total += i;
                    }
                    return total;
                }
                
            });
            
            System.out.print("Submit future task now...");
            executor.shutdown();
            
            int total = 0;
            for(int i = 1; i<=5000; i++){
                total += i;
            }
            
            total += future.get();
            
            System.out.print("1+2+...+10000 = " + total);
        }
    

    示例中计算了1~10000且步长为1的等比数列之和,将数列拆均分成两部分分别求和,最后进行累计。Future模式通常需要配合ExecutorService和Callable一起使用,代码中采用ExecutorService的submit方法提交Callable线程,在主线程任务完成后获取Callable线程的结果。

    三、源码分析

    1.     
      我们先直接看下Future类型的源码:
    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
    
        boolean isCancelled();
      
        boolean isDone();
    
        V get() throws InterruptedException, ExecutionException;
    
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    

        
    Future接口的5个方法含义如下:

    • cancel(boolean mayInterruptIfRunning) : 取消任务, 取消成功返回true;入参mayInterruptIfRunning表示是否允许取消正在执行中的任务。
    • isCancelled() : 返回是否取消成功
    • isDone() : 返回任务是否已经完成
    • get() : 返回执行结果,如果任务没有完成会阻塞到任务完成再返回
    • get(long timeout, TimeUnit unit) 获取执行结果并设置超时时间,如果超时返回null
    1.     
      Future模式通常需要配合ExecutorService和Callable一起使用,通过ExecutorService的submit方法提交Callable线程。我们知道,execute()方法在Executor接口中定义,而submit()方法在ExecutorService接口中定义,ExecutorService接口继承Executor接口:
        public interface Executor {
            void execute(Runnable command);
        }
        
        public interface ExecutorService extends Executor {
            ...
            <T> Future<T> submit(Callable<T> task);
                
            <T> Future<T> submit(Runnable task, T result);
                
            Future<?> submit(Runnable task);
            ...
        }
    
    1.     
      ExecutorService只是一个接口,我们以上一节的newCachedThreadPool为例,看下它的源码:
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    1.     
      上面结果返回的是一个ThreadPoolExecutor,它是ExecutorService的一个子类,看ThreadPoolExecutor源码可以发现,ThreadPoolExecutor没有实现submit方法,它的submit方法由其直接父类AbstractExecutorService实现:
        ...
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        ...
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        ...    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        ...
    
    1.     
      在上面三个submit方法中,无论是Callable接口还是Runnable接口,均是转化成了RunnableFuture实例,看下RunnableFuture的实现:
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    
    1.     
      RunnableFuture接口同时继承了Runnable接口和Future接口。再看下上面讲Callable或Runnable转化成RunnableFuture实例的实现:
        ...
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        ...
    
    1.     
      通过两个newTaskFor方法分别将Callable和Runnable实例转化成FutureTask实例,FutureTask是RunnableFuture的实现,上述源码中涉及FutureTask的两种构造函数:
        ...
        private Callable<V> callable;
        private volatile int state;
        private static final int NEW          = 0;
        ...
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
        ...
        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
        ...
    
    1.     
      对于Callable实例,直接将入参Callable对象赋值给this.callable属性,并设置this.state属性为NEW; 而对于Funnable实例,需要通过Executors类的callable(runnable, result)方法转化成Callable实例:
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
    1.     
      Executors类的callable(runnable, result)方法实际生成了一个RunnableAdapter对象,看下其源码:
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

        
    显而易见,RunnableAdapter类实现了Callable接口的call()方法,内部调用了Runnable实例的run()方法,并返回预先传过来的result值。

    1.     
      回过头来看下,FutureTask类实现了RunnableFuture接口,进而实现了Runnable接口和Future接口的统一,那么它是如何实现Runnable接口的run()方法的呢?看下其源码:
        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
        
        ...
        
        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
        
        ...
        
        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    

        
    可以看到FutureTask的run()方法内部调用了Runnable实例的call()方法,并且如果运行成功,将call()方法的返回值赋值给outcome,否则将异常赋值给outcome。
        
    这样也就容易理解ExecutorService的submit方法实现中是如何调用execute(Runnable command)方法的了,它将Runnable或者Callable实例统一转换成了RunnableFuture实例,由于RunnableFuture继承了Runnable接口,所以线程池可以通过execute(Runnable command)方法来进行处理。

    1.     回过来看下FutureTask类的get()方法实现:
        public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
                s = awaitDone(false, 0L);
            return report(s);
        }
        
        ...
        
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
        
        ...
        
            private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
                if (Thread.interrupted()) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                if (s > COMPLETING) {
                    if (q != null)
                        q.thread = null;
                    return s;
                }
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    q = new WaitNode();
                else if (!queued)
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        removeWaiter(q);
                        return state;
                    }
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    
    • 如果状态state为任务执行中,则阻塞等待,否则通过report(s)返回结果。返回结果时:如果状态正常,则直接返回outcome;如果取消或者中断,则返回CancellationException异常;如果执行异常,则返回ExecutionException。
    • 上述源码可以看出,get()方法通过awaitDone方法进行阻塞等待,awaitDone方法实现上采用LockSupport.park()进行线程阻塞,在FutureTasl的run()方法执行完成或异常发生,会执行set(V v)方法或setException(Throwable t)方法,两者的实现中都会调用finishCompletion()方法,并在finishCompletion()方法中采用LockSupport.unpark方法进行了线程唤醒:
        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    四、总结

        通过上述举例和源码分析我们理解了java中Future模式的原理和使用,Future模式对于一些耗时操作(比如网络请求等)的性能提升还是比较有用的,实际开发中可以灵活运用。

    相关文章

      网友评论

          本文标题:【java并发编程】全面理解Future模式的原理和使用(Fut

          本文链接:https://www.haomeiwen.com/subject/sxvogftx.html