声明
本文主要针对RxJava的使用,不会对概念做过多的讲解。
1.作用
RxJava的目的是异步。
2.前提
学习RxJava之前最好对观察者设计模式的概念有所了解,下面是本人整理的博客
Android之观察者设计模式
观察者设计模式的使用步骤是:
- 创建被观察者
- 创建观察者
- 观察者和被观察者之间的绑定
这里需要再次提醒,想要深入了解Rxjava必须知道观察者设计模式,必须彻底理解观察者和被观察者的概念。
3.RxJava的引用
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
4.基本代码
-
创建被观察者
//创建一个被观察者 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("A"); emitter.onNext("B"); emitter.onNext("C"); emitter.onComplete(); } });
分析以上代码
(1)Observable是rxjava封装的被观察者抽象类;
(2)create方法是创建一个被观察者实例;
(3)注意以下泛型,泛型的类型决定事件传递数据的类型;
(4)emitter是事件发射器,主要负责发送三种事件onNext、onError、onComplete;
-
创建观察者
//创建观察者 Observer<String> observer = new Observer<String>(){ @Override public void onSubscribe(Disposable d) { System.out.println("开始采用subscribe连接!"); } @Override public void onNext(String value) { System.out.println("对Next事件作出响应:"+value); } @Override public void onError(Throwable e) { System.out.println("对Error事件作出响应!"); } @Override public void onComplete() { System.out.println("事件执行完毕!"); } };
分析以上代码:
(1)onSubscribe:当被观察者和观察者关联时立刻执行,其中它的形参Disposable中的dispose()可以立刻解除观察者和被观察者的订阅(关联)关系;
(2)onNext:当一个任务执行完毕时再执行另一个任务;
(3)onError:当一个任务报错是立刻中断当前任务和后续任务的执行;
(4)onComplete:当所有任务执行完毕的时候执行;
-
让观察者和被观察者产生关联
//订阅 observable.subscribe(observer);
当产生关联(订阅)时,将立刻执行观察者的onSubscribe回调方法,然后再执行被观察者的subscribe回调。
5.几种创建被观察者的方式
方式1:
//创建一个被观察者
observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onComplete();
}
});
方式2:
//创建一个被观察者
Observable observable = Observable.fromArray("A", "B");
相当于方式1的
emitter.onNext("A");
emitter.onNext("B");
方式3:
//创建一个被观察者
Observable observable = Observable.just("A", "B");
相当于方式1的
emitter.onNext("A");
emitter.onNext("B");
方式4:
List<String> list = new ArrayList<>();
list.add("A");
list.add("B");
//创建一个被观察者
observable = Observable.fromIterable(list);
方式5:(线程有回调时:Callable )
Callable callable = new CallbaleIml();
new Thread(new FutureTask<String>(callable)).start();
//创建一个被观察者
observable = Observable.fromCallable(callable);
public class CallbaleIml implements Callable {
@Override
public Object call() throws Exception {
return "call A";
}
}
方式6:(线程有回调时:Future )
public class FutureIml implements Future {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get() throws InterruptedException, ExecutionException {
return "AA";
}
@Override
public Object get(long timeout, @NonNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return "BB";
}
}
当没有延迟时,返回“AA”, 当有延迟时返回“BB”
Future future = new FutureIml();
//创建一个被观察者(默认)
observable = Observable.fromFuture(future);
//或者
//创建一个被观察者(有延迟)
observable = Observable.fromFuture(future, 1000, TimeUnit.MILLISECONDS);
方式7:
public class PublisherIml implements Publisher {
@Override
public void subscribe(Subscriber s) {
s.onNext("A");
s.onNext("B");
s.onNext("C");
s.onComplete();
}
}
Publisher publisher = new PublisherIml();
方式8:
//创建一个被观察者
observable = Observable.fromPublisher(publisher);
方式9:
public class CallbaleIml implements Callable {
@Override
public ObservableSource call() throws Exception {
return Observable.just("A");
}
}
Callable callable = new CallbaleIml();
observable = Observable.defer(callable);
创建被观察者还有其他方式,在后续的文章中会给出。
网友评论