美文网首页Android 开发技术源码解析
拆轮子-RxDownload2源码解析(二)

拆轮子-RxDownload2源码解析(二)

作者: 1cf2c90a5564 | 来源:发表于2018-07-11 22:07 被阅读85次

    本文为博主原创文章,未经允许不得转载

    造轮子者:Season_zlc

    轮子用法请戳作者链接↑

    前言

    本文主要讲述 RxDownload2 的线程调度

    下载任务分发线程

    顾名思义,就是分发下载任务的线程。该线程运行在 DownloadService 中,从业务上看,DownloadService应当仅被 start() & bind() 一次。任务分发线程,在 onBind() 时创建:

    1. start & bind service [-> RxDownload.java]
        /**
         * start and bind service.
         *
         * @param callback Called when service connected.
         */
        private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
            Intent intent = new Intent(context, DownloadService.class);
            intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
            context.startService(intent);
            context.bindService(intent, new ServiceConnection() {
                @Override
                public void onServiceConnected(ComponentName name, IBinder binder) {
                    DownloadService.DownloadBinder downloadBinder
                            = (DownloadService.DownloadBinder) binder;
                    downloadService = downloadBinder.getService();
                    context.unbindService(this);
                    bound = true;
                    callback.call();
                }
    
                @Override
                public void onServiceDisconnected(ComponentName name) {
                    //注意!!这个方法只会在系统杀掉Service时才会调用!!
                    bound = false;
                }
            }, Context.BIND_AUTO_CREATE);
        }
    

    上述代码有个细节,onServiceConnected() 中马上调了 unbindService()

    1. onBind [-> DownloadService.java]
        @Nullable
        @Override
        public IBinder onBind(Intent intent) {
            log("bind Download Service");
            startDispatch();
            return mBinder;
        }
    
    1. startDispatch() [-> DownloadService.java]
        /**
         * start dispatch download queue.
         */
        private void startDispatch() {
            disposable = Observable
                    .create(new ObservableOnSubscribe<DownloadMission>() {
                        @Override
                        public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
                            DownloadMission mission;
                            while (!emitter.isDisposed()) {
                                try {
                                    log(WAITING_FOR_MISSION_COME);
                                    mission = downloadQueue.take();
                                    log(Constant.MISSION_COMING);
                                } catch (InterruptedException e) {
                                    log("Interrupt blocking queue.");
                                    continue;
                                }
                                emitter.onNext(mission);
                            }
                            emitter.onComplete();
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .subscribe(new Consumer<DownloadMission>() {
                        @Override
                        public void accept(DownloadMission mission) throws Exception {
                            mission.start(semaphore);
                        }
                    });
        }
    
    • 上述代码就是下载任务分发线程的实现。其中 .subscribeOn(Schedulers.newThread()) 表明该线程通过 new Thread() 的方式产生的
    • 下载任务用一个阻塞队列来维护,阻塞队列内部已经实现了同步机制,所以无需担心并发问题
    • 只要没有取消订阅,该线程就会不停的尝试从阻塞队列中获取下载任务并发射。如果队列为空,就会一直阻塞,直到有新任务入队
    • 上述代码是不严谨的,对 disposable 重新赋值前,没有先尝试对其取消订阅。如果多次调用 bindService() ,就会出现线程泄露

    下载任务执行线程

    顾名思义,就是下载任务的执行线程。该线程运行在 Schedulers.io() 线程池上。入参信号量用来限制同时下载的最大任务数。

    1. start(final Semaphore semaphore) [-> SingleMission.java]
        @Override
        public void start(final Semaphore semaphore) {
            disposable = start(bean, semaphore, new MissionCallback() {
                @Override
                public void start() {
                    // 回调开始下载
                    if (callback != null) callback.start();
                }
    
                @Override
                public void next(DownloadStatus value) {
                    // 回调下载中
                    status = value;
                    processor.onNext(started(value));
                    if (callback != null) callback.next(value);
                }
    
                @Override
                public void error(Throwable throwable) {
                    // 回调下载失败
                    processor.onNext(failed(status, throwable));
                    if (callback != null) callback.error(throwable);
                }
    
                @Override
                public void complete() {
                    // 回调下载完成
                    processor.onNext(completed(status));
                    if (callback != null) callback.complete();
                }
            });
        }
    
    1. start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) [-> DownloadMission.java]
        protected Disposable start(DownloadBean bean, final Semaphore semaphore,
                                   final MissionCallback callback) {
            return rxdownload.download(bean)
                    .subscribeOn(Schedulers.io()) // 指定下载任务执行线程
                    .doOnLifecycle(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            if (canceled.get()) {
                                dispose(disposable);
                            }
    
                            log(TRY_TO_ACQUIRE_SEMAPHORE);
                            // 申请信号量
                            semaphore.acquire();
                            log(ACQUIRE_SUCCESS);
                            
                            // 获得信号量后,需再次检测是否已经暂停下载
                            if (canceled.get()) {
                                // 已经暂停,则取消订阅,释放信号量
                                dispose(disposable);
                            } else {
                                callback.start();
                            }
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            // 取消订阅时,需要释放信号量
                            semaphore.release();
                        }
                    })
                    .subscribe(new Consumer<DownloadStatus>() {
                        @Override
                        public void accept(DownloadStatus value) throws Exception {
                             // 回调下载进度
                            callback.next(value);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            // 回调下载失败
                            callback.error(throwable);
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            // 回调下载完成
                            callback.complete();
                        }
                    });
        }
    

    下载任务中断线程

    顾名思义,就是中断下载任务的线程。包括暂停、删除、全部暂停、全部取消四个操作。这些操作也运行在 Schedulers.io() 线程池上。

    1. pauseServiceDownload(final String missionId) [-> RxDownlaod.java]
        /**
         * Pause download.
         * <p>
         * Pause a url or all tasks belonging to missionId.
         *
         * @param missionId url or missionId
         */
        public Observable<?> pauseServiceDownload(final String missionId) {
            // createGeneralObservable 是一个异步绑定下载服务的Observable,通过资源数为1的信号量实现强制同步
            return createGeneralObservable(new GeneralObservableCallback() {
                @Override
                public void call() {
                    // 服务绑定后,调用服务的暂停下载
                    downloadService.pauseDownload(missionId);
                }
            }).observeOn(AndroidSchedulers.mainThread());
        }
    
    1. createGeneralObservable(final GeneralObservableCallback callback) [-> RxDownload.java]
        /**
         * return general observable
         *
         * @param callback Called when observable created.
         * @return Observable
         */
        private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
            // 方法名起的不好,应该叫 bindService
            return Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                    if (!bound) {
                        // 因为 onServiceConnected 是异步回调的,所以这里用了个资源数为1的信号量实现强制同步(CountDownLatch也可以实现强制同步)
                        semaphore.acquire();
                        if (!bound) {
                            startBindServiceAndDo(new ServiceConnectedCallback() {
                                @Override
                                public void call() {
                                    // 服务绑定后,回调 callback
                                    doCall(callback, emitter);
                                    // 释放信号量
                                    semaphore.release();
                                }
                            });
                        } else {
                            doCall(callback, emitter);
                            semaphore.release();
                        }
                    } else {
                        doCall(callback, emitter);
                    }
                }
            }).subscribeOn(Schedulers.io()); // 指定在 io 线程执行,所以暂停下载也是在这个线程执行
        }
    

    同理,删除下载也会先调用 createGeneralObservable(),所以删除操作也是在 Schedulers.io() 上执行的。

    总结

    • 一个单独的线程用来分发下载任务:Schedulers.newThread()
    • 每次从线程池中取一个线程执行下载任务:Schedulers.io()
    • 每次从线程池中取一个线程执行暂停、删除下载任务:Schedulers.io()

    相关文章

      网友评论

      本文标题:拆轮子-RxDownload2源码解析(二)

      本文链接:https://www.haomeiwen.com/subject/oypauftx.html