美文网首页
Rxjava介绍<1>

Rxjava介绍<1>

作者: 天空在微笑 | 来源:发表于2018-01-28 21:58 被阅读7次

    Rxjava github地址
    给初学者的RxJava2.0教程------水管系列
    手把手教你使用 RxJava 2.0
    这可能是最好的RxJava 2.x 入门教程
    RxJava API文档
    RxJava 2.x 使用详解(一) 快速入门
    实战CSDN作者余志强的RxJava2操作符系列
    推荐Rxjava2 教程大集合

    RxJava2 源码解析(一)
    RxJava2 源码解析(二)

    1.作用

    RxJava的目的就是异步。
    RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻辑中以比较轻易的方式实现异步调用。随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性,在深入的使用过程中一定对这点深有体会。

    2.概念

    由于RxJava是利用观察者模式来实现一些列的操作,所以对于观察者模式中的观察者,被观察者,以及订阅、事件需要有一个了解。

    • Observable:在观察者模式中称为“被观察者”;
    • Observer:观察者模式中的“观察者”,可接收-
      Observable发送的数据;
    • subscribe:订阅,观察者与被观察者,通过subscribe()方法进行订阅;
    • Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用,该部分内容是2.0新增的。Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

    在新版本中,出现了两种观察者模式:

    Observable ( 被观察者 ) / Observer ( 观察者 )
    Flowable (被观察者)/ Subscriber (观察者)


    image.png

    3.工程引用

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // Because RxAndroid releases are few and far between, it is recommended you also
    // explicitly depend on RxJava's latest version for bug fixes and new features.
    compile 'io.reactivex.rxjava2:rxjava:2.1.8'

    4.基本使用

    Observable ( 被观察者 ) / Observer ( 观察者 ):
    Observable的创建:

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    //执行一些其他操作
                    //.............
                    //执行完毕,触发回调,通知观察者
                    e.onNext("我来发射数据");
                }
            });
    

    Observer的创建:

    Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                //观察者接收到通知,进行相关操作
                public void onNext(String aLong) {
                    System.out.println("我接收到数据了");
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    

    订阅:

     observable.subscribe(observer);
    

    Flowable (被观察者)/ Subscriber (观察者):

      Flowable
                    .create(new FlowableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                            Log.d(TAG, "First requested = " + emitter.requested());
                            boolean flag;
                            for (int i = 0; ; i++) {
                                flag = false;
                                while (emitter.requested() == 0) {
                                    if (!flag) {
                                        Log.d(TAG, "Oh no! I can't emit value!");
                                        flag = true;
                                    }
                                }
                                emitter.onNext(i);
                                Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
                            }
                        }
                    }, BackpressureStrategy.ERROR)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            Log.d(TAG, "onSubscribe");
                            s.request(96);
                            mSubscription = s;
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
    //                        try {
    //                            Thread.sleep(1000);
    //                        } catch (InterruptedException e) {
    //                            e.printStackTrace();
    //                        }
                            mSubscription.request(1);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.w(TAG, "onError: ", t);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    5.基本使用

    RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在完成某些操作,获得一些结果后,回调触发事件,即发出事件来通知 Observer。

    关于回调,如果理解则可以跳过这一段,如果不理解,在RxJava中可以简单的理解为:为了方便Observable和Observer交互,在Observable中,将Observer对象传入,在完成某些操作后调用Observer对象的方法,此时将触发Observer中具体实现的对应方法。
    注意:Observer是个接口,Observable是个类。

    与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() 之外,还定义了三个特殊的事件:onComplete() 和 onError(),onSubscribe()。

    onComplete(): 事件队列完结时调用该方法。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
    onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
    onSubscribe():RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于RxJava1.x中的Subscription,用于解除订阅。
    注意:onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    6.线程切换

    在RxJava中, 已经内置了4种线程选项供我们选择:

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    • Schedulers.newThread() 代表一个常规的新线程
    • AndroidSchedulers.mainThread() 代表Android的主线程
    1. 简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
    2. 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
    3. 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

    相关文章

      网友评论

          本文标题:Rxjava介绍<1>

          本文链接:https://www.haomeiwen.com/subject/uhmnaxtx.html