美文网首页
RxJava2的学习

RxJava2的学习

作者: 简单Liml | 来源:发表于2017-08-18 17:46 被阅读31次

    参考链接:
    http://www.jianshu.com/p/5e93c9101dc5
    http://blog.csdn.net/qq_35064774/article/details/53045298
    http://www.jianshu.com/p/240f1c8ebf9d
    http://www.jianshu.com/p/464fa025229e
    http://blog.csdn.net/qq_35064774/article/details/53057359

    RxJava是一种响应式的编程。最近自己才学习下,感觉自己已经落后好多,下面是自己的简单总结。这边是对RxJava的使用介绍,对其编程思想上我这边较少介绍。因为自己对RxJava的学习较晚,对1版本,大家可以到别的博客上获取,这边主要是对2版本的介绍。

    我们来大致分析下它的流程。主要分两块,上游发送数据,在下游获取数据,中间可以对数据进行操作,使用观察者模式。

    我们直接上代码:

    Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    System.out.println("sendOnNext1");
                    e.onNext("1");
                    System.out.println("sendOnNext2");
                    e.onNext("2");
                    System.out.println("sendOnComplete");
                    e.onComplete();
                }
            });
    
            Observer<String> receiver = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onNext(String value) {
                    System.out.println("onNext" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            };
    
            sender.subscribe(receiver);
    

    输出:


    image.png

    Observable对象其实就是我们所说的“被观察者”,Observer对象为“观察者”,我们重写subscribe方法,通过onNext方法发送数据,在下游Observer的onNext方法中获取数据。其余方法如onSubscribe,onError,onComplete方法的调用流程见输出。

    这个最简单的一个流程演示了。

    在2.x中,Action1被重命名为Consumer,我们在使用Observable的时候,如不需要获取其余的流程状态,下游可以用Consumer替换,由此,代码可以如下:

    Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    System.out.println("sendOnNext1");
                    e.onNext("1");
                    System.out.println("sendOnNext2");
                    e.onNext("2");
                    System.out.println("sendOnComplete");
                    e.onComplete();
                }
            });
    
            Consumer<String> receiver = new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept"+s);
                }
            };
    
            sender.subscribe(receiver);
    

    输出:

    image.png

    在到2.x时,出现一个Flowable类,它与Observable不同的是,它支持背压,但这个不意味着MissingBackpressureException不会出现。Observable完全不支持背压。
    代码如下:

    Flowable<String> sender = Flowable.create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> e) throws Exception {
                    System.out.println("sendOnNext1");
                    e.onNext("1");
                    System.out.println("sendOnNext2");
                    e.onNext("2");
                    System.out.println("sendOnComplete");
                    e.onComplete();
                }
            }, BackpressureStrategy.BUFFER);
    
            Subscriber<String> receiver = new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    System.out.println("onSubscribe");
                    s.request(Long.MAX_VALUE); // 接收参数的数量
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("onNext" + s);
                }
    
                @Override
                public void onError(Throwable t) {
                    System.out.println("onError");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            };
    
    
            sender.subscribe(receiver);
    

    这里Flowable为“被观察者”,Subscriber为“观察者”。
    输出结果:

    image.png

    看起来与第一个代码十分相似。

    我们在发送数据数量上如果有一定限制的话,在2.x中还有几个扩展类:Completable,Single,Maybe。
    首先看Completable,代码如下:

    Completable sender = Completable.create(new CompletableOnSubscribe() {
                @Override
                public void subscribe(CompletableEmitter e) throws Exception {
                    System.out.println("subscribe");
                }
            });
    
            CompletableObserver receiver = new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }
            };
    
            sender.subscribe(receiver);
    

    输出如下:

    image.png

    上游不发送数据。

    在看Single:

    Single<String> sender = Single.create(new SingleOnSubscribe<String>() {
                @Override
                public void subscribe(SingleEmitter<String> e) throws Exception {
                    System.out.println("subscribe");
                    e.onSuccess("1111");
                }
            });
    
            SingleObserver receiver = new SingleObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onSuccess(Object value) {
                    System.out.println("onSuccess" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }
            };
    
            sender.subscribe(receiver);
    

    输出如下:

    image.png

    上游发送一条onSuccess数据。

    最后看下Maybe:

    Maybe sender = Maybe.create(new MaybeOnSubscribe() {
                @Override
                public void subscribe(MaybeEmitter e) throws Exception {
                    System.out.println("subscribe");
                    e.onSuccess("1111");
                }
            });
    
            MaybeObserver receiver = new MaybeObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError");
                }
    
                @Override
                public void onSuccess(Object value) {
                    System.out.println("onSuccess"+value);
                }
            };
    
            sender.subscribe(receiver);
    

    输出如下:

    image.png

    上游可发送一条或0条数据。

    通过上面的学习,我们可以看到上游与下游的工作流程。有时我们需要对上游到下游的数据进行中间操作,这边我们来演示下RxJava的链式代码:

    Observable.create(new ObservableOnSubscribe<List<User>>() {
                @Override
                public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
                    List<User> users = getUsers();
                    e.onNext(users);
                }
            }).flatMap(new Function<List<User>, ObservableSource<User>>() {
                @Override
                public ObservableSource<User> apply(List<User> users) throws Exception {
                    return Observable.fromIterable(users);
                }
            }).filter(new Predicate<User>() {
                @Override
                public boolean test(User user) throws Exception {
                    return user.getId().equals("2") || user.getId().equals("5");
                }
            }).subscribe(new Consumer<User>() {
                @Override
                public void accept(User user) throws Exception {
                    System.out.println(user.getName());
                }
            });
    
    public static List<User> getUsers(){
            List<User> users = new ArrayList<>();
            users.add(new User("user1","1"));
            users.add(new User("user2","2"));
            users.add(new User("user3","3"));
            users.add(new User("user4","4"));
            users.add(new User("user5","5"));
            return  users;
        }
    

    代码输出:

    image.png

    是不是看起来很酷炫。关于代码flatMap,filter等等方法大家可以去网上收集。

    同样,我们也可以试着用Flowable方法,同时用另一种发送数据源方式,代码如下:

    Flowable.just(1, 2, 3)
                    .take(2) //取前两个数据
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("保存:" + integer);
                        }
                    })//中间操作
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println(integer);
                        }
                    });
    

    输出如下:


    image.png

    just或take方法同上,可以去网上查询使用方法。

    到此,是我简单的对RxJava的代码的理解,如有不当之处,还请大家多多指教。

    相关文章

      网友评论

          本文标题:RxJava2的学习

          本文链接:https://www.haomeiwen.com/subject/pfvarxtx.html