1. 定义
RxJava
是一个 基于事件流、实现异步操作的库
2. 作用
实现异步操作
类似于 Android
中的 AsyncTask
、Handler
作用
3. 特点
- 逻辑简洁
- 实现优雅
- 使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
4. 实现步骤
- 创建事件 Observable.create
- 创建观察者 Observer
- 订阅observable.subsribe
public void text1() {
//3创建事件
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
LogUtil.f().i(MainActivity.TAG, "subscribe");
e.onNext("132");
e.onNext("456");
e.onComplete();
//注意onComplete() 与 onError() 同时只能调用一个
// e.onError(new Throwable());
}
});
//2创建观察者
Observer<String> observer = new Observer<String>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable=d;
LogUtil.f().i(MainActivity.TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
//可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
// disposable.dispose();
}
@Override
public void onError(Throwable e) {
LogUtil.f().i(MainActivity.TAG, "onError");
}
@Override
public void onComplete() {
LogUtil.f().i(MainActivity.TAG, "onComplete");
}
};
//3订阅
observable.subscribe(observer);
}
输出的结
image.png
切断观察者
//可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
@Override
public void onNext(String s) {
LogUtil.f().i(MainActivity.TAG, "onNext:" + s);
//可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
disposable.dispose();
}
输出的结
image.png
切断后,后的方法就不会再执行
额外说明
观察者 Observer的subscribe()具备多个重载的方法
public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
切换线程
//切换线程
observable = observable
.subscribeOn(Schedulers.io())//运行子线程
.observeOn(AndroidSchedulers.mainThread());//回调在android 主线程
运行结果:
以上可以看出,运行是在子线程,回调是在android 主线程.
优雅的实现
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.i(MainActivity.TAG, "subscribe" + "线程:" + Thread.currentThread().getName());
e.onNext("132");
e.onNext("456");
e.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(MainActivity.TAG, "onSubscribe" + "线程:" + Thread.currentThread().getName());
}
@Override
public void onNext(String s) {
Log.i(MainActivity.TAG, "onNext" + "线程:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(MainActivity.TAG, "onError" + "线程:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i(MainActivity.TAG, "onComplete" + "线程:" + Thread.currentThread().getName());
}
});
}
运行结果:
image.png操作符
Map
map是RxJava中最简单的一个变换操作符了, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化. 用事件图表示如下:
//Integer
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
//Strig
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.format("The %s time", integer);
}
//String 叠加
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return String.format("%s , He is second time", s);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
在上游我们发送的是数字类型, 而在下游我们接收的是String类型, 中间起转换作用的就是map操作符, 运行结果为:
The 1 time , He is second time
The 2 time , He is second time
FlatMap
flatMap是一个非常强大的操作符, 先用一个比较难懂的概念说明一下:
FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList();
list.add("He is "+integer);
list.add("He is "+integer);
list.add("He is "+integer);
return Observable.fromIterable(list).delay(10,TimeUnit.MICROSECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
运行结果:
He is 1
He is 1
He is 1
He is 2
He is 2
He is 2
举个例子:注册成功后调登录
api.register(new RegisterRequest()) //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根据注册的响应结果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});
public void login() {
renoteRegister().doOnNext(new Consumer<String>() {//开始注册
@Override
public void accept(String s) throws Exception {//注册成功回调
System.out.println(s);
System.out.println(Thread.currentThread().getName());
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
System.out.println(s + "--开始登录");
System.out.println(Thread.currentThread().getName());
return renoteLogin();//调用登录
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {//登录回调
System.out.println(s);
System.out.println(Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println(throwable.getMessage());
System.out.println(Thread.currentThread().getName());
}
});
}
//远程登录
public Observable<String> renoteLogin() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("登录成功");
emitter.onComplete();
}
});
}
//远程注册
public Observable<String> renoteRegister() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("注册成功");
emitter.onComplete();
}
});
}
网友评论