上车RxJava2(一)

作者: 树獭非懒 | 来源:发表于2019-01-21 17:01 被阅读13次

    RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

    RxJava的观察者模式

    如果不知道观察者设计模式的话,建议先传送到这里:

    观察者设计模式

    RxJava它有四个概念

    • Observer(观察者)
    • Observable(被观察者)
    • subscribe(订阅)
    • 事件

    以上的几个概念和普通观察者模式基本一样,但是RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

    这两个事件都是在事件结束的时候一种标记,并且必须有且只有一个,
    onCompleted() 标记事件流的完成,onError() 标记事件的异常结束,终止其他事件的发出。

    RxJava的基本使用

    根据以上的概念,RxJava的基本使用如下:

    1.创建观察者 -Observer

    观察者会对被观察者的事件触发做出响应。比如被观察者 发起onNext 事件就会回调观察者的 onNext 方法。但在发起onNext事件之前,被观察者需要订阅观察者,这个时候就会回调 onSubscribe 方法。而 onError 和 onComplete 方法分别是在被观察中发起 onComplete 和 onError 事件时调用,关于这两者的区别,刚刚提过。

     public Observer<String> getObserver(){
           return new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext: ->"+s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: Complete");
                }
            };
        }
    

    2.被观察者 -Observable

    public Observable<String> getObservable(){
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("吃饭");
                    emitter.onNext("睡觉");
                    emitter.onNext("打豆豆");
                    emitter.onComplete();
                }
            });
        }
    

    这个方法里的内容也可以这么写

    Observable<String> observable = Observable.just("吃饭","睡觉","打豆豆");
    
    Observable<String> observable = Observable.fromArray("吃饭","睡觉","打豆豆");
    

    3.订阅-subscribe

    创建好了 Observer 和 Observable,需要用订阅关系也就是 subScribe方法 将它们建立连接,

     Observable<String> observable = getObservable();  //创建被观察者
     Observer<String> observer = getObserver(); //创建观察者
     observable.subscribe(observer);    //订阅
    

    4.一步式写法

    如果感觉上面的写法有点麻烦也可以用下面的方法,一步写成

    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("吃饭");
                emitter.onNext("睡觉");
                emitter.onNext("打豆豆");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
    
            }
    
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: ->"+s);
                sb.append(s);
                contentText.setText(sb+"->");  
            }
    
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
    
            @Override
            public void onComplete() {
    
            }
        });
    

    在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。如果不指定线程的话,在哪个线程调用 subscribe(),就在哪个线程生产事件。上面的例子它就是在主线程中,所以在 onNext 方法里,我对UI进行了更新是没有抛异常的。这样它实现出来的只是一个同步的观察者模式。

    但但但,,,但是异步对于 RxJava 是及其重要的

    RxJava 的线程控制

    RxJava的异步,比如进行网络请求(在io线程中)后更新ui(主线程)操作,这就要对 RxJava 的线程进行控制。RxJava 的线程控制需要用到调度器-Scheduler

    RxJava 内置了几个调度器:

    • Schedulers.immediate():这是默认的 Scheduler。它直接在当前线程运行,不指定线程。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。它和
    • newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行,由于更新ui。

    下面我们用一个栗子来学习调度器的用法:

    进行一个网络请求(需要切换到io线程中),获得返回的请求数据并更新UI(切断到主线程)

    添加以下依赖库

        implementation 'io.reactivex.rxjava2:rxjava:2.2.5'
        implementation 'com.squareup.retrofit2:retrofit:2.5.0'
        implementation 'io.reactivex:rxandroid:1.2.1'
        implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
        implementation 'com.squareup.retrofit2:adapter-rxjava:2.5.0'
    

    这里的网络请求我们借助Retrofit框架

    关于Retrofit这里就不介绍了,在后面笔者会关于它单独写一篇

    定义一个接口

    public interface Books {
        @GET("chaxunyuyue/")
        Call<List<BookInfo>> getBookInfo(@Query("username") String username);
    }
    

    针对返回的数据写一个javaBean

    public class BookInfo {
        private String yuyue_address;
        private String yuyue_reason;
        private String yuyue_time;
        
        //....省略所以getter和setter方法
    }
    

    开始网络请求并更新ui

      private void queryBookInfo(final String username){
            String url = "http://smpark.chzu.edu.cn:8081/ipv6/";
            Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl(url)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .build();
            final Books books = retrofit.create(Books.class);
            Observable.create(new ObservableOnSubscribe<BookInfo>() {
                @Override
                public void subscribe(ObservableEmitter<BookInfo> emitter) throws Exception {
                    List<BookInfo> bookInfos = books.getBookInfo(username).execute().body();
                    Log.d(TAG, "subscribe: " + bookInfos.get(0).getYuyue_address());
                    BookInfo bookInfo = bookInfos.get(0);
                    emitter.onNext(bookInfo);
                }
            })
                    .subscribeOn(Schedulers.io())                 // 1.指定 subscribe() 发生在 IO 线程
                    .subscribe(new Observer<BookInfo>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(BookInfo bookInfo) {
                           // Log.d(TAG, "onNext: " + bookInfo.getYuyue_reason());
                            String bookAddress = bookInfo.getYuyue_address();
                            Log.d(TAG, "onNext: " + bookAddress);
                            tvContent.setText("地址 :" + bookAddress);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e.toString());
                            e.printStackTrace();
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
        }
    

    上面代码我用了一段式写法,在创建被观察者之后使用了调度器

    .subscribeOn(Schedulers.io())
    

    这样就把subscribe() 调度到 IO 线程中执行,开始我以为调度的同时也会把回调的结果 onNext() 等调度到IO线程,看别人的博客都会加上这样一句话

    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    

    但是发现我更新ui的时候并没有抛异常,所以我觉得它没有将回调结果放于io线程中也有可能它自动切换到主线程中了。

    总结

    RxJava 的本质可以理解为异步这一个词,这篇博客笔者主要介绍了Rxjava的基本使用和线程控制,在下一篇博客,笔者会介绍它的其他内容。

    相关文章

      网友评论

        本文标题:上车RxJava2(一)

        本文链接:https://www.haomeiwen.com/subject/mgacjqtx.html