RxJava 从入门到放弃

作者: lovejjfg | 来源:发表于2016-09-03 17:59 被阅读4784次

    叫这个题目也是因为这篇博客写了太久太久了!有段时间都觉得完全没有必要写下去的,索性终于完工了,也算是对这段时间的肯定吧!

    RxJava基本概念

    RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

    接下来就围绕Observable创建、Observer创建、线程切换、事件类型转换、订阅和取消订阅展开。

    Observerble的基本创建方式:

    1、create()//最基本的

    Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("xxixxii");
                subscriber.onCompleted();//这里必须调用该方法或者onError(),通知订阅者发送完毕,否者无法进行解除订阅。
    
            }
        });
    

    2、form()//适配集合等

        ArrayList<Student> students = new ArrayList<>();
        students.add(s1);
        students.add(s2);
        students.add(s3);
        students.add(s1);
        students.add(s4);
        students.add(s5);
        students.add(s6);
    
        Observable.from(students)
    

    3、just()//适配已经写好的方法

        Student s1 = new Student(19, "xiaoqiang");
        Student s2 = new Student(19, "xiaoqiang1");
        Student s3 = new Student(20, "xiaoqiang1");
        Student s4 = new Student(19, "xiaoqiang");
        Student s5 = new Student(21, "xiaoqiang");
        Student s6 = new Student(22, "xiaoqiang");
    
        Observable.just(s1, s2, s3, s4, s5, s6)
    

    4、merge(o1,02)//将多个合并为一个
    5、concat(o1,o2)//one by one emit!

    Observer Subscriber的创建

         Subscriber<String> stringSubscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                Log.e(TAG, "onStart: ");
            }
    
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted: ");
    
            }
    
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: ");
            }
    
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext: " + s);
            }
        };
    
        Observer<String> stringObserver = new Observer<String>() {
            @Override
            public void onCompleted() {
    
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onNext(String s) {
    
            }
        };
    

    这里需要注意:ObserverSubscriber不仅基本使用方式一样,实质上,在 RxJavasubscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 ObserverSubscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

    onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

    unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onStop()onDestory()等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生。

    强大的条件筛选

    说了这么多没用的东西,肯定要来点儿实际的才能体会到RxJava的强大功能!

    1、take( )只发送指定数量的事件。
    2、filter( )过滤指定条件的事件。
    3、first()只发送第一个事件。
    4、distinct( )只发送不同的事件。(怎么定义为不同?!)
    其实还有很多。。。

    随时随地线程切换

    说完创建过滤你可能觉得这也没撒嘛!那么接下来想想之前在Android开发里要切换线程需要怎么处理呢?view.post()或者使用handler.sendMessage()!而在RxJava中,线程切换不用这么搞了,Schedulers是RxJava中用来管理相关线程调度的,基于订阅和被订阅,这里有两个方法!
    1、subscribeOn() 事件产生在哪个线程。

    2、observeOn()事件消费在哪个线程。

    3、Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    4、Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

    5、Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。其行为模式和 newThread()是差不多滴,但是区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
    6、Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()中,否则 I/O 操作的等待时间会浪费 CPU。
    7、AndroidSchedulers.mainThread():Android 特供,它指定的操作将在 Android 主线程运行。

    transform 转换

    强大的内部转换功能,让你可以做到要什么就是什么。

    1、map():进行对象转换,不会创建新的Observable
    2、flatMap():也是进行对象转换,会创建新的Observable
    3、buffer()、:缓冲区,缓冲指定的Observable包装成新的
    4、Observable发射。
    5、toList():将单个的对象转换为集合。

    取消订阅

    爽了之后重视要记住一件事,那就是要释放相关资源!不然后果也是很严重的,尤其是在使用RxView相关的方法时会警告你需要调用Unsubscribe()来释放相关的引用。

    warn.png

    释放操作其实很简单。定义一个集合维护相关的Subscription,然后在ActivityonStop()或者onDestroy()方法中释放相关资源。

        Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
                .throttleFirst(1, TimeUnit.SECONDS)
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "clicks->doOnUnsubscribe");
                    }
                })
                .subscribe(new Action1<Void>() {
                    @Override
                    public void call(Void aVoid) {
                        methd6();
                    }
                });
        //维护相关的资源引用
        subscriptions.add(clickSubscribe);
    
    @Override
    protected void onDestroy() {
        for (Subscription s : subscriptions) {
            if (!s.isUnsubscribed()) {
                s.unsubscribe();
                Log.e(TAG, "onDestroy: 取消订阅!");
            }
        }
        super.onDestroy();
    }
    

    手动create()一个Observable的话,一定要调用 onComplete()或者onError()来结束这个事件,不然资源也不会被释放的。

    动手时间

    练习一

    统计集合中年龄大于20的学生姓名(年龄姓名一致的视为同一个!)
    首先当然是创建Observable

        //初始化数据
        Student s1 = new Student(19, "xiaoqiang0");
        Student s2 = new Student(20, "xiaoqiang1");
        Student s3 = new Student(21, "xiaoqiang2");
        Student s4 = new Student(22, "xiaoqiang3");
        Student s5 = new Student(23, "xiaoqiang4");
        Student s6 = new Student(24, "xiaoqiang5");
        Student s7 = new Student(25, "xiaoqiang6");
        Student s8 = new Student(25, "xiaoqiang5");
        students = new ArrayList<>();
        students.add(s1);
        students.add(s2);
        students.add(s3);
        students.add(s1);
        students.add(s4);
        students.add(s5);
        students.add(s6);
        students.add(s7);
        students.add(s8);
    
            Observable.just(students)//创建Observable
                .flatMap(new Func1<ArrayList<Student>, Observable<Student>>() {
                    @Override
                    public Observable<Student> call(ArrayList<Student> students) {
                        //变换为新的Observable
                        return Observable.from(students);
                    }
                })
                //过滤掉年龄和姓名相同的对象
                .distinct()
                //过滤掉年龄小于20的对象
                .filter(new Func1<Student, Boolean>() {
                    @Override
                    public Boolean call(Student student) {
                        return student.getAge() >= 20;
                    }
    
                })
                //将事件对象由Student 转换为 String
                .map(new Func1<Student, String>() {
                    @Override
                    public String call(Student student) {
                        return student.getName();
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "call: 取消订阅了!!");
    
                    }
                })
                //订阅
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(String s) {
                        Log.e(TAG, "onNext: " + s);
                    }
                });
    
    
    E/MainActivity: onNext: xiaoqiang1
    E/MainActivity: onNext: xiaoqiang2
    E/MainActivity: onNext: xiaoqiang3
    E/MainActivity: onNext: xiaoqiang4
    E/MainActivity: onNext: xiaoqiang5
    E/MainActivity: onNext: xiaoqiang6
    E/MainActivity: onNext: xiaoqiang5
    E/MainActivity: call: 取消订阅了!!
    

    PS:至于这里对象的唯一性判断是通过复写equal()hashCode()来实现的!

    @Override
    public boolean equals(Object o) {
        return o instanceof Student && this.getAge() == ((Student) o).getAge() && this.getName().equals(((Student) o).getName());
    }
    
    @Override
    public int hashCode() {
        return Arrays.hashCode(new Object[]{getAge(), getName()});
    }
    

    练习二

    concat()使用:本地有缓存读取本地的数据,没有走网络请求并缓存到本地。

    Observable<String> netObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e(TAG, "走网络了!!");
                subscriber.onNext("这是缓存数据!!");
                subscriber.onCompleted();
            }
        }).doOnNext(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG, "call: 保存数据到本地");
                rxPreferences.getString("cash").asAction().call(s);
    
            }
        });
        Observable<String> nativeObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (TextUtils.isEmpty(rxPreferences.getString("cash").get())) {
                    Log.e(TAG, "没有缓存,走网络!");
                    subscriber.onCompleted();
                } else {
                    Log.e(TAG, "有缓存!");
                    subscriber.onNext(rxPreferences.getString("cash").get());
                    subscriber.onCompleted();
                }
            }
        });
    
        Observable.concat(nativeObservable, netObservable)
                .first()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "完成了!");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "错误了!");
                    }
    
                    @Override
                    public void onNext(String s) {
                        Log.e(TAG, s);
                    }
                });
    
       //第一次              
     E/MainActivity: 没有缓存,走网络!
     E/MainActivity: 走网络了!!
     E/MainActivity: call: 保存数据到本地
     E/MainActivity: 这是缓存数据!!
     E/MainActivity: 完成了!
     //第二次
     E/MainActivity: 有缓存!
     E/MainActivity: 这是缓存数据!!
     E/MainActivity: 完成了!
    

    练习三

    merge()使用:服务端和本地都有相关数据,汇总展示。使用merge()对应的事件顺序是无序的,谁先产生了谁就先发送!

        Observable<String> just = Observable.just("S", "O", "S")
                .subscribeOn(Schedulers.newThread())
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        SystemClock.sleep(20);
                    }
                });
    
        Observable<String> just1 = Observable.just("S","T","R").subscribeOn(Schedulers.newThread())
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        SystemClock.sleep(20);
                    }
                });
    
        Observable.merge(just1, just)
                .subscribeOn(Schedulers.newThread())
                .distinct()
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e(TAG, "call: " + s);
                    }
                });
    
    
     E/MainActivity: call: S
     E/MainActivity: call: T
     E/MainActivity: call: R
     E/MainActivity: call: O
    //或者这样
     E/MainActivity: call: S
     E/MainActivity: call: T
     E/MainActivity: call: O
     E/MainActivity: call: R
    

    练习四

    RxView和RxCompoundButton的使用:

    RxCompoundButton.checked(checkBox).call(rxPreferences.getBoolean("checked").get());
        //noinspection ConstantConditions
        Subscription checkedSubscription1 = RxCompoundButton.checkedChanges(checkBox)
                .subscribe(rxPreferences.getBoolean("checked").asAction());
        subscriptions.add(checkedSubscription1);
    
        Subscription checkedSubscription = rxPreferences.getBoolean("checked")
                .asObservable()
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "rxPreferences->doOnUnsubscribe");
    
                    }
                })
                .subscribe(new Subscriber<Boolean>() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "rxPreferences->onNext: +onCompleted");
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(Boolean aBoolean) {
                        Log.e(TAG, "rxPreferences->onNext: " + aBoolean);
                    }
                });
        subscriptions.add(checkedSubscription);
    

    练习五

    RxJava和Retrofit的搭配使用:

       Subscription beforeSubscribe = BaseDataManager.getDailyApiService()
            .getBeforeDailyStories(date)
                    .subscribeOn(Schedulers.io())//事件产生在子线程
                    .doOnSubscribe(new Action0() {//subscribe之后,事件发送前执行。
                        @Override
                        public void call() {
                            isLoadingMore = true;
                            Log.e("TAG", "call: true");
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<DailyStories>() {
                        @Override
                        public void call(DailyStories dailyStories) {
                            mView.onLoadMore(dailyStories);
                            mView.isLoadingMore(false);
                            Log.e("TAG", "call: false");
                            isLoadingMore = false;
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            mView.onLoadError(throwable.toString());
                            isLoadingMore = false;
                        }
                    });
            subscribe(beforeSubscribe);
    

    相关回调

    对于Observable,这里有一系列的回调方法,作用在不同的时期,其中常用的是doOnSubscribe()doOnNext()
    另外在Subscriber里,还有一个onStart()的方法!

    callBack.png
    Observable.just("L", "O", "V", "E")
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "call: doOnSubscribe");
    
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "call: doOnUnsubscribe");
                    }
                })
                .doOnEach(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "doOnEach: onCompleted");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "doOnEach: onError");
                    }
    
                    @Override
                    public void onNext(String s) {
                        Log.e(TAG, "doOnEach: onNext:"+s);
                    }
                })
                .doOnNext(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e(TAG, "call: doOnNext");
                    }
                })
                .doOnRequest(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.e(TAG, "call: doOnRequest");
                    }
                })
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "call: doOnTerminate");
                    }
                })
                .doAfterTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.e(TAG, "call: doAfterTerminate");
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.e(TAG, "subscribe->call: onCompleted");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "subscribe->call: onError");
                    }
    
                    @Override
                    public void onNext(String s) {
                        Log.e(TAG, "subscribe->call: onNext:" + s);
                    }
                });
    
    E/MainActivity: subscribe->call: onStart:
    E/MainActivity: call: doOnRequest
    E/MainActivity: call: doOnSubscribe
    E/MainActivity: doOnEach: onNext:L
    E/MainActivity: call: doOnNext
    E/MainActivity: subscribe->call: onNext:L
    E/MainActivity: doOnEach: onNext:O
    E/MainActivity: call: doOnNext
    E/MainActivity: subscribe->call: onNext:O
    E/MainActivity: doOnEach: onNext:V
    E/MainActivity: call: doOnNext
    E/MainActivity: subscribe->call: onNext:V
    E/MainActivity: doOnEach: onNext:E
    E/MainActivity: call: doOnNext
    E/MainActivity: subscribe->call: onNext:E
    E/MainActivity: doOnEach: onCompleted
    E/MainActivity: call: doOnTerminate
    E/MainActivity: subscribe->call: onCompleted
    E/MainActivity: call: doOnUnsubscribe
    E/MainActivity: call: doAfterTerminate
    

    通过Log可以看到一个事件的订阅过程。
    首先回调Subscriber.onStart()表明ObservableSubscriber已经建立了连接,但是这个时候事件还没有开始发射。
    然后是 doOnRequest()doOnSubscribe(),这个时候事件也没有开始发射。
    接着是doOnEach()doOnNext()onNext(),这个时候事件正式发射了。
    最后发射完了,就是onCompleteddoOnTerminate(),然后取消订阅释放资源doOnUnsubscribe()

    Rx全家桶

    欢迎加入Rx全家桶豪华套餐,只有你想不到的,没有它做不到的!

    Rx全家桶

    小结

    总的来说,使用RxJava之后,可以让我们的代码更加清晰一目了然,而且线程的切换也非常的简单,不用自己维护相关线程和写那该死的Runnable,内部强大的事件转换和筛选过滤也是为我们开发省去了不少的工作。

    参考文档

    1、给 Android 开发者的 RxJava 详解
    2、RxJava wiki
    3、RxJava使用场景小结
    4、Demo下载

    ---- Edit By Joe ----

    相关文章

      网友评论

      • Wings6:嘛 还好不是Android从入门到放弃
      • Dimon喰:for (Subscription s : subscriptions) {
        if (!s.isUnsubscribed()) {
        s.unsubscribe();
        Log.e(TAG, "onDestroy: 取消订阅!");
        }
        }

        是否可以优化成使用CompositeSubscription;
        只需要mSubscriptions = new CompositeSubscription();
        mSubscriptions.add(subscription);
        ...//在创建的时候添加进去
        mSubscriptions.clear();//在onDestroy等方法里调用就行了

        作者的很干货~学到了东西~只是些许建议,加油↖(^ω^)↗
        lovejjfg: @Dimon喰 @Dimon 后面就是这么写的哈 之前没有留意到相关的API 谢谢提醒 😁
      • jkyeo:Observable.from(List<E>)...
      • Bigmercu:挺全面
      • Clendy:看标题我就猜到作者是受《Java从入门到放弃》的启发~
      • 简单ken: :sweat: 未见放弃
      • 王亟亟:写的很好
        lovejjfg:@王亟亟 三口!
      • e7ae8e11ab7e:说好的放弃呢?
        lovejjfg:@一种习惯 :grin:
      • e9a44a336dfe:我是进来调皮的。
      • 王默默:关注你是怎么放弃,才进来的,作者调皮了
        王默默:@lovejjfg :smile::smile::smile:
        lovejjfg:@王默默 让你失望了 :wink:
      • 捡淑:马克
      • oden:放弃?为什么名称是入门到放弃?
        lovejjfg:@oden 是不是心里默默骂了一句 标题狗 :grin:

      本文标题:RxJava 从入门到放弃

      本文链接:https://www.haomeiwen.com/subject/fspbdttx.html