美文网首页
RxJava2修炼之路(二)

RxJava2修炼之路(二)

作者: luweicheng24 | 来源:发表于2017-09-02 19:32 被阅读0次

    RaJava2修炼之路——易容术

    上一篇讲解了RxJava的原理之后,本篇继续深入了解一下RxJava对执行任务的线程调度。

    目的

    在每个刚学Android的小白来讲,必须牢记的的一点就是子线程不能跟新UI,所有跟新UI的操作必须放在主线程(也就是UI线程),问什么呢?因为会造成界面卡顿或者ANR啊,如果你想问什么是ANR啊?那你就先去查一下资料充点电再来看本篇文章,如果任何跟新UI的操作在子线程中程序就会奔溃。下面是我写的一个案例,利用Handler来跟新UI:

     handler = new Handler(){
                @Override
                public void handleMessage(Message msg) {
                    super.handleMessage(msg);
                    Log.d(TAG, "handleMessage: "+Thread.currentThread().getName());
                    if(msg.what == 0x01){
                        Bitmap bmp = (Bitmap) msg.obj;
                        loadPic(bmp);
                    }
                }
            };
    
      /**
         * 跟新UI
         *
         * @param bmp
         */
        private void loadPic(Bitmap bmp) {
            img.setImageBitmap(bmp);
        }
        @Override
        protected void onResume() {
            super.onResume();
            new Thread() {//开启图片下载线程
                @Override
                public void run() {
                    Bitmap bmp = loadImage();
                    Message msg = handler.obtainMessage();
                    msg.obj = bmp;
                    msg.what = 0x01;
                    handler.sendMessage(msg);
                }
            }.start();
        }
    
        /**
         * 图片下载
         *
         * @return
         */
        private Bitmap loadImage() {
            Bitmap bmp = null;
            try {
                URL url = new URL(mUrl);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                conn.setConnectTimeout(8 * 1000);
                conn.setReadTimeout(8 * 1000);
                InputStream is = conn.getInputStream();
                bmp = BitmapFactory.decodeStream(is);
            } catch (MalformedURLException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bmp;
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            handler.removeCallbacksAndMessages(null);// 移除所有handler的消息回调
        }
    
    

    是一个很简单的网络加载图片,然后显示在UI界面,在OnResume中开启线程执行加载图片的耗时操作,图片加载完成后将图片填充到message中传递到HandlerMessage中,执行UI跟新,Handler是Android中一块很重要的知识点,不太清楚的小伙伴去查资料补一下吧。

    实现

    我们首先看你一下RxJava中事件发送和接受分别在那个线程:

         Observable.create(new ObservableOnSubscribe<String >() {
                @Override
                public void subscribe(ObservableEmitter<String > e) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                     e.onNext("hello world");
                     e.onComplete();
                }
              }).subscribe(new Consumer<String >() {
                @Override
                public void accept(String  str) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                }
            });
            }
    

    打印结果:

    09-02 10:34:10.065 22364-22364/ruanrong.com.rxjava2demo D/tag: subscribe: main
    09-02 10:34:10.067 22364-22364/ruanrong.com.rxjava2demo D/tag: accept: main
    

    很明显,发送和接受事件默认都是在Main,也就是UI线程,那如何让发送事件在子线程,跟新事件在UI线程呢?这里引入一个新的概念,RxJava的线程调度器Scheduler,Schedulers内部包含六中调度类型

    1. Schedulers.computation( )
      用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
    2. Schedulers.from(executor)
      使用指定的Executor作为调度器
    3. Schedulers.immediate( )
      在当前线程立即开始执行任务
    4. Schedulers.io( )
      用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
    5. Schedulers.newThread( )
      为每个任务创建一个新线程
    6. Schedulers.trampoline( )
      当其它排队的任务完成后,在当前线程排队开始执行
      另外,还有一个特殊的调度类型:
    AndroidSchedulers.mainThread() 该调度类型是唯一的确定发生在UI线程的

    上面就是线程调度器的六中类型,都是Schedules类的静态常量。
    接下来就是变身的时候了:

       Observable.create(new ObservableOnSubscribe<String >() {
                @Override
                public void subscribe(ObservableEmitter<String > e) throws Exception {
                    Log.d(TAG, "subscribe: "+Thread.currentThread().getName());
                     e.onNext("hello world");
                     e.onComplete();
                }
              }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String >() {
                @Override
                public void accept(String  str) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                }
            });
            }
    

    打印结果:

    09-02 10:59:12.717 12326-12357/ruanrong.com.rxjava2demo D/tag: subscribe: RxNewThreadScheduler-1
    09-02 10:59:12.729 12326-12326/ruanrong.com.rxjava2demo D/tag: accept: main                                                             
    

    果然,完成了事件发送在子线程,跟新在主线程的操作,下面将图片的下载更新放到RxJava中来执行:

     Observable.create(new ObservableOnSubscribe<Bitmap>() {
                @Override
                public void subscribe(ObservableEmitter<Bitmap> e) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                    Bitmap bmp = loadImage();
                     e.onNext(bmp);
                     e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Bitmap>() {
                @Override
                public void accept(Bitmap bitmap) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                    img.setImageBitmap(bitmap);
                }
            });
    

    好了,图片正常显示,既然这样,如果在onSubscribe和onObserver中多次指定线程呢?比如下面这样:

          Observable.create(new ObservableOnSubscribe<String >() {
                @Override
                public void subscribe(ObservableEmitter<String > e) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                    e.onNext("hello world");
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<String >() {
                @Override
                public void accept(String  str) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                }
            });
        }
    

    打印结果:

    09-02 11:09:51.315 21666-21686/? D/tag: subscribe: RxNewThreadScheduler-1
    09-02 11:09:51.329 21666-21688/? D/tag: accept: RxNewThreadScheduler-2
    

    可以说明设置订阅事件选择离它最近的线程调度方式,而观察者选择最后的线程调度方式。
    既然完成了订阅在子线程观察在主线程,那么对于这种在处理耗时任务过程中,Activity销毁呢?这里就用到上一篇中的Disposiable对象了,当Activity销毁时终止事件发送。

     Observable.create(new ObservableOnSubscribe<Bitmap>() {
                @Override
                public void subscribe(ObservableEmitter<Bitmap> e) throws Exception {
                    Log.d(TAG, "accept: "+Thread.currentThread().getName());
                    Bitmap bmp = loadImage();
                     e.onNext(bmp);
                     e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Bitmap>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            mDisposable = d;
                        }
    
                        @Override
                        public void onNext(Bitmap value) {
                            img.setImageBitmap(value);
                        }
                        @Override
                        public void onError(Throwable e) {
                        }
                        @Override
                        public void onComplete() {
                        }
                    });
    
     @Override
        protected void onDestroy() {
            super.onDestroy();
            handler.removeCallbacksAndMessages(null);
            if(!mDisposable.isDisposed()){
                mDisposable.dispose();
            }
        }
    

    那如果有多个Disposable呢?这里有一个专门装Disposable的容器类CompositeDisposable,里面维护了一个`OpenHashSet<Disposable> resources;集合,所以只需要在订阅时将Disposable添加到CompositeDisposable中,在Activity销毁时将该容器Clear掉就行了。
    好了,RxJava的事件调度就是这样来改变事件执行的线程。

    相关文章

      网友评论

          本文标题:RxJava2修炼之路(二)

          本文链接:https://www.haomeiwen.com/subject/xwyrjxtx.html