美文网首页
Android RxJava 基本操作符-创建操作符

Android RxJava 基本操作符-创建操作符

作者: MengkZhang | 来源:发表于2019-07-09 15:55 被阅读0次
package com.zhang.rxjavademo1;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

/**
 * RxJava入门
 */
public class MainActivity extends AppCompatActivity {

    public static final String TAG = "MainActivity";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        RxJava01();

        RxJava02();

        RxJava03();

        RxJava04();

        RxJava05();

        RxJavaJiChuCaoZuoFu();

    }

    /**
     * RxJava的基础操作符
     */
    private void RxJavaJiChuCaoZuoFu() {
        RxJavaCreate();
        RxJavaCreateUse();
        RxJavaQuitCreateJust();
        RxJavaQuitCreateFromArray();
        RxJavaQuitCreateFromIterable();
        RxJavaCreateDefer();
        RxJavaCreateTimer();
//        RxJavaCreateInterval();
        RxJavaCreateIntervalRange();
    }

    /**
     * 快速创建一个被观察者对象
     *
     * intervalRange()
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
     *
     * 发送的事件序列 = 从0开始、无限递增1的的整数序列
     * 作用类似于interval(),但可指定发送的数据的数量
     *
     */
    private void RxJavaCreateIntervalRange() {
        log("==========================RxJavaCreateIntervalRange分割start===============================");
        // 参数说明:
        Observable.intervalRange(
                3,         //事件序列起始点
                10,       //事件数量
                2,   //第1次事件延迟发送时间
                1,       //间隔时间数字
                TimeUnit.SECONDS //时间单位
        )
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("onSubscribe连接");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        log("接收到的事件:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("响应error");
                    }

                    @Override
                    public void onComplete() {
                        log("响应onComplete");
                    }
                });
        //结果
        // ==========================RxJavaCreateIntervalRange分割start===============================
        // onSubscribe连接
        // ==========================RxJavaCreateIntervalRange分割end===============================
        // 接收到的事件:0
        // 响应onComplete
        // 接收到的事件:3
        // 接收到的事件:4
        // 接收到的事件:5
        // 接收到的事件:6
        log("==========================RxJavaCreateIntervalRange分割end===============================");
    }

    /**
     * 快速创建一个被观察者对象
     *
     * interval()作用
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:每隔指定时间 就发送 事件
     *
     * 发送的事件序列 = 从0开始、无限递增1的的整数序列
     *
     */
    private void RxJavaCreateInterval() {
        log("==========================RxJavaCreateInterval分割start===============================");
        // 参数说明:
        // 参数1 = 第1次延迟时间;
        // 参数2 = 间隔时间数字;
        // 参数3 = 时间单位;
        Observable.interval(3,1,TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("采用onSubscribe连接");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        log("接收到的事件:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("响应error");
                    }

                    @Override
                    public void onComplete() {
                        log("响应onComplete");
                    }
                });
        //结果 每过三秒接收一个事件
        // ==========================RxJavaCreateInterval分割start===============================
        // 采用onSubscribe连接
        // ==========================RxJavaCreateInterval分割end===============================
        // 接收到的事件:0
        // 响应onComplete
        // 接收到的事件:0
        // 接收到的事件:1
        // 接收到的事件:2
        // 接收到的事件:3
        // 接收到的事件:4

        log("==========================RxJavaCreateInterval分割end===============================");
    }

    /**
     * 快速创建一个被观察者对象
     *
     * timer()作用
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
     *
     * 本质 = 延迟指定时间后,调用一次 onNext(0)
     *
     * 注:timer操作符默认运行在一个新线程上
     * 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
     *
     */
    private void RxJavaCreateTimer() {
        // 该例子 = 延迟2s后,发送一个long类型数值
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("采用onSubscribe连接");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        log("接收到的事件:" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("响应onError");
                    }

                    @Override
                    public void onComplete() {
                        log("响应onComplete");
                    }
                });
        //结果
        // 采用onSubscribe连接
        // 接收到的事件:0
        // 响应onComplete

        log();
    }

    /**
     * 延迟创建被观察者对象
     *
     * defer()作用
     * 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
     *
     * 通过 Observable工厂方法创建被观察者对象(Observable)
     * 每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
     *
     * 应用场景
     * 动态创建被观察者对象(Observable) & 获取最新的Observable对象数据
     *
     */

    //1. 第1次对i赋值
    Integer i = 10;
    private void RxJavaCreateDefer() {
        // 2. 通过defer 定义被观察者对象
        // 注:此时被观察者对象还没创建
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });

        //第2次对i赋值
        i = 15;
        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                log("开始采用onSubscribe连接");
            }

            @Override
            public void onNext(Integer integer) {
                log("接收到的值:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                log("响应onError事件");
            }

            @Override
            public void onComplete() {
                log("响应onComplete事件");
            }
        });

        //结果
        // 开始采用onSubscribe连接
        // 接收到的值:15
        // 响应onComplete事件

        //因为是在订阅时才创建,所以i值会取第2次的赋值

        log();

    }

    /**
     * 快速创建被观察者对象
     * <p>
     * fromIterable()作用
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:直接发送 传入的集合List数据
     * <p>
     * 会将数组中的数据转换为Observable对象
     * <p>
     * 应用场景
     * 快速创建 被观察者对象(Observable) & 发送10个以上事件(集合形式)
     * 集合元素遍历
     */
    private void RxJavaQuitCreateFromIterable() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            list.add(i);
        }
        // 2. 通过fromIterable()将集合中的对象 / 数据发送出去
        Observable.fromIterable(list)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("开始采用subscribe()连接");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("收到了事件:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        log("对onComplete事件作出响应");
                    }
                });

        //结果:
        // 开始采用subscribe()连接
        // 收到了事件:0
        // 收到了事件:1
        // 收到了事件:2
        // 收到了事件:3
        // 对onComplete事件作出响应

        log();

        //遍历集合
        Observable.fromIterable(list)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("开始采用subscribe()连接");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("集合中的元素:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        log("遍历完成");
                    }
                });

        //结果
        // 开始采用subscribe()连接
        // 集合中的元素:0
        // 集合中的元素:1
        // 集合中的元素:2
        // 集合中的元素:3
        // 遍历完成

        log();
    }

    /**
     * 快速创建被观察者对象
     * <p>
     * fromArray()作用
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:直接发送 传入的数组数据
     * <p>
     * 注 :会将数组中的数据转换为Observable对象
     * <p>
     * 应用场景
     * <p>
     * 快速创建 被观察者对象(Observable) & 发送10个以上事件(数组形式)
     * 数组元素遍历
     */
    private void RxJavaQuitCreateFromArray() {
        // 1. 设置需要传入的数组
        Integer[] items = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        // 2. 创建被观察者对象(Observable)时传入数组
        // 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("开始采用subscribe()连接");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("收到了事件:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        log("对Complete事件作出响应");
                    }
                });

        //结果
        // 开始采用subscribe()连接
        // 收到了事件:0
        // 收到了事件:1
        // 收到了事件:2
        // 收到了事件:3
        // 收到了事件:4
        // 收到了事件:5
        // 收到了事件:6
        // 收到了事件:7
        // 收到了事件:8
        // 收到了事件:9
        // 收到了事件:10
        // 收到了事件:11
        // 对Complete事件作出响应


        log();

        //遍历数组
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("遍历数组");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("数组中的元素:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        log("遍历结束");
                    }
                });
        //结果
        // 遍历数组
        // 数组中的元素:0
        // 数组中的元素:1
        // 数组中的元素:2
        // 数组中的元素:3
        // 数组中的元素:4
        // 数组中的元素:5
        // 数组中的元素:6
        // 数组中的元素:7
        // 数组中的元素:8
        // 数组中的元素:9
        // 数组中的元素:10
        // 数组中的元素:11
        // 遍历结束

        log();
    }

    /**
     * 快速创建被观察者对象
     * just()作用
     * 快速创建1个被观察者对象(Observable)
     * 发送事件的特点:直接发送 传入的事件
     * 注:最多只能发送10个参数
     * <p>
     * 应用场景
     * 快速创建 被观察者对象(Observable) & 发送10个以下事件
     */
    private void RxJavaQuitCreateJust() {
        // 1. 创建时传入整型1、2、3、4
        // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1, 2, 3, 4)
                // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
                // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                // 3. 创建观察者 & 定义响应事件的行为
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        log("开始采用subscribe()连接");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("收到了事件:" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        log("对Complete事件作出响应");
                    }
                });

        //结果
        // 开始采用subscribe()连接
        // 收到了事件:1
        // 收到了事件:2
        // 收到了事件:3
        // 收到了事件:4
        // 对Complete事件作出响应
        log();
    }


    /**
     * RxJava基础操作符-create操作符
     */
    private void RxJavaCreate() {
        //1,通过create()创建被观察者对象Observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // 传入参数: OnSubscribe 对象
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ,即观察者模式

            /**
             * 在重写的方法中定义要发送的事件
             * @param emitter
             * @throws Exception
             */
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //通过ObservableEmitter对象产生发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        //至此 一个完整的被观察者对象就创建完毕

        log();
    }

    /**
     * 在具体使用的时候,一般采用链式调用来创建
     */
    private void RxJavaCreateUse() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                log("开始采用subscribe()连接");
            }
            //默认最先调用重写的onSubscribe()

            @Override
            public void onNext(Integer integer) {
                log("收到了事件:" + integer);
            }

            @Override
            public void onError(Throwable e) {
                log("对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                log("对Complete事件作出响应");
            }
        });
        //程序执行的结果
        // 开始采用subscribe()连接
        // 收到了事件:1
        // 收到了事件:2
        // 收到了事件:3
        // 对Complete事件作出响应
        log();
    }


    private void RxJava05() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                log("上游 发射--a");
                emitter.onNext("上游 发射--a");

                log("上游 发射--b");
                emitter.onNext("上游 发射--b");

                log("上游 发射--c");
                emitter.onNext("上游 发射--c");

                log("上游 发射--d");
                emitter.onNext("上游 发射--d");
            }
        })
                .subscribe(new Observer<String>() {
                    Disposable mDisposable;

                    @Override
                    public void onSubscribe(Disposable d) {

                        mDisposable = d;
                        log("下游 onSubscribe");

                    }

                    @Override
                    public void onNext(String s) {

                        if (s.equals("上游 发射--c")) {
                            mDisposable.dispose();
                        }

                        log("下游 onNext :" + s);

                    }

                    @Override
                    public void onError(Throwable e) {

                        log("下游 onError");

                    }

                    @Override
                    public void onComplete() {

                        log("下游 onComplete");

                    }
                });

        //程序执行结果
