美文网首页
多线程Executor

多线程Executor

作者: 叛逆的青春不回头 | 来源:发表于2017-04-21 14:41 被阅读0次

    一、相关类简单介绍
    二、各类之间的关系图
    三、 execute方法执行过程


    相关类简单介绍

    1、Executor:执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法
    2、Executors:为创建ExecutorService提供了便捷的工厂方法
    3、ExecutorService:提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。其提供两个方法来关闭 ExecutorService:

    • shutdown() 在终止前允许执行以前提交的任务;
    • shutdownNow() 阻止等待任务的启动并试图停止当前正在执行的任务。终止后,执行程序没有任务在执行或等待执行,并且无法提交新任务。

    4、ScheduledExecutorService:ExecutorService的一个重要的子接口,此service是为了支持时间可控的任务执行而设计,其中包括固定延迟执行,周期性执行;不过他还不支持制定特定date执行,这个工作可以交给Timer来做
    5、ThreadPoolExecutor:线程池执行器,ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置
    6、Runnable、Callable、Future

    • 在并发编程时,一般使用runnable,然后扔给线程池完事,这种情况下不需要线程的结果,所以run的返回值是void类型;
    • 多线程协作时,比如菲波拉切数列,1,1,2,3,5,8...使用多线程来计算,后者需要前者的结果,就需要用Callable接口了。 Callable调用的是call方法,该方法有一个泛型返回值类型,你可以任意指定;
    • 线程是属于异步计算模型,所以你不可能直接从别的线程中得到函数返回值。 这时候Future就出场了。Futrue可以监视目标线程调用call的情况,当你调用Future的get()方法以获得结果时,当前线程就开始阻塞,直接call方法结束返回结果。

    各类之间的关系图

    下图是一张包含各类关系比较全的一张图,可以看出入口主要在execute(Runnable command) 方法:

    execute方法执行过程

    一个简单例子:

    public class ThreadPoolTest {  
        public static void main(String[] args) {  
            ExecutorService exec=Executors.newCachedThreadPool();  
            for(int i=0;i<5;i++)  
                exec.execute(new Runnable());  
            exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
        }  
    } 
    

    看下源码:

    /*===ThreadPoolExecutor.java===*/
    public void execute(Runnable command) {      //***(1)***
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {   
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);          //***(2)***
        }
        else if (!addWorker(command, false))
            reject(command);
    }
      
    private boolean addWorker(Runnable firstTask, boolean core) {
        //省略代码..
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);        //***(3)***
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
     
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();     //***(5)***
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
     
     
      
    /*===ThreadPoolExecutor.Worker.java===*/
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
     
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);    //***(4)***
        }
       ...
    }
      
      
    /*===Executors.DefaultThreadFactory.java===*/
    static class DefaultThreadFactory implements ThreadFactory {
        ...
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
       ...
    }
    

    对应执行execute()方法的完整过程:


    从上面过程可以看出,会执行到addWorker()的t.start(),这里的t就是new出的Thread,也就是说t.start()实际上执行的是Worker内部的runWork方法。
    runWork()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue中。在上图的绿色框中执行addWork()之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。


    相关文章

      网友评论

          本文标题:多线程Executor

          本文链接:https://www.haomeiwen.com/subject/imydzttx.html