RxJava

作者: 资本家大恶人 | 来源:发表于2020-05-22 23:17 被阅读0次

1.观察者模式

  • 定义:
    定义对象间一对多的依赖依赖关系,当一个对象改变状态时,所有依赖它的对象会收到通知并自动更新。
  • 理解:
    其实就是发布订阅模式,发布者发布信息,订阅者获取信息,订阅了就能收到信息,没订阅就收不到信息。属于行为型设计模式的一种。
    使用场景:
    有一个微信公众号服务,不定时发布一些消息,关注公众号就可以收到推送消息,取消关注就收不到推送消息。其中,客户是观察者,公众号是被观察者。

2.什么是RxJava(ReactiveX.io链式编程)

  • 定义:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
    总结:RxJava 是一个 基于事件流、实现异步操作的库
    理解:RXJava是一个响应式编程框架 ,采用观察者设计模式,观察者模式本身的目的就是『后台处理,前台回调』的异步机制

  • 优点:
    由于 RxJava是基于事件流的链式调用,所以使得 RxJava:

逻辑简洁
实现优雅
使用简单

  • 作用:
    实现异步操作

类似于 Android中的 AsyncTask 、Handler作用

3.Rxjava基本概念(观察者模式)

  • 案例:按钮点击处理、广播注册
    通过setOnClickListener()方法,Button持有OnClickListener的引用;当用户击时,Button自动调用OnClickListener的onClick()方法。
    Button——>被观察者
    OnClickListener——>观察者
    setOnClickListener ——>订阅
    onClick ——>事件

4. RxJava 有3个基本概念及原理

1.Observable(被观察者)
2.Observer(观察者)
3.subscribe(订阅)事件。

  • 原理:被观察者(Observable) 通过 订阅(Subscribe)按顺序发送事件 给观察者(Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。

  • 普通事件 : onNext() 接收被观察者发送的消息

  • 特殊的事件:
    onCompleted() 事件队列完结
    onError () 事件队列异常

注意:

1)RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
2)RxJava 规定,onNext() 接收被观察者发送的消息、可以执行多次;当不会再有新的 onNext () 发出时,需要触发 onCompleted () 方法作为标志。onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
3)在一个正确运行的事件序列中, onCompleted() 和 onError () 有且只有一个,并且是事件序列中的最后一个。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

5.调度器

  • RxJava中调度器设置方法
subscribeOn():或者叫做事件产生的线程。 
指定 subscribe()所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。

observeOn():或者叫做事件消费的线程。指定 Subscriber所运行在的线程。

几种调度器

在RxJava 中Scheduler——调度器,相当于线程控制器,
RxJava 通过它来指定每一段代码应该运行在什么样的线程。
RxJava 已经内置了几个Scheduler,它们已经适合大多数的使用场景:
  1:Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
  2:Schedulers.newThread():总是启用新线程,并在新线程执行操作。
  3:Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
      行为模式和 newThread()差不多区别在于 io()的内部实现是是用一个无数量上限的线 
      程池可以重用空闲的线程,因此多数情况下 io()比 newThread()更有效率。不要把计算
      工作放在 io()中可以避免创建不必要的线程。
  4:Schedulers.computation():计算所使用的 Scheduler这个计算指的是 CPU密集型计算,
       即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定  
       的线程池,大小为 CPU核数。不要把 I/O 操作放在computation()中,否则 I/O 操作
       的等待时间会浪费CPU。
  5.AndroidSchedulers.mainThread():Android 还有一个专用的
        它指定的操作将在 Android主线程运行。有了这几个 Scheduler,就可以使用
        subscribeOn()和 observeOn()两个方法来对线程进行控制了。 

6.依赖库

    //RxJava
    implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 库
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'//转换器,请求结果转换成Model
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'//配合Rxjava 使用
    implementation 'com.google.code.gson:gson:2.6.2'//Gson 库