//        2019-04-29 22:48:39.534 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
//        2019-04-29 22:48:39.534 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--c
//        2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
        log();
    }

    private void RxJava04() {

        //创建上游Observable
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                log("上游 发射--a");
                emitter.onNext("上游 发射--a");

                log("上游 发射--b");
                emitter.onNext("上游 发射--b");

                log("上游 发射--c");
                emitter.onNext("上游 发射--c");

                log("上游 发射--d");
                emitter.onNext("上游 发射--d");

            }
        });

        //创建下游Observer
        Observer<String> observer = new Observer<String>() {

            Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {

                mDisposable = d;
                log("下游 onSubscribe");

            }

            @Override
            public void onNext(String s) {

                if (s.equals("上游 发射--c")) {
                    mDisposable.dispose();
                }

                log("下游 onNext :" + s);

            }

            @Override
            public void onError(Throwable e) {

                log("下游 onError");

            }

            @Override
            public void onComplete() {

                log("下游 onComplete");

            }
        };

        //连接上下游
        observable.subscribe(observer);

        //程序执行结果
//        2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
//        2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
//        2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
//        2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
//        2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
//        2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
//        2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--c
//        2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
        log();

    }

    private void RxJava03() {

        //创建上游Observable
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                log("上游 发射--a");
                emitter.onNext("上游 发射--a");

                log("上游 发射--b");
                emitter.onNext("上游 发射--b");

                emitter.onError(new NullPointerException());

                log("上游 发射--c");
                emitter.onNext("上游 发射--c");

                log("上游 发射--d");
                emitter.onNext("上游 发射--d");

            }
        });

        //创建下游Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

                log("下游 onSubscribe");

            }

            @Override
            public void onNext(String s) {

                log("下游 onNext :" + s);

            }

            @Override
            public void onError(Throwable e) {

                log("下游 onError");

            }

            @Override
            public void onComplete() {

                log("下游 onComplete");

            }
        };

        //连接上下游
        observable.subscribe(observer);


        //程序执行结果
