美文网首页AndroidJava-多线程
Android网络请求框架核心-线程池处理高并发

Android网络请求框架核心-线程池处理高并发

作者: 坑逼的严 | 来源:发表于2019-05-10 13:18 被阅读0次

    最近公司开发一个新的SDK,需要配合他们写一些上层代码,下载时不能缺少的,当然,既然是SDK就不能引用其他第三方的开源网络框架,只能自己写了,这个SDK对网络高并发有严格要求,所以需要特殊处理,查了些资料,模仿了volley编写了自己的高并发框架,下面就来简单介绍。

    一、设置线程池

    线程池是高并发的核心,所以我们先搞懂他。这里只是入门理解理解,毕竟能力有限:

    1、系统封装的线程池:Executors类

    Executors类里面封装了几个常用的线程池:
    1、newCachedThreadPool:可缓存的线程池
    创建一个可缓存线程池,如果线程池中线程数量超过核心线程数,可灵活回收空闲线程,若无可回收,则新建线程。
    这种类型的线程池特点是:
    工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
    如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
    在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    2、newFixedThreadPool
    创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
    FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程(因为他设置的回收时间为0),还会占用一定的系统资源。

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    3、newSingleThreadExecutor
    创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定FIFO(先进先出)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    4、newScheduleThreadPool
    创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行

    以上四种就是系统封装的,那我们可以参考他们来封装一下,直接上ThreadPoolManager的代码

    import android.util.Log;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPoolManager {
        private static ThreadPoolManager instence = null;
        private ThreadPoolExecutor mThreadPoolExecutor;
        //线程数量,核心线程数
        private int corePoolSize = 4;
        //池中允许的最大线程数
        private int maximumPoolSize = 10;
        //设置空闲线程的空闲时间,如果一共有6个线程,那么超出核心线程数的两个线程就会被记录时间,超过该时间就会被杀死,如果没有超过核心线程数,那么线程是不会被倒计时的。
        private long keepAliveTime = 10;
        //等待执行的容器容量大小
        private int capacity = 10;
        //拒绝后的执行任务容器--》凉快的地方
        private LinkedBlockingQueue taskQueue =new LinkedBlockingQueue();
    
        public synchronized static ThreadPoolManager getInstence() {
            if (instence == null) {
                synchronized (ThreadPoolManager.class) {
                    if (instence == instence) {
                        instence = new ThreadPoolManager();
                    }
                }
            }
            return instence;
        }
    
        /**
         * 构造方法里面就初始化线程池
         * ArrayBlockingQueue是一个执行任务的容量,当调用mThreadPoolExecutor的execute,容量加1,执行run完后,容量减1
         * ArrayBlockingQueue后面传入true就是以FIFO规则存储:先进先出
         */
        public ThreadPoolManager(){
            if(mThreadPoolExecutor==null){
                mThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(capacity,true),handler);
            }
            //开启线程一直循环从等待队列里面取出可执行任务并执行
            mThreadPoolExecutor.execute(runnable);
        }
    
    
        /**
         * 往队列里面存入可执行任务
         * @param runnable
         */
        public void putExecutableTasks(Runnable runnable){
            try {
                taskQueue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * ThreadPoolExecutor的run
         */
        private Runnable runnable=new Runnable() {
    
            @Override
            public void run() {
                //开启循环
                while(true){
                    //取出等待的执行任务
                    Runnable taskQueueRunnable = null;
                    try {
                        Log.d("yanjin","等待队列大小:"+taskQueue.size());
                        taskQueueRunnable = (Runnable) taskQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(runnable!=null){
                        mThreadPoolExecutor.execute(taskQueueRunnable);
                    }
                    Log.d("yanjin","线程池大小"+mThreadPoolExecutor.getPoolSize());
                }
            }
        };
    
        /**
         * 拒绝策略
         * 当ArrayBlockingQueue容量过大,就要执行拒绝策略,对来的执行任务说:放不下了,先到一边凉快去,那么就要有一个凉快的容器撞他们
         *
         */
        private RejectedExecutionHandler handler = new RejectedExecutionHandler(){
    
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    taskQueue.put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }
    
    

    名字取ThreadPoolManager,

    public synchronized static ThreadPoolManager getInstence() {
            if (instence == null) {
                synchronized (ThreadPoolManager.class) {
                    if (instence == instence) {
                        instence = new ThreadPoolManager();
                    }
                }
            }
            return instence;
        }
    

    这个代表他是个单例。在他的构造方法里面我们就初始化线程池,这样我们的线程池也只有唯一一份了

        /**
         * 构造方法里面就初始化线程池
         * ArrayBlockingQueue是一个执行任务的容量,当调用mThreadPoolExecutor的execute,容量加1,执行run完后,容量减1
         * ArrayBlockingQueue后面传入true就是以FIFO规则存储:先进先出
         */
        public ThreadPoolManager(){
            if(mThreadPoolExecutor==null){
                mThreadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(capacity,true),handler);
            }
            //开启线程一直循环从等待队列里面取出可执行任务并执行
            mThreadPoolExecutor.execute(runnable);
        }
    

    ThreadPoolExecutor是系统的线程池,之前我们介绍的四种系统封装的都是对他的一个分装而来的,我们点进去,看看他的参数含义,他有好几个构造方法不要弄错了

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    

    corePoolSize代表核心线程数、maximumPoolSize是池中允许的最大线程数、keepAliveTime是设置空闲线程的空闲时间,如果一共有6个线程,那么超出核心线程数的两个线程就会被记录时间,超过该时间就会被杀死,如果没有超过核心线程数,那么线程是不会被倒计时的。workQueue用于保存任务的队列,我们每添加一个,就执行一个,这里我们用new ArrayBlockingQueue<Runnable>(capacity,true),ArrayBlockingQueue是使用数组保的方法,capacity是容量大小,true代表按照FIFA的顺序(先进先出)执行,handler代表拒绝策略,一旦当前ArrayBlockingQueue容器满了,我们定义的是10个,那么就会调用走到handler里面。

        /**
         * 拒绝策略
         * 当ArrayBlockingQueue容量过大,就要执行拒绝策略,对来的执行任务说:放不下了,先到一边凉快去,那么就要有      一个凉快的容器装他们
         *
         */
        private RejectedExecutionHandler handler = new RejectedExecutionHandler(){
    
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    taskQueue.put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    

    这个拒绝策略,我们只是把被拒绝的任务存储起来,taskQueue就是这个容器是LinkedBlockingQueue类型(遵循FIFA先进先出顺序的阻塞是容器),那么我们就需要开一个线程一直循环去taskQueue取出执行任务。还记得构造方法里面我们初始化了线程池,并且执行了,对!这个就是他开线程循环取的地方,就是我们已初始化就开始轮询了

    //开启线程一直循环从等待队列里面取出可执行任务并执行
            mThreadPoolExecutor.execute(runnable);
    
    ..........
    ..........
    ..........
        /**
         * ThreadPoolExecutor的run
         */
        private Runnable runnable=new Runnable() {
    
            @Override
            public void run() {
                //开启循环
                while(true){
                    //取出等待的执行任务
                    Runnable taskQueueRunnable = null;
                    try {
                        Log.d("yanjin","等待队列大小:"+taskQueue.size());
                        //take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞
                        taskQueueRunnable = (Runnable) taskQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(runnable!=null){
                        mThreadPoolExecutor.execute(taskQueueRunnable);
                    }
                    Log.d("yanjin","线程池大小"+mThreadPoolExecutor.getPoolSize());
                }
            }
        };
    

    这里我们用到了taskQueue.take();,,take: 获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞。那么就是有数据就执行线面的mThreadPoolExecutor.execute(taskQueueRunnable);,没有数据就一直停留。这里我记得有一个惊群效应就是外面是while循环这里面还有一个while循环判断,不能用if简单判断,take已经为我们做好了。

        /**
         * 往队列里面存入可执行任务
         * @param runnable
         */
        public void putExecutableTasks(Runnable runnable){
            try {
                taskQueue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    这是插入执行任务。上面的轮询会监听taskQueue是否有数据,那么我们插入一个taskQueue.put(runnable);,就会排队等待轮询。

    二、自定义处理类

    public class TaskImp implements Runnable {
        private int age = 0;
        private String name = null;
        private int current_number;
    
        public TaskImp(int age, String name, int number) {
            this.age = age;
            this.name = name;
            this.current_number = number;
        }
    
        @Override
        public void run() {
            Log.d("yanjin", "第" + current_number + "个 " + name + "的年龄是" + age);
        }
    }
    

    他肯定是实现Runable接口上面mThreadPoolExecutor.execute(taskQueueRunnable);执行的Runnable 其实是在这个处理类的run里面,这样我们就能在这里写了,也没有那么耦合了。

    三、客户端添加执行任务

        /**
         * 测试并发
         * @param view
         */
        public void test(View view) {
            ThreadPoolManager threadPoolManager = ThreadPoolManager.getInstence();
            for(int x= 0; x<1000;x++){
                if((x+1)%2==0){
                    TaskImp taskImp=new TaskImp(16,"yanjin",x+1);
                    threadPoolManager.putExecutableTasks(taskImp);
                }else{
                    Task2Imp taskImp=new Task2Imp(14,"zj",x+1);
                    threadPoolManager.putExecutableTasks(taskImp);
                }
    
    
            }
    
        }
    

    这里我创了1000个,偶数和奇数区分传入,只要一调用threadPoolManager.putExecutableTasks(taskImp);,那就立马执行。

    相关文章

      网友评论

        本文标题:Android网络请求框架核心-线程池处理高并发

        本文链接:https://www.haomeiwen.com/subject/jpnooqtx.html