7.简单使用

    public static void baseRx(){
        //1.创建被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                emitter.onNext("1111");
                emitter.onNext("2222");
                emitter.onNext("3333");
                emitter.onNext("4444");
                //emitter.onError(new Throwable("abc"));
                //emitter.onComplete();
            }
        });

        //2.创建观察者
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {//关闭线程
                Log.e(TAG, "onSubscribe: " );
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext: "+ s );
            }

            @Override
            public void onError(Throwable e) {//失败
                Log.e(TAG, "onError: "+e.getMessage() );
            }

            @Override
            public void onComplete() {//成功
                Log.e(TAG, "onComplete: " );
            }
        };
        //3.被观察者订阅观察者
        observable.subscribe(observer);

        //线程切换
        observable
                //被订阅者在子线程中
                .subscribeOn(Schedulers.io())
                //订阅者在主线程中
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);

        //观察中可以重复指定线程
        observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())//主
                .observeOn(Schedulers.io())//子
                .observeOn(AndroidSchedulers.mainThread())//主
                .subscribe(observer);
    }

  • 演示链式调用

8.Android功能使用

    private void rxAndroid() {

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(MyServer.Url)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        MyServer myServer = retrofit.create(MyServer.class);

        Observable<ResponseBody> call = myServer.getDate();

        call.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<ResponseBody>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(ResponseBody responseBody) {

                        try {
                            Log.e(TAG, "onNext: "+responseBody.string() );
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

    private void rxAndroidBean() {

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(MyServer.Url)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        MyServer myServer = retrofit.create(MyServer.class);

        Observable<Bean> call = myServer.getDate2();

        call.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Bean>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Bean responseBody) {

                        Log.e(TAG, "onNext: "+ responseBody.getRESULT() );
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

9.其他操作符使用( 查看操作符)

  • 创建操作符
    //遍历输出
    public static void rxFrom(){
        Integer[] a = {1,2,3,4,5};
        // Observable.fromArray(1,2,3,4)
        //Observable.fromArray("a","b","c")
        Observable.fromArray(a).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer);
            }
        });
    }

    //数组合并输出
    public static void rxJust(){
        Integer[] a = {1,2,3};
        Integer[] b = {9,8,7};
        Observable.just(a,b).subscribe(new Consumer<Integer[]>() {
            @Override
            public void accept(Integer[] integers) throws Exception {
                for (Integer i: integers) {
                    Log.e(TAG, "accept: "+i);
                }
            }
        });
    }

    //范围输出
    public  static  void rxRange(){
        Observable.range(0,20).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer );
            }
        });
    }

  //定时器
    public static void rxInterval(){
        Observable.interval(1,1,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "accept: "+aLong );
            }
        });
    }
    //闪屏
    private void rxjavaInterval() {
        final Long time = 5L;
        subscribe = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e("TAG", "倒计时:" + aLong);
                        if (aLong < time && !subscribe.isDisposed()) {
                            tv.setText("记录改变生活" + (time - aLong - 1));
                        } else {
                            Intent intent = new Intent(WelcomActivity.this, MainActivity.class);
                            startActivity(intent);
                            finish();
                        }
                    }
                });
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        subscribe.dispose();
        subscribe = null;
    }

  • 过滤操作符
    //过滤输出
    public static void rxFilter(){
        Integer[] a = {1,2,3,4,5};
        Observable.fromArray(a).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                if (integer>3){
                    return true;
                }
                return false;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "accept: "+integer );
            }
        });
    }

  • 变换操作符

①Map:通过指定一个Fun函数将Observeble转换成一个新的Observable对象并发射,观察者收到新的observable处理。

    public static void rxMap(){
        Integer[] a = {1,2,3,4,5};
        Observable.fromArray(a).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return integer+"abc";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) {
                Log.e(TAG, "accept: "+s );
            }
        });
    }

②FlatMap:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送

    public static void rxFlatMap(){
        Integer[] a = {1,2,3,4,5};
        Observable.fromArray(a).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                String[] strs = new String[3];
                for (int i =0;i<strs.length;i++){
                    strs[i] = integer +  strs[I];
                }
                return Observable.fromArray(strs);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: "+s );
            }
        });
    }

