美文网首页
并发编程(五)_自定义线程池示例代码

并发编程(五)_自定义线程池示例代码

作者: c5fc16271aee | 来源:发表于2018-04-07 16:59 被阅读0次

    【UseThreadPoolExecutor1.class】

    package com.jxb.thread13;
    
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
    /*
     * 自定义线程池 ThreadPoolExecutor
     */
    public class UseThreadPoolExecutor1 {
        public static void main(String[] args) {
            /**
             * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
             * 若大于corePoolSize,则会将任务加入队列,
             * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
             * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
             * 
             */ 
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,              //coreSize  核心线程
                    2,              //MaxSize   最大线程数
                    60,             //60
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(3)         //指定一种有界队列 (有界队列)3个
                    //new LinkedBlockingQueue<Runnable>()
                    //, new MyRejected()    //自定义拒绝策略
                    //, new DiscardOldestPolicy()   //丢弃最老的请求(队列里面的),尝试再次提交当前任务。
                    );
            
            MyTask mt1 = new MyTask(1, "任务1");
            MyTask mt2 = new MyTask(2, "任务2");
            MyTask mt3 = new MyTask(3, "任务3");
            MyTask mt4 = new MyTask(4, "任务4");
            MyTask mt5 = new MyTask(5, "任务5");
            MyTask mt6 = new MyTask(6, "任务6");
            
            pool.execute(mt1);      //当启动一个时,直接打印       
            pool.execute(mt2);      //当启动两个时,放一个到queue里面,每个等五秒打印
            pool.execute(mt3);      //当启动三个时,放两个到queue里面,每个等五秒打印
            pool.execute(mt4);      //当启动四个时,放三个到queue里面,每个等五秒打印
            pool.execute(mt5);      //当启动五个时,本来应该放四个到queue里面,但是queue的容量为3,所以重新创建一个线程,先打印1 5,其他等五秒再打印
            pool.execute(mt6);      //当启动六个时,本来应该放五个到queue里面,但是queue的容量为3,所以再重新创建一个线程,但是最大线程数为2,再创建就超出了,所以报错
                                    
            pool.shutdown();    //等线程任务执行完毕后才关闭
            
        }
    }
    

    【UseThreadPoolExecutor1.class 的运行结果】
    这里运行结果有多种,现在就展现两种:

    1、六个任务同时都启动,使用默认的拒绝策略
    run taskId =1
    run taskId =5
    Exception in thread "main" 
    java.util.concurrent.RejectedExecutionException: Task 6 rejected from java.util.concurrent.ThreadPoolExecutor@70dea4e[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
        at com.jxb.thread13.UseThreadPoolExecutor1.main(UseThreadPoolExecutor1.java:43)
    run taskId =2
    run taskId =3
    run taskId =4
    
    2、六个任务同时都启动,使用自定义的拒绝策略
    自定义处理..
    run taskId =5
    run taskId =1
    当前被拒绝任务为:6
    run taskId =2
    run taskId =3
    run taskId =4
    

    【UseThreadPoolExecutor2.class】

    package com.jxb.thread13;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    /*
     * 自定义线程池 ThreadPoolExecutor
     */
    public class UseThreadPoolExecutor2 implements Runnable{
    
        private static AtomicInteger count = new AtomicInteger(0);
        
        @Override
        public void run() {
            try {
                int temp = count.incrementAndGet(); //i++
                System.out.println("任务" + temp);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) throws Exception{
            //System.out.println(Runtime.getRuntime().availableProcessors());
            /**
             * 无界队列(LinkedBlockingQueue),中共20任务,每次执行5个任务(看coreSize的大小),剩下的15个任务放到queue里面;
             * 当前面的五个任务执行完了,再五个五个的执行。
             */
            BlockingQueue<Runnable> queue = 
                    //new LinkedBlockingQueue<Runnable>();
            /**
             * 当换成有界队列(ArrayBlockingQueue)时,先执行5个,本来应该放15个到queue,但是
             *      queue的容量是10,所以还剩下5个;再看maxSize的大小。还未超出(20最大线程数-5核心线程),将剩下的5个放入线程中执行。
             *      变成 先执行10个,再从queue里面取出10个执行.
             * 【测试】 相当于这个可以执行的总线程数为:5(核心线程)+10(队列)+15(最大线程数-核心线程数)
             */
                    new ArrayBlockingQueue<Runnable>(10);
            ExecutorService executor  = new ThreadPoolExecutor(
                        5,      //coreSize
                        20,     //maxSize
                        120L,   //2fenzhong
                        TimeUnit.SECONDS,
                        queue);
            
            //循环20次,提交了20个任务
            for(int i = 0 ; i < 20; i++){
                executor.execute(new UseThreadPoolExecutor2());
            }
            Thread.sleep(1000);
            System.out.println("queue size:" + queue.size());       
            Thread.sleep(2000);
            
            executor.shutdown();
            
        }
    
    
    }
    
    

    【MyTask.class】

    package com.jxb.thread13;
    
    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
        
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
        
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }       
        }
        
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }
    
    

    【MyRejected.class】

    package com.jxb.thread13;
    
    import java.net.HttpURLConnection;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
    *  自定义拒绝策略可以实现RejectedExecutionHandler接口,并重写rejectedExecution(参数1,参数2)方法
    *  参数1:当前任务对象
    *  参数2:线程池对象
    */
    public class MyRejected implements RejectedExecutionHandler{
    
      public MyRejected(){
      }
      
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          System.out.println("自定义处理..");
          System.out.println("当前被拒绝任务为:" + r.toString());
          /**
           * 拒绝任务的时候,可以使用 HttpURLConnection发送一个请求,将拒绝的数据发送给请求方客户端,
           *  告诉他这个任务现在处理不了。这种解决方案不利于大量的并发操作等。
           *  例如如果有1000个拒绝,难道要请求1000次吗?
           *      发送请求还可以使用:Apache HttpClient
           *              【https://blog.csdn.net/u013473691/article/details/52297195】 
           * 
           * 第二种解决方案:记录log。然后在不是高峰期的时候再使用定时任务去解析日志等
           */
          
      }
    
    }
    
    

    相关文章

      网友评论

          本文标题:并发编程(五)_自定义线程池示例代码

          本文链接:https://www.haomeiwen.com/subject/cdmyhftx.html