美文网首页Android-RxJavaAndroid-Rxjava&retrofit&dagger
RxJava——基础学习(四),创建操作符

RxJava——基础学习(四),创建操作符

作者: 英勇青铜5 | 来源:发表于2016-12-01 14:30 被阅读167次

    RxJava——基础学习(四),创建操作符

    学习资料:

    上篇学习记录是4个月前,中间补习了些Java的基础知识,主要就是将Java编程思想大致看了一遍,感觉补习过后,对Rx的学习理解还是很有帮助的。接下来重点就是开始继续学习RxJava,争取在2017年到来之前的一个月里,能够将RxJava基础学习完,并能够为进阶学习做一些准备

    RxJava 2.0正式版已经出来了,在Android Studio中已经可以使用了,但在IntelliJ IDEA中我没有找到,找到的最新版是1.2.3,我不知道怎么可以使用最新版的2.0。不过对学习操作符影响不大,操作符是通过IDEA来敲代码进行学习的,打算将常用的操作符过一遍后,再开始学习了解2.0


    1.宝石图

    每个操作符下都会一个宝石图进行辅助说明,先简单学习了解一下如何查看

    legend宝石图
    名称 作用
    1 TimeLine时间线 从左向右,代表着Observable的事件的时间线,个人理解就是Observable的事件序列
    2 Items数据项 Observable包含的不同类型的数据项
    3 VerticalLine竖直线 Observable成功执行完成的标志
    4 DottedLines虚线
    5 Box盒子
    代表着每个数据项正在根据操作符进行转换操作
    6 X叉 由于某些原因,Observable已经不正常终止,并会给出一个error
    7 ResultLines 结果线 进行操作过后的结果线,个人理解就是Observer,Subscriber处理的事件结果序列
    8 Flip翻转 Box盒子中的文字,操作符的名字

    有的操作符的宝石图并不会像示意宝石图这么直观,需要对操作符自身的含义及用法比较了解后,更容易看懂,需要大量的经验积累


    2.创建操作符

    创建操作符用于创建Observable对象的操作符,貌似2.0中有些改动,但现在还不知道具体做了哪些改动,到了后面2.0的学习时,再进行比较

    操作符的宝石图不再贴出


    2.1 Just

    just将单个数据转换为发射那个数据的Observable数据是原样发出 ,数组与Iterable整个对象则当作单个数据发出

    注意:just(null),会返回一个发射null值的Observable,而不是返回一个空Observable(完全不发射任何数据的Observable),如果要产生空Observable应该使用empty操作符

    简单使用:

     class Rx_1 {
        public static void main(String[] args) {
            Observable
                    .just("Hello world", "英勇青铜5")
                    //.subscribe(System.out::println);
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
                    
            //Iterable List集合
            System.out.print("\n集合List:");
    //        Observable
    //                .just(Arrays.asList(1, 3))
    //                .subscribe(((integers) -> integers.forEach(System.out::println)));
            Observable
                    .just(Arrays.asList(1, 3))
                    .subscribe(new Action1<List<Integer>>() {
                        @Override
                        public void call(List<Integer> integers) {
                            for (int i : integers) {
                                System.out.print(i + ",");
                            }
                        }
                    });
                    
            //数组        
            System.out.print("\n数组:");
            int[] nums = {4, 5, 6};
            Observable.just(nums)
                    .subscribe(new Action1<int[]>() {
                        @Override
                        public void call(int[] ints) {
                            for (int i : ints) {
                                System.out.print(i + ",");
                            }
                        }
                    });
        }
    }
    

    结果:

    Hello world
    英勇青铜5
    
    集合List:1,3,
    数组:4,5,6,
    

    2.2 From

    将其他类型的对象和数据类型转换为Observable,默认不在任何特定的调度器上执行

    Form和Just区别:

    • 参数类型不同,form支持的类型有Iterable,Object[]arrays,Future,而just支持的有泛型T,数组,Iterable
    • just产生的Obsevable是将单个数据作为一个整体发出,包括数组和Iterablefrom产生的Obsevable则会将Iterable,Object[]arrays遍历后,逐个发出

    简单使用:

     class FromDemo {
        public static void main(String[] args) {
            System.out.println("Integer [] arrays:");
            arrays();
            System.out.println("List集合:");
            iterable();
            System.out.println("Future:");
            future();
        }
    
    
        /**
         * Object [] arrays
         */
        private static void arrays() {
            //这里是 Object [] arrays 集合,int [] ints不可以
            Integer[] ints = {1, 2, 3, 4, 5, 6};
            Observable
                    .from(ints)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + ",");
                        }
                    });
            System.out.println("\n");
        }
    
        /**
         *  Iterable 对象
         */
        private static void iterable() {
            Observable
                    .from(Arrays.asList(6, 5, 4, 3, 2, 1))
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + ",");
                        }
                    });
            System.out.println("\n");
        }
    
        /**
         *  Future 对象
         */
        private static void future() {
            //线程池对象
            ExecutorService executorService = Executors.newCachedThreadPool();
            //submit,产生 Future 对象
            Future<String> future = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    double d = Math.PI + (Math.random() * 10);
                    TimeUnit.SECONDS.sleep(3);//模拟耗时操作,可以改为1查看结果
                    return "Future -- > " + d;
                }
            });
            //关闭线程池
            executorService.shutdown();
    
            Observable
                    .from(future, 2, TimeUnit.SECONDS)
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            System.out.println(throwable.getMessage());
                        }
                    }, new Action0() {
                        @Override
                        public void call() {
                            System.out.println("终止");
                        }
                    });       
        }
    }
    

    结果:

    Integer [] arrays:
    1,2,3,4,5,6,
    
    List集合:
    6,5,4,3,2,1,
    
    Future:
    null
    

    可以将future()方法中,模拟耗时操作改为sleep(1)

    Future:
    Future -- > 13.088281862678471
    终止
    

    2.3 Range系列

    创建一个发射特定整数序列的Observable,默认不在任何特定的调度器上执行

    简单使用:

    public class RangeDemo {
        public static void main(String[] args) {
            Observable
                    .range(2,3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + ", ");
                        }
                    });
        }
    }
    

    结果:

    2, 3, 4,
    

    两个参数,第一个是起始值,第二个是整数序列的个数, 并不是结束值


    2.4 Defer推迟

    直到有观察者订阅时才创建Observable,然后使用Observable工厂方法为每个观察者创建一个新的Observable,对每个观察者都是这样,每个订阅者都以为自己订阅的是同一个Observable,但实际每个订阅者获取的是它们自己的单独的数据序列

    defer方法默认不在任何特定的调度器上执行

    简单使用:

    public class DeferDemo {
        public static void main(String[] args) {
            Observable
                    .defer(new Func0<Observable<Integer>>() {
                        @Override
                        public Observable<Integer> call() {
                            return Observable.just(1, 2, 3);
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + ", ");
                        }
                    });
        }
    }
    

    结果:

    1, 2, 3, 
    

    2.5 Timer定时

    创建出一个Obaservable后,在给定的延迟时间时,发送一个0

    默认在computation调度器上执行

    简单使用:

    public class TimerDemo {
        public static void main(String[] args) throws InterruptedException {
            Observable
                    .timer(10, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long l) {
                            System.out.println(l);
                        }
                    });
            //延迟主线程结束 等待结果
            TimeUnit.MILLISECONDS.sleep(20);
        }
    }
    

    运行结果:

    0
    

    由于默认在computation调度器上执行,并不是在main()主线程,所以给main()设置了20毫秒的延迟。如果timer(0,),设置延迟为0,main()就不需要sleep()


    2.6 Interval间隔

    创建一个按固定时间间隔发射从0开始的整数序列的Observable,默认在computation调度器上执行

    在不考虑Scheduler参数的情况下有两个版本:

    //固定间隔 发射从0开始的整数序列
    Observable<Long> interval(long interval, TimeUnit unit)
    
    //有延迟时间,然后根据固定间隔 发射从0开始的整数序列
    Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
    

    这里去看宝石图比较直观

    简单使用:

    public class IntervalDemo {
        public static void main(String[] args) throws InterruptedException {
            //infinite();//无限
            period();//有延迟,有周期
        }
    
        /**
         * 当前线程,间隔为500毫米,无限发送一个从0开始的整数序列
         */
        private static void infinite() {
            Observable
                    .interval(500, TimeUnit.MILLISECONDS, Schedulers.immediate())
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            System.out.print(aLong + " ,");
                        }
                    });
    
        }
    
        /**
         *当前线程, 延迟1秒后,间隔为2s,发送一个从0开始的整数序列
         */
        private static void period() {
            Observable
                     .interval(1, 2, TimeUnit.SECONDS, Schedulers.immediate())
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            System.out.print(aLong + " ,");
                        }
                    });
        }
    }
    

    2.7 Repeat重复

    创建一个发射特定数据重复多次的Observable,允许重复的发射某个数据序列,也可以限制重复的次数

    Repeat不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数

    默认在trampoline调度器上执行


    repeatWhen

    不是缓存和存放原始Observable的数据序列,而是有条件的重新订阅和发射原来的Observable,默认在trampoline调度器上执行

    将原始Observable的终止通知(完成或错误)当做一个Void数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的Observable。这个通知处理器就像一个Observable操作符,接受一个发射Void通知的Observable为输入,返回一个发射Void数据(意思是,重新订阅和发射原始Observable)或者直接终止(意思是,使用repeatWhen终止发射数据)的Observable

    个人理解:可以根据需求,确定是否重新订阅,可以重复订阅多次

    repeatWhen宝石图

    简单使用:

    public class RepeatDemo {
        public static void main(String[] args) {
            //withOutParameter();
            //withParameter();
            repeatWhen();
        }
    
        /**
         * 无限重复 1,2,3
         */
        private static void withOutParameter() {
            Observable
                    .just(1, 2, 3)
                    .repeat()
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + " ,");
                        }
                    });
        }
    
        /**
         * 有次数的重复
         */
        private static void withParameter() {
            Observable
                    .just(1, 2, 3)
                    .repeat(2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + " ,");
                        }
                    });
        }
        
        /**
         * 每隔一秒订阅一次
         */
        private static void repeatWhen() {
            Observable
                    //.just(1, 2, 3)
                    .range(1, 5)
                    .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                        @Override
                        public Observable<?> call(Observable<? extends Void> observable) {
     //                     return Observable.timer(1,TimeUnit.SECONDS);
    //                      return Observable.interval(1, TimeUnit.SECONDS);
                            return observable.delay(1, TimeUnit.SECONDS);
                        }
                    })
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Completed");
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
                            System.out.println(throwable.getMessage());
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.print(integer + ", ");
                        }
                    });
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    运行结果:

    1, 2, 3, 1, 2, 3, 1, 2, 3, 
    

    2.8 Create

    使用一个函数从头开始创建一个Observable,默认不在任何特定的调度器上执行

    建议在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让Observable停止发射数据或者做昂贵的运算

    简单使用:

    class CreateDemo {
        public static void main(String[] args) {
            Observable
                    .create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            try {
                                if (!subscriber.isUnsubscribed()) {
                                    for (int i = 0; i < 5; i++) {
                                        subscriber.onNext(i);
                                    }
                                }
                                subscriber.onCompleted();
                            } catch (Exception e) {
                                subscriber.onError(e);
                            }
    
                        }
                    })
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("Completed");
                        }
    
                        @Override
                        public void onError(Throwable throwable) {
                            System.out.println(throwable.getMessage());
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.print(integer + ", ");
                        }
                    });
        }
    }
    

    运行结果:

    0, 1, 2, 3, 4, Completed
    

    2.0中,这个操作符改动蛮大


    2.9 Empty/Never/Throw

    • Empty:
      创建一个不发射任何数据但是正常终止的Observable

    • Never:
      创建一个不发射数据也不终止的Observable

    • Throw:
      创建一个不发射数据以一个错误终止的Observable

    这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

    RxJava将这些操作符实现为 empty,never和errorerror操作符需要一个Throwable参数,你的Observable会以此终止。这些操作符默认不在任何特定的调度器上执行,但是empty和error有一个可选参数是Scheduler,如果传递了Scheduler参数,它们会在这个调度器上发送通知


    3.最后

    基本就是文档照搬,创建操作符大致就这些,宝石图并没有贴,可以去文档进行查看

    本人很菜,有错误请指出

    共勉 :)

    相关文章

      网友评论

        本文标题:RxJava——基础学习(四),创建操作符

        本文链接:https://www.haomeiwen.com/subject/hlmtmttx.html