关于Rxjava的总结

作者: 奔跑吧李博 | 来源:发表于2018-02-09 13:17 被阅读87次

1. Rxjava是什么

a library for composing asynchronous and event-based programs using observable sequences for the Java VM(一个对于构成使用的Java虚拟机观察序列异步和基于事件的程序库)。随着项目逻辑的越来越复杂,在Rxjava的作用下却会保持始终的简洁,因为Rxjava不会存在一层一层的嵌套,步骤清晰。特别是配合Lamda表达式的情况下。

Rxjava的github地址

2. 引用库

    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'

3. Rxjava的三大主角

  1. Observable 被观察者
  2. Subscriber/Observer 观察者
  3. OnSubscribe 被观察者和观察者的桥梁
    Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。比如我一家人要去餐厅吃饭,我是Observable,服务员是OnSubscribe,厨房是Subscriber,我通过服务员点餐,服务员将我的订单告诉厨房,厨房经过做各道菜,各道工序处理,每做好一道菜,就通知服务员端出来给我。菜做好端出来的回调是异步的,无需我一直等待。

4. 观察者的回调方法

  • onNext(T item)
    Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。

  • onError(Exception ex)
    当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。

  • onComplete
    正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。

5. 创建方式

create方式

内部代码:

    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

创建观察着和被观察者:

        Observable observable = Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(Subscriber<? super String> subscriber) {
                //给观察者发送事件
                subscriber.onStart();
                subscriber.onNext("你好");
                subscriber.onNext("在吗");
                subscriber.onNext("中午吃什么?");
                subscriber.onCompleted();
            }
        });

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.i("minfo",s);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }
        };

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                Log.i("minfo",s);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }
        };

订阅:

        observable.subscribe(subscriber);
        observable.subscribe(observer);

Subscriber是实现了Observer的抽象类,比Observer多了一个onStart()和unsubscribe()方法。
onStart(),会在发送事件还没开始前调用,unsubscribe(),可以将Subscriber取消订阅。
还有不完整事件Action0和Action1也可以作为观察者。

from方式
    public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
        return create(new OnSubscribeFromIterable<T>(iterable));
    }

接受数组或集合,返回一个按参数列表顺序发射这些数据的Observable。

        String[] array={"How are you?","I am fine.","Thank you.","And you?"};
        Observable.from(array)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {

                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        Log.i("TAG", "onNext" + s);
                    }
                });
just方式

接受1-9个参数,它们还可以是不同类型,返回一个按参数列表顺序发射这些数据的Observable。

        Observable.just(1,2,3,4)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {

                    }
                });

6. 变换操作

map

将图片本地路径,通过map方法转化,获取Bitmap。

       Observable.just("../image/logo.png")
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) {
                return BitmapFactory.decodeFile(filePath);
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageView.setImageBitmap(bitmap);
            }
        });
flipMap

flatMap()接收一个Observable的输出作为输入,然后作为一个新的Observable再发射。

String[] array={"How are you?","I am fine.","Thank you.","And you?"};
Observable.from(array)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        return Observable.just(s.toUpperCase());
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                       
                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(String s) {
                        Log.i("TAG", "onNext=" + s);
                    }
                });
scan

一个累加器函数,将依次2个元素作处理,再将结果同下一个元素继续处理。

7. 过滤操作

在Func中对每项元素进行过滤处理,满足条件的元素才会继续发送,比如这里的过滤偶数。

filter
        Observable.just(2,5,25,6,9,14,34,51)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer%2 == 0;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(Integer integer) {
                        Log.i("TAG", "onNext" + integer);
                    }
                });
take()、takeLast()

只传递前几个元素

8. 合并操作

Merge操作

合并多个Observable,按照加入的Observable顺序将各个元素传递。

        Observable<Integer> observable1 = Observable.just(22, 4, 23,13);
        Observable<Integer> observable2 = Observable.just(15, 24, 3,66,133);
        Observable.merge(observable1,observable2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {

                    }
                    @Override
                    public void onError(Throwable e) {
                    }
                    @Override
                    public void onNext(Integer integer) {
                        Log.i("TAG", "onNext" + integer);
                    }
                });
zip操作

将各个Observable个对应位置各个元素取出做操作,然后将结果传递。

        Observable<Integer> observable1 = Observable.just(22, 4, 23,13);
        Observable<Integer> observable2 = Observable.just(15, 24, 3,66);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>
    @Override
    public Integer call(Integer integer, Integer integer2) {
        return integer + integer2;
    }
})
        .subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {

            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Integer integer) {
                Log.i("TAG", "onNext" + integer);
            }
        });

9. 线程调度

RxJava默认遵循的是线程不变的原则。在事件在哪个线程产生,也同样在该线程接收事件。但是,如果在事件产生线程为子线程,处理线程中操作了UI,就会发生崩溃,这个要留意一下。


线程调度类型
SubscribeOn、ObserveOn

subscribeOn():事件产生的线程
observeOn():事件消费的线程

使用例子:子线程处理任务回调给主线程

Observable.create(new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            Log.i("TAG", 执行耗时操作);
            try {
                Thread.sleep(2000);
                subscriber.onNext("耗时操作完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(String s) {
            Log.i("TAG", "获取到数据");
        }
    });
timer做定时器任务:
        Observable.timer(1000 , TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        
                    }
                });

好了,以上就是我对于Rxjava学习的总结。该文章持续更新,更好地理清思路。

相关文章

网友评论

    本文标题:关于Rxjava的总结

    本文链接:https://www.haomeiwen.com/subject/embozxtx.html