Video source: Exploring RxJava 2 for Android · Jake Wharton
RxJava provides:
- A set of classes for representing sources of data.
- A set of classes for listening to data sources.
- A set of methods for modifying and composing the data.
Sources
- Observable<T>
  - Emits 0 to n items
  - Terminates with complete or error
  - Does not have backpressure
- Flowable<T>
  - Emits 0 to n items
  - Terminates with complete or error
  - Has backpressure
- Flowable vs. Observable
  - Backpressure lets a consumer control how fast a source emits items, typically by slowing it down.
  - RxJava 1.x added backpressure late in the design process.
  - All types exposed backpressure, but not all sources respected it.
  - Backpressure, like inheritance, must be designed for.
Observable<MotionEvent> events = RxView.touches(paintView);
Flowable<Row> rows = db.createQuery("SELECT * ...");
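To make the difference concrete, here is a minimal sketch of a consumer throttling a Flowable by signalling demand explicitly. It assumes RxJava 2.x (the `io.reactivex` packages) and the Reactive Streams API are on the classpath. The class and method names are illustrative and not part of the library.

```java
import io.reactivex.Flowable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
final class BackpressureSketch {

    // Subscribes to a 1..100 range but only ever requests `demand` items.
    static List<Integer> takeOnlyRequested(long demand) {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 100).subscribe(new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) {
                // Nothing is emitted until the consumer signals demand.
                s.request(demand);
            }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { /* not expected in this sketch */ }
            @Override public void onComplete() { /* not reached while items remain unrequested */ }
        });
        return received;
    }
}
```

Calling `BackpressureSketch.takeOnlyRequested(3)` should yield only the first three values, because the source stops emitting once the requested amount has been delivered. An Observable has no equivalent of `request(n)`, which is why it suits sources such as touch events that cannot be slowed down.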
- Single
  - Either succeeds with an item or errors.
  - No backpressure support
  - Think "reactive scalar"
- Completable
  - Either completes or errors. Has no items!
  - No backpressure support
  - Think "reactive runnable"
- Maybe
  - Either succeeds with an item, completes with no items, or errors.
  - No backpressure support
  - Think "reactive optional"
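The three specialized sources differ only in which terminal events they can deliver. The sketch below, assuming RxJava 2.x on the classpath, subscribes to one of each and records which callback fires. `TerminalEventsSketch` is a hypothetical name used only for illustration.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class TerminalEventsSketch {

    static List<String> events() {
        List<String> log = new ArrayList<>();

        // Single: exactly one of onSuccess / onError.
        Single.just("item")
              .subscribe(v -> log.add("single success: " + v),
                         e -> log.add("single error"));

        // Completable: exactly one of onComplete / onError, never an item.
        Completable.complete()
              .subscribe(() -> log.add("completable complete"),
                         e -> log.add("completable error"));

        // Maybe: exactly one of onSuccess / onComplete / onError.
        Maybe.<String>empty()
              .subscribe(v -> log.add("maybe success: " + v),
                         e -> log.add("maybe error"),
                         () -> log.add("maybe complete"));

        return log;
    }
}
```

All three sources here are synchronous, so the log is filled in by the time `events()` returns.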
Consumers
interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
interface Subscriber<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable t);
    void onSubscribe(Subscription s);
}
interface Subscription {
    void request(long n);
    void cancel();
}
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
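JDK 9 and later ship the same four interfaces as nested types of `java.util.concurrent.Flow`, so the contract can be sketched without any library. The following is a deliberately simplified, single-threaded illustration and not a specification-compliant publisher: it ignores reentrancy, thread safety, and invalid `request` values. `RangePublisher` and `Collector` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher emitting 1..count, driven entirely by request(n).
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override public void subscribe(Flow.Subscriber<? super Integer> s) {
        s.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean done;

            @Override public void request(long n) {
                // Emit at most n items, never more than were requested.
                for (long i = 0; i < n && !done && next <= count; i++) {
                    s.onNext(next++);
                }
                // Signal completion once every item has been delivered.
                if (!done && next > count) {
                    done = true;
                    s.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Records events and exposes the Subscription so the caller controls demand.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { log.add("next " + item); }
        @Override public void onError(Throwable t) { log.add("error"); }
        @Override public void onComplete() { log.add("complete"); }
    }

    static List<String> demo() {
        Collector c = new Collector();
        new RangePublisher(3).subscribe(c);
        c.subscription.request(2); // delivers the first two items
        c.subscription.request(5); // delivers the last item, then completes
        return c.log;
    }
}
```

The point of the sketch is the direction of control: the Subscriber receives a Subscription in `onSubscribe` and the Publisher emits only in response to `request(n)`.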
Creating Sources
Observable.fromCallable(new Callable<String>() {
    @Override public String call() throws Exception {
        return getName();
    }
});
Flowable.fromCallable(() -> "Hello");
Maybe.fromCallable(() -> "Hello");
Maybe.fromAction(() -> System.out.println("Hello"));
Maybe.fromRunnable(() -> System.out.println("Hello"));
Single.fromCallable(() -> "Hello");
Completable.fromCallable(() -> "Hello");
Completable.fromAction(() -> System.out.println("Hello"));
Completable.fromRunnable(() -> System.out.println("Hello"));
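A property worth noting about these factories is that the callable, action, or runnable is not invoked when the source is created, only when something subscribes, and it runs again for every new subscriber. The sketch below, assuming RxJava 2.x, counts invocations to show this. `FromCallableSketch` is an illustrative name.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
final class FromCallableSketch {

    // Returns the invocation count before subscribing, after one
    // subscription, and after a second subscription.
    static int[] callCounts() {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable.fromCallable(() -> calls.incrementAndGet());

        int beforeSubscribe = calls.get(); // the callable has not run yet
        source.blockingFirst();            // first subscription runs it once
        int afterFirst = calls.get();
        source.blockingFirst();            // second subscription runs it again
        int afterSecond = calls.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }
}
```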
- RxJava 2 fixes Observable.create(), which was easy to misuse in RxJava 1:
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello");
        e.onComplete();
    }
});
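A common use of `Observable.create()` is wrapping a callback-style API. The sketch below assumes RxJava 2.x and uses a made-up `Ticker` class as a stand-in for a real listener source. It registers a listener on subscribe and uses the emitter's `setCancellable` to unregister it when the downstream disposes.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical demo class, not part of RxJava.
final class CreateSketch {

    // Hypothetical callback-style API standing in for a real listener source.
    static final class Ticker {
        final List<Consumer<String>> listeners = new ArrayList<>();

        void emit(String value) {
            // Iterate over a copy so listeners may unregister during delivery.
            for (Consumer<String> l : new ArrayList<>(listeners)) {
                l.accept(value);
            }
        }
    }

    static Observable<String> ticks(Ticker ticker) {
        return Observable.<String>create(emitter -> {
            Consumer<String> listener = emitter::onNext;
            ticker.listeners.add(listener);
            // Runs when the downstream disposes, so the listener does not leak.
            emitter.setCancellable(() -> ticker.listeners.remove(listener));
        });
    }

    static List<String> demo() {
        Ticker ticker = new Ticker();
        List<String> received = new ArrayList<>();

        Disposable d = ticks(ticker).subscribe(item -> received.add(item));
        ticker.emit("a");
        d.dispose();      // triggers the Cancellable and removes the listener
        ticker.emit("b"); // nobody is listening any more

        received.add("listeners=" + ticker.listeners.size());
        return received;
    }
}
```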
Observing Sources
Observable<String> o = Observable.just("Hello");
o.subscribe(new Observer<String>() {
    @Override public void onNext(String s) {...}
    @Override public void onComplete() {...}
    @Override public void onError(Throwable t) {...}
    @Override public void onSubscribe(Disposable d) {
        ...
    }
});
// DisposableObserver helps handle the unsubscribe logic.
DisposableObserver<String> observer = new DisposableObserver<String>() {
    @Override public void onNext(String s) {...}
    @Override public void onComplete() {...}
    @Override public void onError(Throwable t) {...}
};
o.subscribe(observer);
observer.dispose();
// RxJava 2 adds subscribeWith(), which returns the observer you pass in.
// A DisposableObserver is itself a Disposable, the counterpart of RxJava 1's Subscription.
Disposable d = o.subscribeWith(new DisposableObserver<String>() {
    @Override public void onNext(String s) {...}
    @Override public void onComplete() {...}
    @Override public void onError(Throwable t) {...}
});
d.dispose();
// RxJava 2 also provides CompositeDisposable to manage several disposables together.
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(o.subscribeWith(new DisposableObserver<String>() {
    @Override public void onNext(String s) {...}
    @Override public void onComplete() {...}
    @Override public void onError(Throwable t) {...}
}));
disposables.dispose();
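The snippets above elide the callback bodies, so here is a small self-contained sketch of the same CompositeDisposable pattern, assuming RxJava 2.x. It uses a `PublishSubject` as the source (an assumption made so that items can be pushed by hand) and shows that a single `dispose()` call stops every observer that was added. The `recorder` helper is hypothetical.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class CompositeDisposableSketch {

    // Hypothetical helper: an observer that records events under a tag.
    static DisposableObserver<String> recorder(String tag, List<String> log) {
        return new DisposableObserver<String>() {
            @Override public void onNext(String s) { log.add(tag + ":" + s); }
            @Override public void onComplete() { log.add(tag + ":complete"); }
            @Override public void onError(Throwable t) { log.add(tag + ":error"); }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PublishSubject<String> subject = PublishSubject.create();

        CompositeDisposable disposables = new CompositeDisposable();
        disposables.add(subject.subscribeWith(recorder("first", log)));
        disposables.add(subject.subscribeWith(recorder("second", log)));

        subject.onNext("a");   // both observers receive this
        disposables.dispose(); // disposes both subscriptions at once
        subject.onNext("b");   // no observer is subscribed any more

        return log;
    }
}
```

On Android this is typically paired with a lifecycle callback, with the composite disposed when the screen goes away.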
Operators
- Manipulate or combine data in some way.
- Manipulate threading in some way.
- Manipulate emissions in some way.
first()
In RxJava 1, calling first() on an Observable returns another Observable.
In RxJava 2, it returns a Single. If the Observable is empty, the Single fails with NoSuchElementException, because a Single either has an item or errors. (In the released 2.x API this variant is named firstOrError(); first(defaultItem) emits the default item instead of failing.)
firstElement()
In RxJava 2, firstElement() returns a Maybe, which can model an empty Observable by completing without an error.
ignoreElements()
If you ignore the elements and only care whether the source completes or fails, ignoreElements() now returns a Completable.
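The return types described above can be checked against an empty Observable. This sketch assumes the released RxJava 2.x API, where the erroring variant of `first()` is spelled `firstOrError()` and `first(defaultItem)` takes a fallback value. `FirstOperatorsSketch` is an illustrative name.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class, not part of RxJava.
final class FirstOperatorsSketch {

    static List<String> describeEmpty() {
        Observable<String> empty = Observable.empty();
        List<String> out = new ArrayList<>();

        // Single: an empty source has no item to succeed with, so it errors.
        try {
            empty.firstOrError().blockingGet();
            out.add("firstOrError -> item");
        } catch (NoSuchElementException e) {
            out.add("firstOrError -> NoSuchElementException");
        }

        // Single with a default: succeeds with the fallback instead.
        out.add("first(default) -> " + empty.first("fallback").blockingGet());

        // Maybe: completes without an item; blockingGet() returns null in that case.
        String maybeItem = empty.firstElement().blockingGet();
        out.add("firstElement -> " + (maybeItem == null ? "empty" : maybeItem));

        // Completable: only completion or failure is observable.
        empty.ignoreElements().blockingAwait();
        out.add("ignoreElements -> complete");

        return out;
    }
}
```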