美文网首页Android开发经验谈
Android开发经验谈:并发编程(线程与线程池)

Android开发经验谈:并发编程(线程与线程池)

作者: 王二蛋和他的狗 | 来源:发表于2019-03-08 10:32 被阅读4次

    一、线程

    在Android开发中,你不可能都在主线程中开发,毕竟要联网,下载数据,保存数据等操作,当然这就离不开线程。

    (当然你可以在Android4.0以前的手机里在主线程请求网络,我最早开发的时候,用的手机比较古老。。。)
    在Android中你可以随意创建线程,于是就会造成线程不可控,内存泄漏,创建线程消耗资源,线程太多了消耗资源等问题。
    具体线程怎么创建我就不在文章里描述了,毕竟这主要将并发编程。。。。

    大家知道线程不可控就好了。。。于是就需要对线程进行控制,防止一系列问题出现,这就用到了如下要讲的东西。

    二、线程池

    线程池:顾名思义,就是放线程的大池子。

    如何创建一个线程池?

    先说说几个系统的线程池:

    • FixedThreadPool 创建定长线程的线程池
    • CachedThreadPool 需要的时候建立新的线程,超时线程销毁
    • SingleThreadPool 单个线程的线程池
    • ScheduledThreadPool 可以定时的线程池,创建周期性的任务

    这几个线程池不做多余阐述,因为这些线程池的原理都与我下面要讲的有关。。。。

    ···························································································

    如何自定义线程池(先来了解几个必须知道的参数):

    corePoolSize:

    核心线程池大小,线程池中主要工作的线程的多少。

    maximumPoolSize:

    线程池最大线程数。

    keepAliveTime:

    空闲线程可保持的时间是多久,如果你启用了allowCoreThreadTimeOut方法,你的线程池里的空闲线程在这个时间段后会自动销毁,如果没启用,则只要不超过corePoolSize,空闲线程也不会销毁。

    Unit:

    keepAliveTime的时间单位

    workQueue:

    阻塞队列,当任务达到corePoolSize,就会被放入这个队列

    常见几种BlockingQueue实现

    1. ArrayBlockingQueue : 有界的数组队列
    2. LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
    3. PriorityBlockingQueue : 优先队列,可以针对任务排序
    4. SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。

    threadFactory:

    线程工厂,主要用来创建线程;

    handler:

    表示当拒绝处理任务时的策略,也就是参数maximumPoolSize达到后丢弃处理的方法。有以下四种取值:
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
    用户也可以实现接口RejectedExecutionHandler定制自己的策略。

    代码展示:

    
    //线程工厂
    public class TaskThreadFactory  implements ThreadFactory {
    
        private final AtomicInteger mThreadNumber = new AtomicInteger(1);
    
        private final String mNamePrefix;
    
        TaskThreadFactory(String name) {
            mNamePrefix = name + "#";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r,mNamePrefix + mThreadNumber.getAndIncrement());
    
    //        if (t.isDaemon())
    //            t.setDaemon(false);
    //
    //        if (t.getPriority() != Thread.NORM_PRIORITY)
    //            t.setPriority(Thread.NORM_PRIORITY);
    
            return t;
        }
    }
    
    //重写runnable
    public class PRunnable implements Runnable {
    
        public static final int HIGH = 1;//优先级高
        public static final int NORMAL = 2;//优先级中等
        public static final int LOW = 3;//优先级低
        @IntDef({HIGH,NORMAL,LOW})
        @Retention(RetentionPolicy.SOURCE)
        public @interface Priority{}
    
        public final int priority;
        private final Runnable runnable;
        public int serial;
    
        public PRunnable(Runnable runnable){
            this(NORMAL,runnable);
        }
    
        public PRunnable(@Priority int priority,Runnable runnable){
            this.priority = priority;
            this.runnable = runnable;
        }
    
        @Override
        public void run() {
            if (runnable != null) {
                runnable.run();
            }
        }
    
        /**
         * 线程队列方式 先进先出
         * @param r1
         * @param r2
         * @return
         */
        public static final int compareFIFO(PRunnable r1, PRunnable r2) {
            int result = r1.priority-r2.priority;
            return result==0?r1.serial-r2.serial:result;
        }
    
        /**
         * 线程队列方式 后进先出
         * @param r1
         * @param r2
         * @return
         */
        public static final int compareLIFO(PRunnable r1, PRunnable r2) {
            int result = r1.priority-r2.priority;
            return result==0?r2.serial-r1.serial:result;
        }
    }
    
    //线程池实现
    public class TaskExecutor implements Executor {
    
        private final static int QUEUE_INIT_CAPACITY = 20;
    
        private static final int CORE = 3;
    
        private static final int MAX = 5;
    
        private static final int TIMEOUT = 30 * 1000;
    
        private AtomicInteger SERIAL = new AtomicInteger(0);//主要获取添加任务
    
        public static class Config {
            public int core;
    
            public int max;
    
            public int timeout;
    
            public boolean allowCoreTimeOut;
    
            public boolean fifo;
    
            public Config(int core, int max, int timeout, boolean allowCoreTimeOut,boolean fifo) {
                this.core = core;
                this.max = max;
                this.timeout = timeout;
                this.allowCoreTimeOut = allowCoreTimeOut;
                this.fifo = fifo;
            }
        }
    
        public static Config defaultConfig = new Config(CORE, MAX, TIMEOUT, true,true);
    
        private final String name;
    
        private final Config config;
    
        private ExecutorService service;
    
        public TaskExecutor(String name) {
            this(name, defaultConfig);
        }
    
        public TaskExecutor(String name, Config config) {
            this(name, config, true);
        }
    
        public TaskExecutor(String name, Config config, boolean startup) {
            this.name = name;
            this.config = config;
    
            if (startup) {
                startup();
            }
        }
    
        public void startup() {
            synchronized (this) {
                if (service != null && !service.isShutdown()) {
                    return;
                }
    
                service = createExecutor(config);
            }
        }
    
        public void shutdown() {
            ExecutorService executor = null;
    
            synchronized (this) {
                // 交换变量
                if (service != null) {
                    executor = service;
                    service = null;
                }
            }
    
            if (executor != null) {
                // 停止线程
                if (!executor.isShutdown()) {
                    executor.shutdown();
                }
    
                // 回收变量
                executor = null;
            }
        }
    
        private void executeRunnable(PRunnable runnable) {
            synchronized (this) {
                if (service == null || service.isShutdown()) {
                    return;
                }
                runnable.serial = SERIAL.getAndIncrement();
                service.execute(runnable);
            }
        }
    
        @Override
        public void execute(Runnable runnable) {
            if (runnable instanceof PRunnable) {
                executeRunnable((PRunnable) runnable);
            }else{
                executeRunnable(new PRunnable(runnable));
            }
        }
    
        public Future<?> submit(Runnable runnable) {
            synchronized (this) {
                if (service == null || service.isShutdown()) {
                    return null;
                }
                if (runnable instanceof PRunnable) {
                    ((PRunnable) runnable).serial = SERIAL.getAndIncrement();
                    return service.submit(runnable);
                }else{
                    PRunnable pRunnable = new PRunnable(runnable);
                    pRunnable.serial = SERIAL.getAndIncrement();
                    return service.submit(pRunnable);
                }
            }
        }
    
        public void execute(Runnable runnable, @PRunnable.Priority int priority) {
            executeRunnable(new PRunnable(priority,runnable));
        }
    
        private ExecutorService createExecutor(Config config) {
            ThreadPoolExecutor service = new ThreadPoolExecutor(config.core, config.max, config.timeout,
                    TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(QUEUE_INIT_CAPACITY, config.fifo ? mQueueFIFOComparator : mQueueLIFOComparator),
                    new TaskThreadFactory(name), new ThreadPoolExecutor.DiscardPolicy());
    
            allowCoreThreadTimeOut(service, config.allowCoreTimeOut);
    
            return service;
        }
    
        public boolean isBusy() {
            synchronized (this) {
                if (service == null || service.isShutdown()) {
                    return false;
                }
                if(service instanceof ThreadPoolExecutor){
                    ThreadPoolExecutor tService = (ThreadPoolExecutor) service;
                    return tService.getActiveCount() >= tService.getCorePoolSize();
                }
                return false;
            }
        }
    
        private static final void allowCoreThreadTimeOut(ThreadPoolExecutor service, boolean value) {
            if (Build.VERSION.SDK_INT >= 9) {
                allowCoreThreadTimeOut9(service, value);
            }
        }
    
        @TargetApi(9)
        private static final void allowCoreThreadTimeOut9(ThreadPoolExecutor service, boolean value) {
            service.allowCoreThreadTimeOut(value);
        }
    
        Comparator<Runnable> mQueueFIFOComparator = new Comparator<Runnable>() {
    
            @Override
            public int compare(Runnable lhs, Runnable rhs) {
                PRunnable r1 = (PRunnable) lhs;
                PRunnable r2 = (PRunnable) rhs;
    
                return PRunnable.compareFIFO(r1, r2);
            }
        };
    
        Comparator<Runnable> mQueueLIFOComparator = new Comparator<Runnable>() {
    
            @Override
            public int compare(Runnable lhs, Runnable rhs) {
                PRunnable r1 = (PRunnable) lhs;
                PRunnable r2 = (PRunnable) rhs;
    
                return PRunnable.compareLIFO(r1, r2);
            }
        };
    
    }
    
    image.gif

    相关文章

      网友评论

        本文标题:Android开发经验谈:并发编程(线程与线程池)

        本文链接:https://www.haomeiwen.com/subject/ynilpqtx.html