美文网首页
学习RxJava2---finish

学习RxJava2---finish

作者: Thor_果冻 | 来源:发表于2018-12-31 12:58 被阅读0次

    RxJava

    观看菜鸟窝视频RxJava2整理
    我的github整理学习rxjava

    RxJava是一种响应式编程
    采用观察者模式
    好处:异步、简洁

    RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

    观察者模式

    观察者(Observer)模式:是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听(Source/Listener)模式或者从属(Dependents)模式

    观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,这个主题对象再状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。


    • 抽象主题(Subject)角色:
      抽象主题角色把所有对观察者对象的引用保存在一个聚集(比如ArrayList对象)里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象,抽象主题角色又叫抽象被观察者角色。
    • 具体主题(ConcreteSubject)角色:
      将有关状态存入具体观察者对象;在具体主题的内部状态改变时,给所有登记过的观察者发出通知。具体主题角色又叫做具体被观察者角色。
    • 抽象观察者(Observer)角色:
      为所有的具体观察者定义一个接口,在得到主题通知时更新自己,这个接口叫做更新接口。
    • 具体观察者(ConcreteObserver)角色:
      存储与主题的状态自怡的状态。具体观察者角色实现抽象观察者角色所有要求的更新接口、以便使本身的状态与主题的状态协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用。

    基本使用

    public class MainActivity extends AppCompatActivity {
    
        private static final String TAG = "123===";
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
        }
    
        public void click(View view) {
            //点击按钮
    
            //第一种创建方法,最基本的使用
    //        firstMethod();
    
            //第二种创建方法,简单的方法
            secondMethod();
        }
    
        private void secondMethod() {
    //        Observable<String> observable = Observable.just("1", "2", "3");
            Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("1");
                    e.onNext("2");
                    e.onNext("3");
                    e.onComplete();
    //                e.onError(new Throwable("----onError"));//如果是错误结束,在subscribe订阅的时候必须加上第二个参数
                }
            });
    
            observable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept: "+s);
                }
            }/*, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG, "accept: "+throwable.getLocalizedMessage());
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG, "run: ");
                }
            }*/);
        }
    
        private void firstMethod() {
            //第一步创建Observable, 被观察这
            Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    Log.d(TAG, "subscribe: 1--");
                    e.onNext("1");//发送数据
                    Log.d(TAG, "subscribe: 2--");
                    e.onNext("2");
                    Log.d(TAG, "subscribe: 3--");
                    e.onNext("3");
                    Log.d(TAG, "subscribe: 完成--");
                    e.onComplete();//完成是调用
    //                e.onError(new Throwable("我是错误信息"));//在错误的时候调用(完成或错误只能调用一个,不可以两个同时调用)
                }
            });
            //第二部创建observer,观察者
            Observer<String> observer = new Observer<String>() {
    
                private Disposable dd;
    
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    //
                    dd = d;
    //                d.dispose();//移除订阅关系
    //                d.isDisposed();//是否移除订阅关系
                    Log.d(TAG, "onSubscribe: " + d.isDisposed());
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    //事件
                    Log.d(TAG, "onNext: " + s);
                    if ("2".equals(s)) {
                        dd.dispose();
                    }
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    //错误
                    Log.d(TAG, "onError: " + e.getLocalizedMessage());
                }
    
                @Override
                public void onComplete() {
                    //完成
                    Log.d(TAG, "onComplete: ");
                }
            };
            //实现订阅关系
            observable.subscribe(observer);
        }
    }
    

    Scheduler线程控制

    • Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
    • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
    • Schedulers.io():i/o操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()比newThread()更有效率。不要把计算工作放入io()中,可以避免创建不必要的线程。
    • Schedulers.computatuion():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被i/o等操作限制性能的操作,例如图形的计算。这个Scheduler使用的时固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
    • AndroidSchedulers.mainThread():它指定的操作将在Android主线程运行。
    Observable.create(new ObservableOnSubscribe<User>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<User> e) throws Exception {
                    //请求网络
                    User user = new User("com.thor.wdd", "1", "wdd");
    
                    e.onNext(user);
                }
            }).subscribeOn(Schedulers.io())//Observable切换到子线程
                    .observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程
                    .subscribe(new Observer<User>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(@NonNull User user) {
                            Log.d(TAG, "onNext: " + user.toString());
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    subscribeOn(Schedulers.io())//Observable、Observer切换到子线程
    observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程

    操作符介绍

    操作符分类跳转到官翻

    下面都是我整理学习的记录在github

    创建操作符

            creat;
            just;//快速创建对象,最多发送十个对象
            fromArray;//数组遍历
            fromIterable;//遍历
            empty;//仅发送Complete事件,直接通知完成
            error;//仅发送Error事件,直接通知异常
            never;//不发送任何事件
            defer;//直到有观察者observer订阅时,才动态创建被观察者对象 并 发送事件
            timer;//延迟指定时间后,调用一次onNext(0)
            interval;//每隔指定事件发送事件,无限叠加发送,从0开始
            intervalRange;//每隔指定时间 就发送事件,并指定发送事件数量
            range;//连续发送一个事件序列,可指定次数
    

    变换操作符

            map;//数据转换
            flatMap;//将被观察者发送的事件序列进行拆分,并且单独转换,再合并成一个新的事件序列,最后进行发送
            concatMap;//类似FlatMap()操作符;区别:拆分并且重新合并生成的事件序列的顺序 等于 被观察者旧序列生产的顺序
            buffer;//缓存区大小 = 每次从被观察者中获取的事件数量, 步长 = 从当前位置向后移动几位
    

    组合/合并操作符

            concat;//concat组合被观察者数量<=4,顺序执行
            concatArray;//concatArray组合观察者数量>4,顺序执行
            merge;//组合被观察者数量<=4,非顺序执行
            mergeArray;//组合被观察者数量>4,非顺序执行
            concatArrayDelayError;//第1个被观察者的Error事件将在第2个被观察者发送完事件后再继续发送,mergeDelayError()操作符同理
            mergeDelayError;
            zip;//严格按照原先事件序列 进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
            combineLatest;//当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据,与reduce区别就是接收的事件是每次都有而reduce只有最后一次输出信息
            reduce;//把前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
            collect;//将被观察者Observable发送的数据事件收集到一个数据结构里
            startWith;//在一个被观察者发送事件前,追加发送一些数据/ 一个新的被观察者, 后调用的startWith先追加
            startWithArray;
            count;//统计被观察者发送事件的数量
    

    功能性操作符

            delay;//指定延迟时间
            do;//在某个事件的生命周期中调用
            retry;//重试,即出现错误,让被观察者重新发送数据
            retryWhen;//出现错误后,判断是否需要重新发送数据
            repeat;//重复不断地发送被观察者事件
            repeatWhen;//有条件地、重复发送 被观察者事件
    

    过滤操作符

            filter;过滤 特定条件的事件
            ofType;过滤 特定类型的数据
            skip;跳过指定事件,也可跳过指定时间时间
            skipLast;跳过指定最后事件,也可跳过最后时间时间
            distinct;去重
            distinctUntilChanged;连续重复才去重
            take;接收事件数量
            takeLast;只接收最后几个事件数量
            throttleFirst;指定时间内只接收第一次事件
            throttleLast/sample;指定时间内只接收最后一次次事件
            throttleWithTimeout/debounce;采样频率,指定时间只接收最新事件
            firstElement;//获取第一个事件
            lastElement;//获取最后一个事件
            elementAt;//获取指定位置事件,如果超出位置没有任何提示,如果需要提示则调用elementAtOrError(),如超出位置需要指定默认值则调用elementAt 2个参数的方法
    

    布尔操作符

        takeWhile;判断发送的每项数据是否满足设置的函数条件
        skipWhile;跳过满足条件的那些数据,发送不满足那些条件的数据
        takeUntil;Predicate参数时:执行到条件成立时,停止发送事件,但本次事件会发送出去;observable参数时:第二个observable开始发送数据时,原始的observable停止发送事件
        sequenceEqual;判定两个Observables需要发送的数据是否相同
        contains;判断发送的数据中是否包含指定数据
        isEmpty;判断发送的数据是否为空
        defaultIfEmpty;在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值
        amb;当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃
    

    相关文章

      网友评论

          本文标题:学习RxJava2---finish

          本文链接:https://www.haomeiwen.com/subject/uqdplqtx.html