RxJava2 Study Notes 1

Author: A_si | Source: published 2017-07-08 11:08, viewed 38 times

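    I have used RxJava for more than a year, but only by copy-pasting, without ever studying it carefully. I put it off until RxJava 2 was already out, so I added learning RxJava 2 to this year's bucket list. Half a year has gone by, so here are some notes.
    I created a new Java library module and added the RxJava dependencies.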

        compile 'io.reactivex.rxjava2:rxjava:2.1.0'
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

    Write an Observable and then call subscribe: there are many more subscriber overloads to choose from than before.



    Start with the simplest one:

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                     System.out.println("subscribe");             
                     e.onNext(1);
               
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    System.out.println("accept="+integer);
                }
            });
    

    This is exactly the same as in RxJava 1.

    This subscriber has only one method, so what happens if an error is emitted?

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                System.out.println("emit 1");
                e.onNext(1);
                System.out.println("emit error");
                e.onError(new Throwable("I am an error"));
                System.out.println("after emitting error");
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                System.out.println("accept="+integer);
            }
        });
    

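    The result is an exception. The one-argument subscribe(Consumer) has no error handler, so accept is never called for the error. If you only care about onNext you can write it this way, but in real work we have to handle the error case. So: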

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                System.out.println("emit 1");
                e.onNext(1);
                System.out.println("emit error");
                e.onError(new Throwable("I am an error"));
                System.out.println("after emitting error");
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println("onNext" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
    ---------------------------------------------
    onSubscribe
    emit 1
    onNext1
    emit error
    onError
    after emitting error
    

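    All of these producers and consumers run on the main thread. On Android, if producing the data takes too long, that causes an ANR, so let's try thread scheduling: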

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    System.out.println("shot1==thread=="+Thread.currentThread().getName());
                    e.onNext(1);
    
                    System.out.println("shot==thread=="+Thread.currentThread().getName());
                    e.onError(new Throwable("shoterror"));
                    System.out.println("aftershoterror==thread=="+Thread.currentThread().getName());
                }
            }).subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("onSubscribe==thread=="+Thread.currentThread().getName());
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println("onNext" + integer+"==thread=="+Thread.currentThread().getName());
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println("onError==thread=="+Thread.currentThread().getName());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete"+Thread.currentThread().getName());
                }
            });
    
            System.out.println("aaaaaaaaaaa==thread==" +Thread.currentThread().getName());
    
    

    After adding .subscribeOn(Schedulers.io()), both production and consumption happen on an io thread.

      @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("onSubscribe==thread=="+Thread.currentThread().getName());
                }
    

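    This method runs on the main thread, because onSubscribe is called on the thread that calls subscribe(), which here is the same thread that called Observable.create.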

     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    System.out.println("shot1==thread==" + Thread.currentThread().getName());
                    e.onNext(1);
    
                    System.out.println("shot==thread==" + Thread.currentThread().getName());
                    e.onError(new Throwable("shoterror"));
                    System.out.println("aftershoterror==thread==" + Thread.currentThread().getName());
                }
            }).subscribeOn(Schedulers.io())
                    .subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("onSubscribe==thread==" + Thread.currentThread().getName());
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println("onNext" + integer + "==thread==" + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println("onError==thread==" + Thread.currentThread().getName());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete" + Thread.currentThread().getName());
                }
            });
    
            System.out.println("aaaaaaaaaaa==thread==" + Thread.currentThread().getName());
    

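    If I call .subscribeOn(Schedulers.io()) twice, the result is still determined by the first call: only the subscribeOn closest to the source decides the thread on which the source emits.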
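
    Why the first subscribeOn wins can be sketched with plain JDK executors. This is a simplified model, not RxJava's implementation: the Source interface, the subscribeOn helper and the executor names "first" and "second" are all assumptions made up for illustration. The idea is that subscription travels upstream, each subscribeOn hops onto its own executor before subscribing further up, and the hop closest to the source is the last one to happen.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of chained subscribeOn calls. NOT RxJava's implementation.
class SubscribeOnModel {

    // Hypothetical stand-in for an Observable: something you can subscribe to.
    interface Source {
        void subscribe();
    }

    // Models subscribeOn: hop onto the executor, then subscribe to the upstream.
    static Source subscribeOn(Source upstream, ExecutorService executor) {
        return () -> executor.execute(upstream::subscribe);
    }

    // Single-thread executor whose thread carries a recognizable name.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread on which the source body ran.
    static String sourceThreadName() {
        ExecutorService first = namedExecutor("first");
        ExecutorService second = namedExecutor("second");
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // The "emitting" code: it only records which thread it runs on.
        Source source = () -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        };

        // Same shape as source.subscribeOn(first).subscribeOn(second).
        Source chain = subscribeOn(subscribeOn(source, first), second);
        chain.subscribe();

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(sourceThreadName()); // prints "first"
    }
}
```

    In this model the outer hop onto "second" does happen, but it is immediately followed by the inner hop onto "first", so the source body only ever sees the "first" thread.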


       Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    System.out.println("emit 1==thread==" + Thread.currentThread().getName());
                    e.onNext(1);
                    System.out.println("emit 2==thread==" + Thread.currentThread().getName());
                    e.onNext(2);
                    System.out.println("emit error==thread==" + Thread.currentThread().getName());
    //                e.onError(new Throwable("shoterror"));
                    System.out.println("after emitting error==thread==" + Thread.currentThread().getName());
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            System.out.println("onSubscribe==thread==" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("onNext" + integer + "==thread==" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            System.out.println("onError==thread==" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("onComplete" + Thread.currentThread().getName());
                        }
                    });
    
    

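    With .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()), subscribeOn determines the thread on which items are produced and emitted, and observeOn determines the thread on which they are received and consumed.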
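
    The split between the two operators can be modelled without RxJava or Android. The following is only a sketch under assumptions: the executors named "io" and "main" stand in for Schedulers.io() and AndroidSchedulers.mainThread(), and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of subscribeOn + observeOn. NOT RxJava's implementation.
class SchedulingModel {

    // Single-thread executor whose thread carries a recognizable name.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the thread names recorded under the keys "emit" and "consume".
    static Map<String, String> run() {
        ExecutorService io = namedExecutor("io");     // stand-in for Schedulers.io()
        ExecutorService main = namedExecutor("main"); // stand-in for the main thread
        Map<String, String> threads = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(1);

        // Like subscribeOn(io): the producing code runs on the io executor.
        io.execute(() -> {
            threads.put("emit", Thread.currentThread().getName());
            // Like observeOn(main): delivery to the consumer hops to main.
            main.execute(() -> {
                threads.put("consume", Thread.currentThread().getName());
                done.countDown();
            });
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<String, String> threads = run();
        System.out.println("emit on " + threads.get("emit"));       // emit on io
        System.out.println("consume on " + threads.get("consume")); // consume on main
    }
}
```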


    When two items are produced, the output shows both being emitted before either is consumed. Is that really the order? Let's verify:

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    System.out.println(i + " about to emit==thread==" + Thread.currentThread().getName());
                    e.onNext(i);
                }
            }
        })
    

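    Indeed: in this run all of the data was emitted before any of it was received. What happens if the thread scheduling is removed?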


    Then each item is consumed as soon as it is produced.
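
    The two orderings can be reproduced with a small single-threaded model. It is a sketch, not RxJava's code, and it rests on one assumption: that the consuming thread stays busy until the producer has finished, so queued deliveries only run afterwards. In a real program the interleaving depends on thread timing. The names DeliveryOrderModel, direct and queued are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.IntConsumer;

// Simplified model of delivery order. NOT RxJava's implementation.
class DeliveryOrderModel {

    // No scheduler: emitting calls the observer directly, so the log alternates.
    static List<String> direct(int count) {
        List<String> log = new ArrayList<>();
        IntConsumer observer = i -> log.add("consume " + i);
        for (int i = 0; i < count; i++) {
            log.add("emit " + i);
            observer.accept(i);
        }
        return log;
    }

    // Queued delivery: emitting only enqueues a task for the consuming thread.
    // Assumption: that thread is busy until the loop ends, then drains the queue.
    static List<String> queued(int count) {
        List<String> log = new ArrayList<>();
        IntConsumer observer = i -> log.add("consume " + i);
        Queue<Runnable> pending = new ArrayDeque<>();
        for (int i = 0; i < count; i++) {
            final int value = i;
            log.add("emit " + value);
            pending.add(() -> observer.accept(value));
        }
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(direct(2)); // [emit 0, consume 0, emit 1, consume 1]
        System.out.println(queued(2)); // [emit 0, emit 1, consume 0, consume 1]
    }
}
```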

    Article link: https://www.haomeiwen.com/subject/jrnlhxtx.html