美文网首页
学习RxJava2---finish

学习RxJava2---finish

作者: Thor_果冻 | 来源:发表于2018-12-31 12:58 被阅读0次

RxJava

观看菜鸟窝视频RxJava2整理
我的github整理学习rxjava

RxJava是一种响应式编程
采用观察者模式
好处:异步、简洁

RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

观察者模式

观察者(Observer)模式:是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听(Source/Listener)模式或者从属(Dependents)模式

观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象,这个主题对象再状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。


  • 抽象主题(Subject)角色:
    抽象主题角色把所有对观察者对象的引用保存在一个聚集(比如ArrayList对象)里,每个主题都可以有任何数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象,抽象主题角色又叫抽象被观察者角色。
  • 具体主题(ConcreteSubject)角色:
    将有关状态存入具体观察者对象;在具体主题的内部状态改变时,给所有登记过的观察者发出通知。具体主题角色又叫做具体被观察者角色。
  • 抽象观察者(Observer)角色:
    为所有的具体观察者定义一个接口,在得到主题通知时更新自己,这个接口叫做更新接口。
  • 具体观察者(ConcreteObserver)角色:
    存储与主题的状态自怡的状态。具体观察者角色实现抽象观察者角色所有要求的更新接口、以便使本身的状态与主题的状态协调。如果需要,具体观察者角色可以保持一个指向具体主题对象的引用。

基本使用

public class MainActivity extends AppCompatActivity {

    private static final String TAG = "123===";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
    }

    public void click(View view) {
        //点击按钮

        //第一种创建方法,最基本的使用
//        firstMethod();

        //第二种创建方法,简单的方法
        secondMethod();
    }

    private void secondMethod() {
//        Observable<String> observable = Observable.just("1", "2", "3");
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onNext("3");
                e.onComplete();
//                e.onError(new Throwable("----onError"));//如果是错误结束,在subscribe订阅的时候必须加上第二个参数
            }
        });

        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: "+s);
            }
        }/*, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.d(TAG, "accept: "+throwable.getLocalizedMessage());
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "run: ");
            }
        }*/);
    }

    private void firstMethod() {
        //第一步创建Observable, 被观察这
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                Log.d(TAG, "subscribe: 1--");
                e.onNext("1");//发送数据
                Log.d(TAG, "subscribe: 2--");
                e.onNext("2");
                Log.d(TAG, "subscribe: 3--");
                e.onNext("3");
                Log.d(TAG, "subscribe: 完成--");
                e.onComplete();//完成是调用
//                e.onError(new Throwable("我是错误信息"));//在错误的时候调用(完成或错误只能调用一个,不可以两个同时调用)
            }
        });
        //第二部创建observer,观察者
        Observer<String> observer = new Observer<String>() {

            private Disposable dd;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                //
                dd = d;
//                d.dispose();//移除订阅关系
//                d.isDisposed();//是否移除订阅关系
                Log.d(TAG, "onSubscribe: " + d.isDisposed());
            }

            @Override
            public void onNext(@NonNull String s) {
                //事件
                Log.d(TAG, "onNext: " + s);
                if ("2".equals(s)) {
                    dd.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                //错误
                Log.d(TAG, "onError: " + e.getLocalizedMessage());
            }

            @Override
            public void onComplete() {
                //完成
                Log.d(TAG, "onComplete: ");
            }
        };
        //实现订阅关系
        observable.subscribe(observer);
    }
}

Scheduler线程控制

  • Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
  • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
  • Schedulers.io():i/o操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()比newThread()更有效率。不要把计算工作放入io()中,可以避免创建不必要的线程。
  • Schedulers.computatuion():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被i/o等操作限制性能的操作,例如图形的计算。这个Scheduler使用的时固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
  • AndroidSchedulers.mainThread():它指定的操作将在Android主线程运行。