注意:flatmap的合并允许交叉,也就是说可能会交错的发送事件,最终结果的顺序可能并不是由原始发送时候的顺序。
③concatMap:concatMap操作符功能与flatmap功能一致,不过他解决了flatmap交叉问题,提供了一种能够把发射的值连续在一起的函数,而并不是合并他们。具体使用和flatmap一致。

  • 组合/合并操作符
    //Observable压缩合并ZIP:合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送
    public static void rxZip(){
        Integer[] a= {1,2,3};
        Integer[] b={4,5,6};
        Observable<Integer> observableA = Observable.fromArray(a);
        Observable<Integer> observableB = Observable.fromArray(b);

        Observable.zip(observableA, observableB, new BiFunction<Integer, Integer, String>() {

            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                return integer + ":" + integer2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e(TAG, "accept: "+s );
            }
        });

    }

    //合并:组合多个被观察者一起发送数据,合并后 按时间线并行执行
    //merge()组合被观察者数量≤4个,而mergeArray()则可>4个
    public static void rxMerge(){
        Integer[] a ={1,2,3};
        String[] b = {"abc","aaa","bbb"};
        char[] c = {'a','b','c'};

        Observable<Integer> A = Observable.fromArray(a);
        Observable<String> B = Observable.fromArray(b);
        Observable<char[]> C = Observable.fromArray(c);

        Observable
                .merge(A,B,C)
                .subscribe(new Consumer<Serializable>() {
                    @Override
                    public void accept(Serializable serializable) throws Exception {
                        Log.e(TAG, "accept: ."+serializable );
                    }
                });
    }

嵌套网络请求
1.请求数据类

//Compose 请求集合接口bean类
//Nest嵌套bean
//Banner ,Article子bean类
{
//        创建retrofit对象
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(ApiService.baseComposeUrl)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
//        创建接口对象
        final ApiService apiService = retrofit.create(ApiService.class);
//        创建嵌套请求对象
        Observable<Compose> compose = apiService.getCompose();
//        请求嵌套接口类,和嵌套集合bean类
        compose.flatMap(new Function<Compose, ObservableSource<Nest>>() {
            @Override
            public ObservableSource<Nest> apply(Compose compose) throws Exception {
//                获得两个bean类接口
                String articleUrl = compose.getArticleList();
                String bannerUrl = compose.getBannerUrl();
//                创建连个bean连接Api
                Observable<Banner> banner = apiService.getBanner(bannerUrl);
                Observable<Article> article = apiService.getArticle(articleUrl);
//                将【Nest】bean类集合转换为BiFuncation对象
                Observable<Nest> zip = Observable.zip(banner, article, new BiFunction<Banner, Article, Nest>() {
                    @Override
                    public Nest apply(Banner banner, Article article) throws Exception {
//                        返回接收到的集合
                        return new Nest(article, banner);
                    }
                });
                return zip;
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Nest>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Nest nest) {
//                        Mvp发送数据
                        clickHomeCallback.onOsucces(nest);
                    }

                    @Override
                    public void onError(Throwable e) {
                        clickHomeCallback.onFail("error" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
  • 功能性操作符
subscribOn()
observeOn()

10.RxAndroid好处

  • 用途
    是一个实现异步操作的库,具有简洁的链式代码,提供强大的数据变换。

  • 优势
    异步好简单、代码好简洁,一个简单、一个简洁,这就意味着工作效率。

  • 注意
    subscribeOn只能定义一次,除非是在定义doOnSubscribe
    observeOn可以定义多次,决定后续代码所在的线程

11.RxJava:好处

使用Rxjava的好处在于,我们可以方便的切换方法的执行线程,对线程动态切换,该过程无需我们自己手动创建和启动线程。使用Rxjava创建的代码虽然出现在同一个线程中,但是我们可以设置使得不同方法在不同线程中执行。上述功能的实现主要归功于RxJava的Scheduler实现,Scheduler 提供了『后台处理,前台回调』的异步机制。

12. AsyncTask与RxJava的区别

  • RxJava 实现异步操作是通过一种扩展的观察者模式来实现的。
  • 异步、简洁(逻辑、代码读写)。
  • RxJava 内部支持多线程操作
  • AyncTask是采用线程池的形式实现的。
  • 出现错误的处理-rxjava 自身有错误的方法回调,aync无法做到。
  • 并发的请求,rxjava 通过操作符能够完成各种并发情况,而AyncTask不行。

13.拓展

作者:Anwfly
链接:https://www.jianshu.com/p/4e902b4fe89a
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关文章

网友评论

      本文标题:RxJava

      本文链接:https://www.haomeiwen.com/subject/jbcpwhtx.html