美文网首页
高并发编程四

高并发编程四

作者: 紫雨杰 | 来源:发表于2018-05-15 17:23 被阅读0次

    1、线程池:提供了一个线队列,队列中保存着所有等待状态的线程。避免了频繁的创建与销毁所造成的额外开销,提高了响应的速度。

    2、线程池的体系结构:

     java.util.concurrent.Executor:负责线程的使用与调度的根接口【Executor->执行器执行任务的一个接口】
             |---(**)ExecutorService 子接口: 线程池的主要接口
                        |--- ThreadPoolExecutor 线程池的实现类
                        |--- ScheduledExecutorService 子接口:负责线程的调度
                              |--- ScheduledThreadPoolExecutor : 继承了ThreadPoolExecutor,实现了ScheduledExecutorService接口,
                                                                 所以该实现类即具备了线程池的功能,又有线程调度的功能。
    

    3、工具类:Executors

     ExecutorService  newFixedThreadPool(): 创建固定大小的线程池;
     ExecutorService  newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量;
     ExecutorService  newSingleThreadExecutor():创建单个线程池。线程池中只有一个线程;
     ScheduledExecutorService  newScheduledThreadPool():创建固定大小的线程,可以延迟或定时的执行任务
    
     newWorkStealingPool:底层是用ForkJoinPool实现的【工作窃取】
     ForkJoinPool
    

    4、工作窃取算法

    (1)、工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
    
    (2)、那么为什么需要使用工作窃取算法呢?
            假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,
        并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,
        而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
        而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,
        而窃取任务的线程永远从双端队列的尾部拿任务执行。
    
    (3)、工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,
         其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
    

    5、案例:

    ★ newWorkStealingPool
    
    public class WorkStealingPoolDemo {
        public static void main(String[] args) throws IOException {
            ExecutorService service = Executors.newWorkStealingPool();
            System.out.println(Runtime.getRuntime().availableProcessors());     //输出电脑的CPU核数,创建核数多个线程
    
            service.execute(new R(1000));
            service.execute(new R(2000));
            service.execute(new R(2000));
            service.execute(new R(2000)); //daemon
            service.execute(new R(2000));
            
            //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出,所以要添加System.in.read();
            System.in.read(); 
        }
    
        static class R implements Runnable {
            int time;
            R(int t) {
                this.time = t;
            }
    
            @Override
            public void run() {     
                try {
                    TimeUnit.MILLISECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }           
                System.out.println(time  + " " + Thread.currentThread().getName());         
            }
        }
    }
    
    ★ ForkJoinPool 
       (1)、Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
       (2)、ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
    
       (3)、使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask<T> task) 或invoke(ForkJoinTask<T> task)
                     或execute(ForkJoinTask<T> task)方法来执行指定任务了。
    
       (4)、其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。
            其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
    
    public class ForkJoinPoolDemo {
        static int[] nums = new int[1000000];
        static final int MAX_NUM = 50000;
        static Random r = new Random();
        
        static {
            for(int i=0; i<nums.length; i++) {
                nums[i] = r.nextInt(100);
            }       
            System.out.println(Arrays.stream(nums).sum()); //stream api 
        }
        
        
        /*
        static class AddTask extends RecursiveAction {       //继承RecursiveAction,没有返回值  
            int start, end;     
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected void compute() {          
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    System.out.println("from:" + start + " to:" + end + " = " + sum);
                } else {        
                    int middle = start + (end-start)/2;
                    
                    AddTask subTask1 = new AddTask(start, middle);
                    AddTask subTask2 = new AddTask(middle, end);
                    subTask1.fork();
                    subTask2.fork();
                }           
            }
        }
        */
        
        static class AddTask extends RecursiveTask<Long> {   //继承RecursiveTask,有返回值,RecursiveTask带有泛型,泛型即为返回值的类型    
            int start, end;     
            AddTask(int s, int e) {
                start = s;
                end = e;
            }
    
            @Override
            protected Long compute() {          
                if(end-start <= MAX_NUM) {
                    long sum = 0L;
                    for(int i=start; i<end; i++) sum += nums[i];
                    return sum;
                } 
                
                int middle = start + (end-start)/2;
                
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
                
                return subTask1.join() + subTask2.join();
            }       
        }
        
        public static void main(String[] args) throws IOException {
            ForkJoinPool fjp = new ForkJoinPool();
            AddTask task = new AddTask(0, nums.length);
            fjp.execute(task);
    
            long result = task.join();      //join有阻塞,所以不需要写System.in.read();   
            System.out.println(result);
            
            //System.in.read();     
        }
    }
    

    相关文章

      网友评论

          本文标题:高并发编程四

          本文链接:https://www.haomeiwen.com/subject/vatbkftx.html