美文网首页我爱编程
RxJava 基础实现

RxJava 基础实现

作者: 一只笔 | 来源:发表于2018-08-04 21:05 被阅读0次

    1. 定义

    RxJava 是一个 基于事件流、实现异步操作的库

    2. 作用

    实现异步操作
    类似于 Android中的 AsyncTaskHandler作用

    3. 特点

    • 逻辑简洁
    • 实现优雅
    • 使用简单
      更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
    image.png

    4. 实现步骤

    1. 创建事件 Observable.create
    2. 创建观察者 Observer
    3. 订阅observable.subsribe
    public void text1() {
           //3创建事件
           Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
               @Override
               public void subscribe(ObservableEmitter<String> e) throws Exception {
                   LogUtil.f().i(MainActivity.TAG, "subscribe");
                   e.onNext("132");
                   e.onNext("456");
                   e.onComplete();
                   //注意onComplete() 与 onError() 同时只能调用一个
    //                e.onError(new Throwable());
               }
           });
           
           //2创建观察者
           Observer<String> observer = new Observer<String>() {
               Disposable disposable;
               @Override
               public void onSubscribe(Disposable d) {
                   disposable=d;
                   LogUtil.f().i(MainActivity.TAG, "onSubscribe");
               }
               
               @Override
               public void onNext(String s) {
                   LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
                   //可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
    //                disposable.dispose();
               }
               
               
               @Override
               public void onError(Throwable e) {
                   LogUtil.f().i(MainActivity.TAG, "onError");
               }
               
               @Override
               public void onComplete() {
                   LogUtil.f().i(MainActivity.TAG, "onComplete");
               }
           };
           //3订阅
           observable.subscribe(observer);
           
       }
    

    输出的结


    image.png

    切断观察者

    //可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接

       @Override
                public void onNext(String s) {
                    LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
                    //可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
                    disposable.dispose();
                }
    

    输出的结


    image.png

    切断后,后的方法就不会再执行

    额外说明

    观察者 Observer的subscribe()具备多个重载的方法

        public final Disposable subscribe() {}
        // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
    
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        // 表示观察者只对被观察者发送的Next事件作出响应
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
    
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
    
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
    
        public final void subscribe(Observer<? super T> observer) {}
        // 表示观察者对被观察者发送的任何事件都作出响应
    

    切换线程

         //切换线程
            observable = observable
                    .subscribeOn(Schedulers.io())//运行子线程
                    .observeOn(AndroidSchedulers.mainThread());//回调在android 主线程
    

    运行结果:

    image.png
    以上可以看出,运行是在子线程,回调是在android 主线程.

    优雅的实现

          Observable.create(new ObservableOnSubscribe<String>() {
                 @Override
                 public void subscribe(ObservableEmitter<String> e) throws Exception {
                     Log.i(MainActivity.TAG, "subscribe" + "线程:" + Thread.currentThread().getName());
                     e.onNext("132");
                     e.onNext("456");
                     e.onComplete();
                 }
             })
                     .subscribeOn(Schedulers.io())
                     .observeOn(AndroidSchedulers.mainThread())
                     .subscribe(new Observer<String>() {
                         @Override
                         public void onSubscribe(Disposable d) {
                             Log.i(MainActivity.TAG, "onSubscribe" + "线程:" + Thread.currentThread().getName());
                         }
        
                         @Override
                         public void onNext(String s) {
                             Log.i(MainActivity.TAG, "onNext" + "线程:" + Thread.currentThread().getName());
                         }
        
                         @Override
                         public void onError(Throwable e) {
                             Log.i(MainActivity.TAG, "onError" + "线程:" + Thread.currentThread().getName());
                         }
        
                         @Override
                         public void onComplete() {
                             Log.i(MainActivity.TAG, "onComplete" + "线程:" + Thread.currentThread().getName());
                         }
                     });
        }
    

    运行结果:

    image.png

    操作符

    Map

    map是RxJava中最简单的一个变换操作符了, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化. 用事件图表示如下:

           //Integer
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
                //Strig
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return String.format("The %s time", integer);
                }
                //String 叠加
            }).map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return String.format("%s , He is second time", s);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
    

    在上游我们发送的是数字类型, 而在下游我们接收的是String类型, 中间起转换作用的就是map操作符, 运行结果为:

    The 1 time , He is second time
    The 2 time , He is second time
    
    

    FlatMap

    flatMap是一个非常强大的操作符, 先用一个比较难懂的概念说明一下:

    FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

      Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onComplete();
                }
            }).flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList();
                    list.add("He is "+integer);
                    list.add("He is "+integer);
                    list.add("He is "+integer);
    
                    return Observable.fromIterable(list).delay(10,TimeUnit.MICROSECONDS);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String str) throws Exception {
                    System.out.println(str);
                }
            });
    

    运行结果:

    He is 1
    He is 1
    He is 1
    He is 2
    He is 2
    He is 2
    

    举个例子:注册成功后调登录

    api.register(new RegisterRequest())            //发起注册请求
                    .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
                    .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求注册结果
                    .doOnNext(new Consumer<RegisterResponse>() {
                        @Override
                        public void accept(RegisterResponse registerResponse) throws Exception {
                            //先根据注册的响应结果去做一些操作
                        }
                    })
                    .observeOn(Schedulers.io())                 //回到IO线程去发起登录请求
                    .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                        @Override
                        public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                            return api.login(new LoginRequest());
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求登录的结果
                    .subscribe(new Consumer<LoginResponse>() {
                        @Override
                        public void accept(LoginResponse loginResponse) throws Exception {
                            Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
                        }
                    });
    
     public void login() {
                renoteRegister().doOnNext(new Consumer<String>() {//开始注册
                    @Override
                    public void accept(String s) throws Exception {//注册成功回调
                        System.out.println(s);
                        System.out.println(Thread.currentThread().getName());
                    }
    
                }).flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        System.out.println(s + "--开始登录");
                        System.out.println(Thread.currentThread().getName());
                        return renoteLogin();//调用登录
                    }
                }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {//登录回调
                        System.out.println(s);
                        System.out.println(Thread.currentThread().getName());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println(throwable.getMessage());
                        System.out.println(Thread.currentThread().getName());
                    }
                });
            }
    
            //远程登录
            public Observable<String> renoteLogin() {
                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("登录成功");
                        emitter.onComplete();
                    }
                });
            }
    
            //远程注册
            public Observable<String> renoteRegister() {
                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("注册成功");
                        emitter.onComplete();
                    }
                });
            }
    

    本文源码下载地址
    参考原文

    参考原文

    相关文章

      网友评论

        本文标题:RxJava 基础实现

        本文链接:https://www.haomeiwen.com/subject/hdzmvftx.html