1. Rxjava是什么
a library for composing asynchronous and event-based programs using observable sequences for the Java VM(一个对于构成使用的Java虚拟机观察序列异步和基于事件的程序库)。随着项目逻辑的越来越复杂,在Rxjava的作用下却会保持始终的简洁,因为Rxjava不会存在一层一层的嵌套,步骤清晰。特别是配合Lamda表达式的情况下。
2. 引用库
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
3. Rxjava的三大主角
- Observable 被观察者
- Subscriber/Observer 观察者
- OnSubscribe 被观察者和观察者的桥梁
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。比如我一家人要去餐厅吃饭,我是Observable,服务员是OnSubscribe,厨房是Subscriber,我通过服务员点餐,服务员将我的订单告诉厨房,厨房经过做各道菜,各道工序处理,每做好一道菜,就通知服务员端出来给我。菜做好端出来的回调是异步的,无需我一直等待。
4. 观察者的回调方法
-
onNext(T item)
Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法可能会被调用多次,取决于你的实现。 -
onError(Exception ex)
当Observable遇到错误或者无法返回期望的数据时会调用这个方法,这个调用会终止Observable,后续不会再调用onNext和onCompleted,onError方法的参数是抛出的异常。 -
onComplete
正常终止,如果没有遇到错误,Observable在最后一次调用onNext之后调用此方法。
5. 创建方式
create方式
内部代码:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
创建观察着和被观察者:
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
//给观察者发送事件
subscriber.onStart();
subscriber.onNext("你好");
subscriber.onNext("在吗");
subscriber.onNext("中午吃什么?");
subscriber.onCompleted();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("minfo",s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.i("minfo",s);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
订阅:
observable.subscribe(subscriber);
observable.subscribe(observer);
Subscriber是实现了Observer的抽象类,比Observer多了一个onStart()和unsubscribe()方法。
onStart(),会在发送事件还没开始前调用,unsubscribe(),可以将Subscriber取消订阅。
还有不完整事件Action0和Action1也可以作为观察者。
from方式
public final static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(new OnSubscribeFromIterable<T>(iterable));
}
接受数组或集合,返回一个按参数列表顺序发射这些数据的Observable。
String[] array={"How are you?","I am fine.","Thank you.","And you?"};
Observable.from(array)
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("TAG", "onNext" + s);
}
});
just方式
接受1-9个参数,它们还可以是不同类型,返回一个按参数列表顺序发射这些数据的Observable。
Observable.just(1,2,3,4)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
});
6. 变换操作
map
将图片本地路径,通过map方法转化,获取Bitmap。
Observable.just("../image/logo.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) {
return BitmapFactory.decodeFile(filePath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageView.setImageBitmap(bitmap);
}
});
flipMap
flatMap()接收一个Observable的输出作为输入,然后作为一个新的Observable再发射。
String[] array={"How are you?","I am fine.","Thank you.","And you?"};
Observable.from(array)
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s.toUpperCase());
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("TAG", "onNext=" + s);
}
});
scan
一个累加器函数,将依次2个元素作处理,再将结果同下一个元素继续处理。
7. 过滤操作
在Func中对每项元素进行过滤处理,满足条件的元素才会继续发送,比如这里的过滤偶数。
filter
Observable.just(2,5,25,6,9,14,34,51)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2 == 0;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("TAG", "onNext" + integer);
}
});
take()、takeLast()
只传递前几个元素
8. 合并操作
Merge操作
合并多个Observable,按照加入的Observable顺序将各个元素传递。
Observable<Integer> observable1 = Observable.just(22, 4, 23,13);
Observable<Integer> observable2 = Observable.just(15, 24, 3,66,133);
Observable.merge(observable1,observable2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("TAG", "onNext" + integer);
}
});
zip操作
将各个Observable个对应位置各个元素取出做操作,然后将结果传递。
Observable<Integer> observable1 = Observable.just(22, 4, 23,13);
Observable<Integer> observable2 = Observable.just(15, 24, 3,66);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.i("TAG", "onNext" + integer);
}
});
9. 线程调度
RxJava默认遵循的是线程不变的原则。在事件在哪个线程产生,也同样在该线程接收事件。但是,如果在事件产生线程为子线程,处理线程中操作了UI,就会发生崩溃,这个要留意一下。

SubscribeOn、ObserveOn
subscribeOn():事件产生的线程
observeOn():事件消费的线程
使用例子:子线程处理任务回调给主线程
Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber subscriber) {
Log.i("TAG", 执行耗时操作);
try {
Thread.sleep(2000);
subscriber.onNext("耗时操作完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(String s) {
Log.i("TAG", "获取到数据");
}
});
timer做定时器任务:
Observable.timer(1000 , TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
}
});
好了,以上就是我对于Rxjava学习的总结。该文章持续更新,更好地理清思路。
网友评论