RxJava 在 GitHub 的介绍:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava是响应扩展的Java VM实现:通过使用可观察的序列来组合异步和基于事件的程序的库。
总结:RxJava 是一个 基于事件流、实现异步操作的库
RxJava 作用特点
使用简单现异步操作,类似于 Android中的 AsyncTask 、Handler作用。
由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:逻辑简洁、实现优雅、使用简单,更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅。
Rxjava 原理
基于 一种扩展的观察者模式,扩展观察者模式中有4个角色,被观察者(Observable)产生事件、观察者(Observer)接收事件,并给出响应动作、订阅(Subscribe)连接 被观察者 & 观察者、事件(Event)被观察者 & 观察者 沟通的载体。
RxJava 原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。
基本使用
分步骤实现:该方法主要为了深入说明Rxjava的原理 & 使用,主要用于演示说明
public void SimpleRxJava(View view){
Observable observable=Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
Observer observer=new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.e("sss","接受到"+o+"事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("sss","接受事件完成");
}
};
observable.subscribe(observer);
}
基于事件流的链式调用:主要用于实际使用
public void SimpleChainRxJava(View view){
Observable.just("1","2","3").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("sss", "接受到" + s + "事件");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("sss", "接受事件异常");
}
});
}
额外说明
观察者 Observer的subscribe()具备多个重载的方法
public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件。
网友评论