美文网首页
RxJava操作符(创建)

RxJava操作符(创建)

作者: Charein | 来源:发表于2019-02-13 08:59 被阅读0次

    包括create,defer,just,fromfromArrayfromIterablefromPublisher

    create

    通过以编程方式调用observer方法从头创建一个Observable,参考http://reactivex.io/documentation/operators/create.html

    image.png
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
    

    例如:

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    for (int i = 0; i < 3; i++) {
                        e.onNext("#" + i);
                    }
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept: " + s);
                }
            });
    
           // 输出
           // System.out: accept: #0
           // System.out: accept: #
           // System.out: accept: #2
    

    defer

    在观察者订阅之前不要创建Observable,并为每个观察者创建一个新的Observable。
    Defer运算符等待观察者订阅它,然后它生成一个Observable,通常带有Observable工厂函数。 它为每个订阅者重新执行此操作,因此尽管每个订阅者可能认为它订阅了相同的Observable,但实际上每个订阅者都获得其自己的单独序列。
    在某些情况下,等到最后一分钟(即直到订阅时间)生成Observable可以确保此Observable包含最新的数据。
    参考:http://reactivex.io/documentation/operators/defer.html

    image.png
    public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier);
    

    例如

        private String msg = "#0";
    
        @SuppressLint("CheckResult")
        public void testDefer() throws InterruptedException {
            // 不停的更新msg内容
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i=0; i<10; i++) {
                        msg = "#" + i;
                        System.out.println("msg: " + msg);
                        try {
                            TimeUnit.MILLISECONDS.sleep(300);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
    
            Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    return Observable.just(msg);
                }
            });
    //        Observable<String> observable = Observable.just(msg);
    
            TimeUnit.MILLISECONDS.sleep(1000);
            observable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println("accept: " + s);
                }
            });
    
            // 输出
            // System.out: msg: #0
            // System.out: msg: #1
            // System.out: msg: #2
            // System.out: msg: #3
            // System.out: accept: #3
            // System.out: msg: #4
            // System.out: msg: #5
            // System.out: msg: #6
            // System.out: msg: #7
            // System.out: msg: #8
            // System.out: msg: #9
        }
    

    如果把defer换成just,输出结果将为

            // System.out: msg: #0
            // System.out: msg: #1
            // System.out: msg: #2
            // System.out: msg: #3
            // System.out: accept: #0
            // System.out: msg: #4
            // System.out: msg: #5
            // System.out: msg: #6
            // System.out: msg: #7
            // System.out: msg: #8
            // System.out: msg: #9
    

    just

    public static <T> Observable<T> just(T item)
    public static <T> Observable<T> just(T item1, T item2)
    // 最多可以带10个参数
    

    例如

    Observable.just("#0")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            // System.out: accept: #0
                            System.out.println("accept: " + s);
                        }
                    });
    

    fromArray

    public static <T> Observable<T> fromArray(T... items)
    

    例如

    Observable.fromArray("#0", "#1", "#2")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            // System.out: accept: #0
                            // System.out: accept: #1
                            // System.out: accept: #2
                            System.out.println("accept: " + s);
                        }
                    });
    

    fromIterable

    public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
    

    例如

    Observable.fromIterable(new Iterable<String>() {
                @NonNull
                @Override
                public Iterator<String> iterator() {
                    List<String> list = new ArrayList<>();
                    list.add("#0");
                    list.add("#1");
                    list.add("#2");
                    return list.iterator();
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: #0
                    // System.out: accept: #1
                    // System.out: accept: #2
                    System.out.println("accept: " + s);
                }
            });
    

    fromPublisher

    public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
    

    例如

    Observable.fromPublisher(new Publisher<String>() {
                @Override
                public void subscribe(Subscriber<? super String> s) {
                    for (int i = 0; i < 4; i++) {
                        s.onNext("#" + i);
                    }
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: #0
                    // System.out: accept: #1
                    // System.out: accept: #2
                    System.out.println("accept: " + s);
                }
            });
    

    相关文章

      网友评论

          本文标题:RxJava操作符(创建)

          本文链接:https://www.haomeiwen.com/subject/rcikeqtx.html