美文网首页程序员首页投稿(暂停使用,暂停投稿)
并发编程实战二之线程池和CompletionService

并发编程实战二之线程池和CompletionService

作者: 谜碌小孩 | 来源:发表于2016-11-25 10:09 被阅读0次

    线程池

    线程饥饿死锁

    任务依赖于其他任务,线程池不够大
    单线程,一个任务将另一个任务提交到同一个Executor。

    设置线程池的大小

    int N_CPUS = Runtime.getRuntime().availableProcessors();
    计算密集型  thread = N_CPUS+1
    包含I/O或其他阻塞操作的任务  thread = N_CPUS*U_CPUS(1 + W/C)
        U_CPUS  ——  基准负载
        W/C  ——  等待时间与计算时间的比值
    内存、文件句柄、套接字句柄、数据库连接 —— 资源可用总量/每个任务的需求量
    

    线程池的创建

    public ThreadPoolExecutor(int corePoolSize,          //基本大小
                                  int maximumPoolSize,        //最大
                                  long keepAliveTime,        //线程存活时间
                                  TimeUnit unit,            
                                  BlockingQueue<Runnable> workQueue,    //线程队列
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    Executors.newSingleThreadExecutor();
    
    Executors.newFixedThreadPool(100)基本大小和最大大小设置为参数指定值
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    Executors.newCachedThreadPool()线程池最大大小设置为Integer.MAX_VALUE,队列为SynchronousQueue
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    串行递归转并行递归

    public class SeToParallel {
        public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
            for(Node<T> n:nodes){
                results.add(n.compute());
                sequentialRecursive(n.getChildren(),results);
            }
        }
    
        public <T> void parallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
            for(final Node<T> n : nodes){
                exec.execute(new Runnable() {
                    @Override
                    public void run() {
                        results.add(n.compute());
                    }
                });
                parallelRecursive(exec,n.getChildren(),results);
            }
        }
    
        public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
            ExecutorService exec = Executors.newCachedThreadPool();
            Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
            parallelRecursive(exec,nodes,resultQueue);
            exec.shutdown();
            exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
            return resultQueue;
        }
    
            class Node<T>{
    
                private T t;
                private List<Node<T>> children;
    
                public List<Node<T>> getChildren() {
                    return children;
                }
    
                public T compute(){
                    return t;
                }
            }
    }
    

    CompletionService:Executor与BlockingQueue

    ExecutorCompletionService实现了CompletionService,用BlockingQueue保存计算完成的结果。提交任务是,任务被包装成QueueingFuture.
    页面逐渐渲染:

    public class Renderer {
        private final ExecutorService executorService;
    
        public Renderer(ExecutorService executorService) {
            this.executorService = executorService;
        }
        
        void renderPage(CharSequence source){
            List<ImageInfo> info = scanForImageInfo(source);
                CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
            for(final ImageInfo imageInfo:info){
                completionService.submit(new Callable<ImageData>() {
                    @Override
                    public ImageData call() throws Exception {
                        return imageInfo.downloadImage();
                    }
                });
            }
            
            renderText(source);
            try {
                for(int t = 0,n = info.size();t < n;t++){
                    Future<ImageData> f = completionService.take();
                    ImageData imageData = f.get();
                    renderImage(imageData);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:并发编程实战二之线程池和CompletionService

        本文链接:https://www.haomeiwen.com/subject/pcbipttx.html