续上一篇文章
一、RxJava的变换。
引用别人的一句话,变换就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。那么何为加工,下面开始以API举例
1、map()
Observable.just("abc")
.map(new Func1() {
@Override
public Object call(Object o) {
return o.toString()+"def";
}
}).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println(o.toString());
}
});
由上方代码可以看出,在map方法里将传入的字符串进行了变化,这里出现了一个新的类叫Func1。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
另外可以看出经过map变化后,数据直接传入了onNext中进行使用,另外还可以将数据类型进行转变。这就由读者自行修改了。
2、flatMap()
这是一个很好用的方法,类似于map()方法对要发射的数据进行转换,但是与map()不同的是flatMap()方法并不是直接作用于SubScriber中,而是返回一个Observable的对象,然后由这个对象来执行:
Observable.just("abc","bcd","cde")
.flatMap(new Func1() {
@Override
public Object call(Object o) {
return Observable.just(o.toString()+"---");
}
}).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println(o.toString());
}
});
map和flatmap的区别有点像just和from,一个是对数据直接传入,一个是对数组的数据进行分割处理。
/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source Observable, where that function returns an Observable, and then merging those resulting
* Observables and emitting the results of this merger.
*/
这是源码中官方的解释。对Observable发射的数据都应用一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送合并的结果。 flatMap和map操作符很相像,flatMap发送的是合并后的Observables,map操作符发送的是应用函数后返回的结果集。
3、lift()
在RxJava中,有各种的变换方式,但是原理都是基于lift方法的原理来做的,什么是lift方法,先看一下源码:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
看一下源码可以得出几个结论
1、lift变换是new了一个新的被观察者来继续之前的操作
2、Operator对象可以视为一个带返回值的Subscriber对象
3、在new新的Observable对象时,创建了一个新的OnSubscribeLift对象作为OnSubscribe传入
4、新的OnSubscribeLift使用的OnSubscribe为原始的OnSubscribe,Operator为新的Subscriber
5、Operator的构造方法中已经传入了原始的Subscriber,可以直接使用
6、在subscribe方法调用时,其实是进入到了新的OnSubscribeLift的call方法中,然后由原始的onSubscribe对象调用新的Subscriber对象。举个例子:
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
System.out.println("====OnSubscribe===call=");
subscriber.onNext("123");
subscriber.onCompleted();
}
})
.lift(new Observable.Operator<Object, Object>() {
@Override
public Subscriber<? super Object> call(Subscriber<? super Object> subscriber) {
System.out.println("====lift===call=");
return new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println("====lift===onNext=");
subscriber.onNext(Integer.decode(o.toString()));
}
@Override
public void onStart() {
super.onStart();
System.out.println("====lift===onStart=");
}
};
}
}).subscribe(new Subscriber() {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(Object o) {
System.out.println("====Subscriber===onNext=");
System.out.println(o.toString());
}
@Override
public void onStart() {
super.onStart();
System.out.println("====Subscriber===onStart=");
}
});
打印结果为
====Subscriber===onStart=
====lift===call=
====lift===onStart=
====OnSubscribe===call=
====lift===onNext=
====Subscriber===onNext=
123
先由subscribe方法中调用原始的Subscriber的start方法,然后调用到lift的call方法,在call方法中调用了新的Subscriber的start方法,然后原始的OnSubscribe(parent)调用了call方法,传入了新的Subscriber,然后顺序调用onNext方法。
我们可以看一下map()方法的实现:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}
逻辑基本是一样的,就不多做赘述。
Subject
RxJava中常见的Subject有4种,分别是AsyncSubject、 BehaviorSubject、 PublishSubject、 ReplaySubject。
Subject既可以做观察者也可以做被观察者。
1、AsyncSubject :AsyncSubject无论输入多少参数,永远只输出最后一个参数。
2、BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。
3、PublishSubject:发送订阅起到Completed之间所有的值。
4、ReplaySubject:无论何时订阅,都会将所有历史订阅内容全部发出。
使用方法:
// PublishSubject sb = PublishSubject.create();
BehaviorSubject sb = BehaviorSubject.create();//配置默认值为BehaviorSubject.create("defaultData");
// AsyncSubject sb = AsyncSubject .create();
// ReplaySubject sb = ReplaySubject.create();
sb.subscribe(
new Action1<Object>() {
@Override
public void call(Object o) {
System.out.println(o.toString());
}
});
sb.onNext(1);
sb.onNext(2);
sb.onNext(3);
sb.onCompleted();
网友评论