美文网首页
框架-RxJava之Observable

框架-RxJava之Observable

作者: 剩下的只有自己 | 来源:发表于2016-09-20 17:49 被阅读101次

    创建一个Observable

    1. Observable.create()
      该方法接收一个Obsubscribe对象
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
    
                }
            });
    

    来个例子:

            Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for(int i=0;i<5;i++){
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }
            });
            //Observable.subscribe(Observer),Observer订阅了Observable
            Subscription subscribe = observable.subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "异常");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "接收Obsverable中发射的值:" + integer);
                }
            });
    输出:
    
     接收Obsverable中发射的值:0
    接收Obsverable中发射的值:1
    接收Obsverable中发射的值:2
    接收Obsverable中发射的值:3
    接收Obsverable中发射的值:4
    完成
    

    从上面的例子可以看出,在Observer订阅了Observable后,Observer作为OnSubscribe中call方法的参数传入,从而调用了Observer的相关方法

    1. Observable.from()
      该方法需要一个 数组或集合参数
      假如现在我们有一个集合,我们能否也像上面一样但不使用for循环,一个个发送给Observer,发送完后再调用onCompleted方法呢?
            //创建一个集合
            List<Integer> list=new ArrayList<>();
            for(int i=0;i<5;i++){
                list.add(i);
            }
            //使用Observable.from接收上面的list集合
            Observable<Integer> observable=Observable.from(list);
            //Observer订阅Observable
            Subscription subscribe = observable.subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "异常");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "接收Obsverable中发射的值:" + integer);
                }
            });
    输出:
    
     接收Obsverable中发射的值:0
    接收Obsverable中发射的值:1
    接收Obsverable中发射的值:2
    接收Obsverable中发射的值:3
    接收Obsverable中发射的值:4
    完成
    

    可以看到使用Observable.from()和1中的效果一样

    1. Observable.just()
      该方法可接收 1到9同一任意类型的参数
      假如我们有一个带返回值的方法,我们也想使用Observable对其操作,该怎么办呢?
      private List<Integer> getDatas(){//提供数据的方法
            List<Integer> list=new ArrayList<>();
            for(int i=0;i<5;i++){
                list.add(i);
            }
            return list;
        }
    
            //使用Observable.just调用上面的getDatas()方法,注意这里得到的Observable的类型为
            //List<Integer>,而非Integer
            Observable<List<Integer>> observable=Observable.just(getDatas());
            Subscription subscribe = observable.subscribe(new Observer<List<Integer>>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "异常");
                }
    
                @Override
                public void onNext(List<Integer> integers) {
                    for(Integer integer: integers)
                        Log.e(TAG, "接收Obsverable中发射的值:" + integer);
                }
            });
    输出:
    
     接收Obsverable中发射的值:0
    接收Obsverable中发射的值:1
    接收Obsverable中发射的值:2
    接收Obsverable中发射的值:3
    接收Obsverable中发射的值:4
    完成
    

    从上面的代码,我们可以看出just与from的区别,just是将集合直接作为一个参数发送给Observer,而from是将
    集合中所有的元素一个一个的发送给Observer。假如上面代码中的getDatas方法返回的不是一个集合,我们也就不能使用from了应该使用just。
    在发送数据后,just也会自动调用onCompleted方法。

    1. 其他方式:
      当我们需要一个Observable毫无理由的不再发射数据正常结束时,我们可以使用empty()
      。我们可以使用never()
      创建一个不发射数据并且也永远不会结束的Observable。我们也可以使用throw()
      创建一个不发射数据并且以错误结束的Observable。

    特殊类Subject

    为啥说它特殊呢?
    因为它继承了Observable并且实现Observer接口,所以说它即可已是一个Observer又可以是一个Observable,它包含4个子类:

    • PublishSubject
    • BehaviorSubject
    • ReplaySubject
    • AsyncSubject
    1. PublishSubject
            //创建一个Subject这里看起来是不是缺点什么,对比上面的Observable.create()方法
            PublishSubject<String> subject = PublishSubject.create();
            
            Subscription subscription = subject.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "异常");
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG,"接收PublishSubject发送的值:"+s);
                }
            });
            //手动发送数据给订阅它的Observer
            subject.onNext("PublishSubject");
    输出:
    接收PublishSubject发送的值:PublishSubject
    

    从上面代码可以看出,我们需手动发送数据给订阅它的Observer。所以这里就与Observable不同:
    在Observable中,如果有订阅者订阅了它,它就会马上自动发送数据给Observer。
    而对于Subject而言,它在被订阅的时候并不会自动发送数据给Observer,发送数据的控制权交给了我们,在我们发送数据之前,订阅的Observer会一直处于等待状态,但是这种等待并不会阻塞线程,也不会消耗太多的资源。
    当然我们也可以像Observable一样使用Subject,没有区别。

    1. ReplaySubject
            //PublishSubject<Integer> subject=PublishSubject.create();
            ReplaySubject<Integer> subject= ReplaySubject.create();
            subject.onNext(1);// 1
            subject.subscribe(new Action1<Integer>() {// 2
                @Override
                public void call(Integer integer) {
                    Log.e(TAG,""+integer);
                }
            });
            subject.onNext(2);// 3
            subject.onNext(3);// 4
            subject.onNext(4);// 5
    

    这里没有给出输出,那我们先来猜猜如果上面的subject是PublishSubject情况下输出的是什么吧?
    执行到1时,由于subject没有订阅者订阅,所以发送出去的数据1也就没有订阅者接收
    执行到2时,subject有订阅者订阅了(这里的Action1相当于一个实现了onNext方法的Observer对象)
    执行到3 4 5时,subject发送的数据都会被2中的订阅者接收到,从而输出2、3、4。
    PublishSubject发送数据时,会将数据发送给订阅了它的所有Observer
    那subject是ReplaySubject输出的是怎样呢?
    Replay是不是重新播放的意思呢,这里的重新播放是指 当Observer订阅ReplaySubject时,会将ReplaySubject之前发送过的数据,重新发送给该Observer,所以这里会输出1 2 3 4。
    再来看看另外2个ReplaySubject.createXXX()方法
    - ReplaySubject.createWithSize(int size)
    如果在Observer订阅该ReplaySubject前,ReplaySubject发送数据的个数大于size,那么超出size部分的数据
    不会发送给Observer。

            ReplaySubject<Integer> subject= ReplaySubject.createWithSize(2);
            subject.onNext(1);
            subject.onNext(2);
            subject.onNext(3);
            subject.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.e(TAG,""+integer);
                }
            });
            subject.onNext(4);
    输出:2 3 4
    
    - ReplaySubject.createWithTime(int time, TimeUnit unit, Scheduler scheduler);
    

    需要3个参数,第12个用来确定时间,第3个传入一个Scheduler
    该方法表示如果ReplaySubject发送数据的时间超过了指定的时间,将不会重新发送给新订阅的Observer

    1. BehaviorSubject
            BehaviorSubject subject=BehaviorSubject.create();
            subject.onNext(1);
            subject.onNext(2);
            subject.onNext(3);
            subject.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.e(TAG,""+integer);
                }
            });
            subject.onNext(4);
            subject.onNext(5);
    输出:
    3 4 5
    

    BehaviorSubject可以当作ReplaySubject来看,它只接收Observer订阅前BehaviorSubject发送的最后一条数据。

    1. AsyncSubject
            AsyncSubject subject=AsyncSubject.create();
            subject.onNext(1);
            subject.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.e(TAG,"A"+integer);
                }
            });
            subject.onNext(2);
            subject.onNext(3);
            subject.onCompleted();
            subject.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.e(TAG,"B"+integer);
                }
            });
    输出:
    A3 B3
    

    AsyncSubject会将执行过的一个完整的事件缓存起来(最后一个subject.onNext() +subject.onCompleted()),然后会发送给所有订阅它的Observer。

    相关文章

      网友评论

          本文标题:框架-RxJava之Observable

          本文链接:https://www.haomeiwen.com/subject/qyxoettx.html