美文网首页
适合的才是最好的-RxJava篇

适合的才是最好的-RxJava篇

作者: 海重山青 | 来源:发表于2018-01-24 17:12 被阅读0次

    原创博客地址
    对于程序猿来说,Demo是最好的起手
    而对于RxJava来说,你可以简单理解成:

    • 是一个观察者模式框架
    • 替代AsyncTask成为更好的异步操作工具
    • 即便逻辑再复杂,对于RxJava来说就是:简洁

    首先上Demo

    public static void main(String[] args) {
        // 0.准备一些数据
        Integer[] numbers = { 1, 2, 3, 4 };
        List<Integer> lists = Arrays.asList(numbers);
        
        // 1.创建一个被观察者
        //      被观察者很明显从List集合获取数据,现在就等着有人来订阅~
        Observable<Integer> observable = Observable.from(lists);
    
        // 2.创建一个观察者
        //      SubScriber是Observer的实现类,所以也是一个观察者
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onNext(Integer data) {
                // 被观察者发送的数据都会送到这里
                System.out.println("Rx -- onNext:" + data);
            }
    
            @Override
            public void onCompleted() {
                // 被观察者发送完数据会调用该方法
                System.out.println("Rx -- Complete!");
            }
    
            @Override
            public void onError(Throwable e) {
                // 被观察者传输数据中发生异常会调用该方法
                System.out.println("Rx -- Error!");
            }
        };
    
        // 3.订阅
        //      正常来说应该是:observer.subscribe(observable); 看起来更合乎逻辑
        //      这样反而像是:被观察者 订阅了 观察者(报纸 订阅了 读者)
        //      这涉及到流式编程,姑且先这样记住吧
        observable.subscribe(observer);
    }
    

    运行结果:

    运行结果.png
    • 在观察者订阅的顺间,被观察者就发送数据过来了
    • 数据发送过来调用的方法:onNext()
    • 数据发送完成调用的方法:onCompleted()
    • 数据发送期间出现异常调用的方法:onError()

    不要看代码多了,但逻辑很简洁!只有逻辑上的简洁才是真正的简洁!

    上面的Demo看完一遍,大概知道有什么样的角色在扮演。

    现在分析下每个角色:

    观察者

    作用:接收数据并进行处理

    观察者毫无疑问就是Observer,但它是接口。在实际操作中,一般都使用它的抽象实现类Subscriber。两者使用方式完全一样。

    public abstract class Subscriber<T> implements Observer<T>, Subscription
    

    现在来看看观察者常用的创建方式

    第一种:new Observer()接口

    Observer<Integer> observer = new Observer<Integer>(){
        @Override
        public void onCompleted() {
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(Integer i) {
        }
    };
    

    第二种:new Subscriber()抽象类

    Subscriber<Integer> subscriber = new Subscriber<Integer>(){
        @Override
        public void onCompleted() {
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onNext(Integer i) {
        }
    };
    

    Subscriber中有一个方法:

    /**
     * This method is invoked when the Subscriber and Observable have been connected but the Observable has
     * not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
     * useful initialization to your subscription, for instance to initiate backpressure.
     */
    public void onStart() {
        // do nothing by default
    }
    
    • 很明显是留给调用者自己重写
    • 英文好的可以自己看注释
    • 这里大致说下意思:这个方法是在观察者和被观察者已连接,但是被观察者还没有向观察者发送数据时进行调用
    • 所以,这个方法就是用来做初始化用的。

    除此之外,Subscriber实现的Subscription接口还有两个方法:

    public interface Subscription {
        void unsubscribe(); // 取消订阅
        boolean isUnsubscribed(); // 是否已经取消订阅
    }
    
    • 取消订阅后,观察者将不会再接收事件
    • 取消之前先判断一下isUnsubscribed()
    • 如果程序中没有调用取消订阅方法,被观察者会始终持有观察者引用。造成内存泄漏

    被观察者

    作用:作为数据的发送方,它决定什么时候发送,怎么发送

    被观察者ObservableJava里也有。很多地方都喜欢用这个单词作为被观察者,这也是它的直译。但是就因为都一样,所以小心不要导错包了。

    现在来看看被观察者常用的创建方式

    第一种:Observable.create()

    Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            
        }
    });
    
    • create()方法接收一个OnSubscribe接口参数
    • OnSubscribeObservable的内部接口
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
    
    • 根据接口名,顾名思义。当观察者被订阅的时候,会调用这个call()方法
    • 下面举个小例子:
    public static void main(String[] args) {
        // 观察者
        Observer<Integer> observer = new Observer<Integer>(){
            @Override
            public void onCompleted() {
                System.out.println("接收数据结束");
            }
            @Override
            public void onError(Throwable e) {
                
            }
            @Override
            public void onNext(Integer t) {
                System.out.println("接收数据:" + t);
            }
        };
        // 被观察者
        Observable observable = Observable.create(new Observable.OnSubscribe<Integer>(){
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        });
        
        // 订阅
        observable.subscribe(observer);
    }
    

    运行结果:

    运行结果2.png

    注意:

    这个方法已经被废弃了,推荐使用SyncOnSubscribeAsyncOnSubscribe

    看名字应该知道是什么意思

    create方法废弃.png

    第二种:Observable.from()

    Integer[] nums = {1, 2, 3};
    Observable observable = Observable.from(nums);
    
    • 从一个数组Iterable中依次发送数据元素

    第三种:Observable.just()

    Observable observable = Observable.just(1, 2, 3);
    
    • 这个更直接。将参数依次发送过来。

    订阅

    observable.subscribe(observer);
    

    其内部实现:

    subscribe.png
    • subscriber.onStart()就是观察者中内置的用于初始化的方法
    • 被观察者.call(subscriber)就是
    observable.call.png
    • 最后把观察者当成订阅者返回。前面说过
    public abstract class Subscriber<T> implements Observer<T>, Subscription
    
    • 所以,你可以:
    // 订阅
    Subscription subscription = observable.subscribe(observer);
    // 取消订阅
    subscription.unsubscribe();
    
    • 形成链式编程

    关于Action

    前面在被观察者的第一种创建方式Observable.create()中,接收的参数是OnSubscribe接口。它继承了Action1

    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
    
    • 被观察者订阅时,OnSubscribecall()方法才会被调用
    • 这个call()就是Action1
    public interface Action1<T> extends Action {
        void call(T t);
    }
    

    至于这个Action,你可以理解为就是一次单纯的行为,一个单纯的回调

    有很多的Actionx

    public interface Action0 extends Action {
        void call();
    }
    
    public interface Action1<T> extends Action {
        void call(T t);
    }
    
    public interface Action2<T1, T2> extends Action {
        void call(T1 t1, T2 t2);
    }
    
    public interface Action3<T1, T2, T3> extends Action {
        void call(T1 t1, T2 t2, T3 t3);
    }
    
    • 0就代表call()方法没有参数
    • 1就代表call()方法有1个参数
    • 2就代表call()方法有2个参数
    • 至于ActionN接口
    public interface ActionN extends Action {
        void call(Object... args);
    }
    

    Observable.subscribe(..)的时候,里面除了ObserverSubscriber这两个观察者之外。还可以接受一个Action

    Action1<Integer> action1 = new Action1<Integer>() {
        @Override
        public void call(Integer num) {
            System.out.println("接收到数据:" + num);
        }
    };
    observable.subscribe(action1);
    

    常用方法

    map

    People[] peoples = new People[]{
            new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
            new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
    };
    // 观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            System.out.println("接收信息:" + name);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
    };
    // 被观察者
    Observable.from(peoples).map(new Func1<People, String>() {
            @Override
            public String call(People people) {
                return people.getName();
            }
        }).subscribe(subscriber);
    
    • 可以看到被观察者从People数组里读取每一个元素
    • map方法里找到每一个元素对象的name传递给观察者
    • 观察者接收并使用
    • 这里转换范围很大,不仅仅只是提取属性。

    运行结果

    运行结果4.png

    flatMap

    People[] peoples = new People[]{
            new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
            new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
    };
    // 观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String hobby) {
            System.out.println("接收信息:" + hobby);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
    };
    // 被观察者
    Observable.from(peoples).flatMap(new Func1<People, Observable<String>>() {
            @Override
            public Observable<String> call(People people) {
                return Observable.from(people.getHobby());
            }
        }).subscribe(subscriber);
    
    • 效果和map是类似的
    • 区别在于map是用于一对一,而flatMap是用于一对多
    • 被观察者从People数组读取每一个对象,call()里读取每一个对象的hobby属性,并依次返回其中的一个元素

    运行结果

    运行结果3.png

    filter

    People[] peoples = new People[]{
            new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
            new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
    };
    // 观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            System.out.println("接收信息:" + name);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
    };
    // 被观察者
    Observable.from(peoples).filter(new Func1<People, Boolean>() {
    
        @Override
        public Boolean call(People t) {
            return t.getAge() > 18;
        }
    }).map(new Func1<People, String>() {
        @Override
        public String call(People people) {
            return people.getName();
        }
    }).subscribe(subscriber);
    

    运行结果

    运行结果5.png

    线程

    RxJava遵循的线程原则在那个线程订阅,则被观察者和观察者的操作都在该线程

    通过Schedulers切换线程

    • Schedulers.immediate()默认值。在当前线程运行。
    • AndroidSchedulers.mainThread():在Android主线程运行。
      • 注意:这个是RxAndroid里的。必须要导入RxAndroidjar包。RxJava里是没有的。
    • Schedulers.newThread()总是开启新线程运行。
    • Schedulers.io():如果操作涉及到I/O使用该项。
      • 也是总是开启新线程运行
      • 内部有线程池和复用
    • Schedulers.computation():如果操作涉及到图形计算等使用该项。

    还是之前例子,但是增加两行代码:

    People[] peoples = new People[]{
            new People("张三", 18, new String[]{"睡觉", "吃饭", "打豆豆"}),
            new People("李四", 19, new String[]{"编程", "泡妞", "LOL"})
    };
    // 观察者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            System.out.println("接收信息:" + name);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
    };
    // 被观察者
    Observable.from(peoples).filter(new Func1<People, Boolean>() {
    
        @Override
        public Boolean call(People t) {
            return t.getAge() > 18;
        }
    }).map(new Func1<People, String>() {
        @Override
        public String call(People people) {
            return people.getName();
        }
    }).subscribeOn(Schedulers.immediate()) // 当前线程
    .observeOn(Schedulers.io()) // io线程
    .subscribe(subscriber);
    
    • 被观察者在新开起的IO线程读取/过滤/转换操作
    • 数据传给观察者
    • 观察者当前线程显示数据

    运行结果

    运行结果5.png

    总结

    • RxJava确实是一个非常强大的流式编程工具
    • 再复杂的逻辑,RxJava都能很简洁的表示
    • 一句代码完成线程切换,很方便
    • 用多了才知道它的美~

    相关文章

      网友评论

          本文标题:适合的才是最好的-RxJava篇

          本文链接:https://www.haomeiwen.com/subject/iswuaxtx.html