Observable.create(new ObservableOnSubscribe<User>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<User> e) throws Exception {
                //请求网络
                User user = new User("com.thor.wdd", "1", "wdd");

                e.onNext(user);
            }
        }).subscribeOn(Schedulers.io())//Observable切换到子线程
                .observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull User user) {
                        Log.d(TAG, "onNext: " + user.toString());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

subscribeOn(Schedulers.io())//Observable、Observer切换到子线程
observeOn(AndroidSchedulers.mainThread())//Observer切换到主线程

操作符介绍

操作符分类跳转到官翻

下面都是我整理学习的记录在github

创建操作符

        creat;
        just;//快速创建对象,最多发送十个对象
        fromArray;//数组遍历
        fromIterable;//遍历
        empty;//仅发送Complete事件,直接通知完成
        error;//仅发送Error事件,直接通知异常
        never;//不发送任何事件
        defer;//直到有观察者observer订阅时,才动态创建被观察者对象 并 发送事件
        timer;//延迟指定时间后,调用一次onNext(0)
        interval;//每隔指定事件发送事件,无限叠加发送,从0开始
        intervalRange;//每隔指定时间 就发送事件,并指定发送事件数量
        range;//连续发送一个事件序列,可指定次数

变换操作符

        map;//数据转换
        flatMap;//将被观察者发送的事件序列进行拆分,并且单独转换,再合并成一个新的事件序列,最后进行发送
        concatMap;//类似FlatMap()操作符;区别:拆分并且重新合并生成的事件序列的顺序 等于 被观察者旧序列生产的顺序
        buffer;//缓存区大小 = 每次从被观察者中获取的事件数量, 步长 = 从当前位置向后移动几位

组合/合并操作符

        concat;//concat组合被观察者数量<=4,顺序执行
        concatArray;//concatArray组合观察者数量>4,顺序执行
        merge;//组合被观察者数量<=4,非顺序执行
        mergeArray;//组合被观察者数量>4,非顺序执行
        concatArrayDelayError;//第1个被观察者的Error事件将在第2个被观察者发送完事件后再继续发送,mergeDelayError()操作符同理
        mergeDelayError;
        zip;//严格按照原先事件序列 进行对位合并,最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
        combineLatest;//当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据,与reduce区别就是接收的事件是每次都有而reduce只有最后一次输出信息
        reduce;//把前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
        collect;//将被观察者Observable发送的数据事件收集到一个数据结构里
        startWith;//在一个被观察者发送事件前,追加发送一些数据/ 一个新的被观察者, 后调用的startWith先追加
        startWithArray;
        count;//统计被观察者发送事件的数量

功能性操作符

        delay;//指定延迟时间
        do;//在某个事件的生命周期中调用
        retry;//重试,即出现错误,让被观察者重新发送数据
        retryWhen;//出现错误后,判断是否需要重新发送数据
        repeat;//重复不断地发送被观察者事件
        repeatWhen;//有条件地、重复发送 被观察者事件

过滤操作符

        filter;过滤 特定条件的事件
        ofType;过滤 特定类型的数据
        skip;跳过指定事件,也可跳过指定时间时间
        skipLast;跳过指定最后事件,也可跳过最后时间时间
        distinct;去重
        distinctUntilChanged;连续重复才去重
        take;接收事件数量
        takeLast;只接收最后几个事件数量
        throttleFirst;指定时间内只接收第一次事件
        throttleLast/sample;指定时间内只接收最后一次次事件
        throttleWithTimeout/debounce;采样频率,指定时间只接收最新事件
        firstElement;//获取第一个事件
        lastElement;//获取最后一个事件
        elementAt;//获取指定位置事件,如果超出位置没有任何提示,如果需要提示则调用elementAtOrError(),如超出位置需要指定默认值则调用elementAt 2个参数的方法

布尔操作符

    takeWhile;判断发送的每项数据是否满足设置的函数条件
    skipWhile;跳过满足条件的那些数据,发送不满足那些条件的数据
    takeUntil;Predicate参数时:执行到条件成立时,停止发送事件,但本次事件会发送出去;observable参数时:第二个observable开始发送数据时,原始的observable停止发送事件
    sequenceEqual;判定两个Observables需要发送的数据是否相同
    contains;判断发送的数据中是否包含指定数据
    isEmpty;判断发送的数据是否为空
    defaultIfEmpty;在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值
    amb;当需要发送多个 Observable时,只发送 先发送数据的Observable的数据,而其余 Observable则被丢弃

相关文章

  • 学习RxJava2---finish

    RxJava 观看菜鸟窝视频RxJava2整理我的github整理学习rxjava RxJava是一种响应式编程采...

  • 学习学习学习

    第三天了,连续三天,早上睁眼开始,看视频,做课件,连续作业,直到晚上十二点才睡觉。吃饭不规律,想起来就吃,感觉不饿...

  • 学习学习学习

    23岁的我,才真正明白,什么是学习,什么是努力,努力和不努力真的不同,就好比同样是一篇稿子,我用一周背下来,有的人...

  • 学习学习学习!

    妈妈总是让我学习,我只能用装当办法。 方法一: 方法二: 方法三: 方法四: ...

  • 学习学习学习

    001.今天看财富自由之路看了第二遍,而且看了一半,算是完成任务很开心。中间有想放弃的念头,坚持看完。眼睛痛,一直...

  • 学习学习学习

    马自达为什么坚持高压缩比自吸

  • 学习!学习!学习!

    学习的痛苦是暂时的 没有学到的痛苦是永恒的 因为学习而特别充实的一天 很踏实 ~~~~ 2015.11.28.阴天...

  • 学习!学习!学习!

    无数次想要去逃离,可这封闭的世界根本出不去。你没有什么可以抛弃、只能咬着牙带着面具微笑的活下去。 没有那个人、他也...

  • 学习学习学习!

    昨天和今天两个上午,都在学习新媒体运营,学习的过程中心里只有一个想法:这也太套路,太功利了吧。可真应了那句话...

  • 学习,学习,学习!

    近期学习重点有两个方面,一方面是把上一个阶段定下任务的几本书读完,并在读的过程中有输出和转化,让阅读和学习真正能有...

网友评论

      本文标题:学习RxJava2---finish

      本文链接:https://www.haomeiwen.com/subject/uqdplqtx.html