//        2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 下游 onSubscribe
//        2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 上游 发射--a
//        2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--a
//        2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 上游 发射--b
//        2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--b
//        2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onError
//        2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--c
//        2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--d
        log();

    }

    private void RxJava02() {

        //创建上游Observable
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                log("上游 发射--a");
                emitter.onNext("上游 发射--a");

                log("上游 发射--b");
                emitter.onNext("上游 发射--b");

                emitter.onComplete();

                log("上游 发射--c");
                emitter.onNext("上游 发射--c");

                log("上游 发射--d");
                emitter.onNext("上游 发射--d");

            }
        });

        //创建下游Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

                log("下游 onSubscribe");

            }

            @Override
            public void onNext(String s) {

                log("下游 onNext :" + s);

            }

            @Override
            public void onError(Throwable e) {

                log("下游 onError");

            }

            @Override
            public void onComplete() {

                log("下游 onComplete");

            }
        };

        //连接上下游
        observable.subscribe(observer);


        //程序执行结果

//        2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
//        2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
//        2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
//        2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
//        2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
//        2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onComplete
//        2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
//        2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
        log();
    }

    private void RxJava01() {
        //创建上游的Observable
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("World");
                emitter.onNext("!!!!!!");

                emitter.onComplete();
            }
        });

        //创建下游的Observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.e(TAG, "onSubscribe");

            }

            @Override
            public void onNext(String s) {

                Log.e(TAG, "onNext : " + s);

            }

            @Override
            public void onError(Throwable e) {

                Log.e(TAG, "onError");

            }

            @Override
            public void onComplete() {

                Log.e(TAG, "onComplete");

            }
        };

        //连接上下游
        observable.subscribe(observer);


        //程序执行结果

//        2019-04-29 22:23:12.222 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onSubscribe
//        2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : Hello
//        2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : World
//        2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : !!!!!!
//        2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onComplete
        log();
    }

    private void log(String msg) {
        Log.e(TAG, msg);
    }

    private void log() {
        Log.e(TAG, "----------------------------------华丽的分割线------------------------------------");
    }

}

相关文章

网友评论

      本文标题:Android RxJava 基本操作符-创建操作符

      本文链接:https://www.haomeiwen.com/subject/cioxkctx.html