RxJava的核心思想
有一个起点和终点,起点开始流向我们的“事件”,把事件流向到终点,只不过在流向的过程中,可以增加拦截,
拦截时可以对“事件进行改变”,终点只关心它上一个拦截
利用Retrofit+OkHttp+RxJava进行登录操作
API:WangAndroidApi
注册
https://www.wanandroid.com/user/register
方法:POST
参数:username,password,repassword
登录
https://www.wanandroid.com/user/login
方法:POST
参数:username,password
public interface IReqUrl {
public static String BASE_URL = "https://www.wanandroid.com/";
/**
* 注册
* https://www.wanandroid.com/user/register
* 参数:username,password,repassword
*
* @return
*/
@POST("user/register")
Observable<RegisterResBean> postRegister(@Query("username") String username, @Query("password") String password, @Query("repassword") String repassword);
/**
* 登录
* https://www.wanandroid.com/user/login
* 参数:username,password
*
* @return
*/
@POST("user/login")
Observable<LoginResBean> postLogin(@Query("username") String username, @Query("password") String password);
}
public class MyRetrofit {
public static Retrofit createRetrofit() {
// OKHttp客户端
OkHttpClient.Builder builder = new OkHttpClient.Builder();
// 各种参数配置
builder.readTimeout(10, TimeUnit.SECONDS);
builder.connectTimeout(10, TimeUnit.SECONDS);
if (BuildConfig.DEBUG) {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
builder.addInterceptor(interceptor);
}
return new Retrofit.Builder()
.baseUrl(BASE_URL)
.client(builder.build())// 请求用 OKhttp
.addConverterFactory(GsonConverterFactory.create()) // 添加一个json解析的工具
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())// 添加rxjava处理工具
.build();
}
}
/**
* 一行代码 实现点击注册并登录的需求
* 需求:
* 1.响应点击事件
* 2.请求服务器注册操作
* 3.注册完成之后,更新注册UI
* 4.马上去登录服务器操作
* 5.登录完成之后,更新登录的UI
*/
private void register() {
RxView.clicks(mButton)
.throttleFirst(2, TimeUnit.SECONDS)//设置2s内不可重复点击
.subscribeOn(AndroidSchedulers.mainThread())//点击事件运行在主线程
.flatMap(new Function<Object, ObservableSource<RegisterResBean>>() {
@Override
public ObservableSource<RegisterResBean> apply(Object o) throws Exception {
return MyRetrofit.createRetrofit()
.create(IReqUrl.class)
.postRegister(mEtUsername.getText(), mEtPassword.getText(), mEtPasswordRe.getText())
.subscribeOn(Schedulers.io());//在子线程进行注册操作
}
})
.observeOn(AndroidSchedulers.mainThread())//需要更新ui操作,再次切换到主线程
.flatMap(new Function<RegisterResBean, ObservableSource<LoginResBean>>() {
@Override
public ObservableSource<LoginResBean> apply(RegisterResBean registerResBean) throws Exception {
Log.d(TAG, "onNext: registerResBean=" + registerResBean);
int errorCode = registerResBean.getErrorCode();
if (errorCode == -1) {
Log.d(TAG, "onNext: errMsg=" + registerResBean.getErrorMsg());
Toast.makeText(MainActivity.this, registerResBean.getErrorMsg(), Toast.LENGTH_SHORT).show();
return MyRetrofit.createRetrofit()
.create(IReqUrl.class)
.postLogin(mEtUsername.getText(), mEtPassword.getText())
.subscribeOn(Schedulers.io());//已经注册,直接登录。子线程请求
} else if (errorCode == 0) {
Log.d(TAG, "onNext: 用户名成功注册");
mEtPasswordRe.setVisibility(View.GONE);
mButton.setText("登录");
RegisterResBean.DataBean data = registerResBean.getData();
return MyRetrofit.createRetrofit()
.create(IReqUrl.class)
.postLogin(mEtUsername.getText(), mEtPassword.getText())
.subscribeOn(Schedulers.io());//注册成功,直接登录。子线程请求
} else {
Log.d(TAG, "onNext: errorCode=" + errorCode);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())//登录成功,再次切换到主线程,更新ui
.subscribe(new Observer<LoginResBean>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(LoginResBean loginResBean) {
Log.d(TAG, "onNext: loginResBean=" + loginResBean);
int errorCode = loginResBean.getErrorCode();
if (errorCode == 0) {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
mButton.setText("登录成功");
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: e=" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
RxJave中的hook
在分析RxJave源码过程中,经常可以看到类似这样的代码:
RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper))
在new出一个对象时,执行RxJavaPlugins.onAssembly这样的一个代码,那进去看看代码做了什么?
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
很简单的代码,判断onObservableAssembly变量是否为空,如果为空则直接将new出来的对象返回回去。
翻看源码,onObservableAssembly变量只在一个地方定义了,并且是个静态方法。
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
说明只要调用了以下这个方法,那在执行create、map等方法的时候,都会先执行下面apply中的代码
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
//这里可以写入我们自己的代码,进行一些统计,如方法调用了多少次等
//return null;//不能返回null,运行起来会空指针异常
return observable;//直接返回
}
});
分析RxJava原理
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "subscribe: 自定义source 执行subscribe");
//手动调用next
emitter.onNext("检测到变化了...");
}
})
.map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
Log.d(TAG, "apply: map 执行apply s=" + s + " 返回boolean值true");
return true;
}
})
.subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
}
@Override
public void onNext(Boolean aBoolean) {
Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
}
});
以上的代码执行,打印的日志顺序:
2021-06-30 13:25:59.048 15124-15124: onSubscribe: 终点的监听执行 onSubscribe
2021-06-30 13:25:59.048 15124-15124: subscribe: 自定义source 执行subscribe
2021-06-30 13:25:59.049 15124-15124: apply: map 执行apply s=检测到变化了... 返回boolean值true
2021-06-30 13:25:59.049 15124-15124: onNext: 终点的监听执行 onNext aBoolean=true
源码分析
-
订阅流程
-
Observable.create(ObservableOnSubscribe<T> source),传入一个自定义source
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { } })
自定义的source作为参数,创建了一个ObservableCreate对象,并ObservableCreate对象返回
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
重点标注:ObservableCreate保存的自定义的source对象
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source;//这个就是自定义的source,后面要用到!!!! public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } }
-
ObservableCreate.map(Function<? super T, ? extends R> mapper)
.map(new Function<String, Boolean>() { @Override public Boolean apply(String s) throws Exception { Log.d(TAG, "apply: map 执行apply s=" + s + " 返回boolean值true"); return true; } })
将ObservableCreate作为第一个参数,创建了一个ObservableMap对象,并ObservableMap对象返回;
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
ObservableMap的父类AbstractObservableWithUpstream保存着ObservableCreate对象。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } } abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> { /** The source consumable Observable. */ protected final ObservableSource<T> source; }
-
ObservableMap.subscribe(Observer<? super T> observer)
.subscribe(new Observer<Boolean>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe"); } @Override public void onNext(Boolean aBoolean) { Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.d(TAG, "onComplete: 终点的监听执行 onComplete"); } });
将终点的监听作为参数传入,subscribe()是ObservableMap父类Observable的方法,ObservableMap没有重写subscribe()。而实际调用的是subscribe(),ObservableMap重写了,所以走的是ObservableMap.subscribeActual(observer)方法
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer);//这句是重点 } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
再来看下ObservableMap中subscribeActual的具体实现,实际调用的又是source.subscribe(),这个source就是前面保存的ObservableCreate对象。所以实际调用的是ObservableCreate.subscribe()。
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
ObservableCreate没有重写subscribe(),走的父类Observable.subscribe()方法,还记得调用Observable.subscribe(),实际走的是哪个方法吗?对,是ObservableCreate..subscribeActual(observer)。
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer);//observer是终点的监听,这边封了一层箱 observer.onSubscribe(parent);//调用了终点的监听的onSubscribe方法 try { source.subscribe(parent);//这边调用的是我们自定义source的subscribe方法 } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
-
-
事件监听流程
new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //这里手动调用next emitter.onNext("检测到变化了..."); } }
emitter就是上面定义的CreateEmitter。CreateEmitter中
final Observer<? super T> observer;//这个就是终点的监听 @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) {//如果没有被中止,就执行终点监听的onNext(t) observer.onNext(t); } }
代码执行流程:
- observable.create传入了自定义source,返回一个obserableCreate对象;
- obserableCreate调用.map(),传入funcation自定义对象,返回一个obserableMap对象,obserableMap对象有两个参数,一个是obserableCreate,一个是传入funcation自定义对象;
- obserableMap调用.subscribe(observer),传入一个自定义的终点监听observer。在subscribe(observer)方法中,将传入的终点监听封装了一层,将终点observer对象和传入funcation自定义对象作为参数,定义了一个mapObserver对象。调用自己的subscribeActual(observer);
- obserableMap的subscribeActual(observer)中调用了obserableCreate的subscribe(observer)。obserableCreate实际调用的同样是subscribeActual(observer);
- 而obserableCreate的subscribeActual(observer)中又将mapObserver封装了一层,定义了一个CreateEmitter。然后先是调用了mapObserver.onSubscribe(CreateEmitter),mapObserver父类的onSubscribe(CreateEmitter)中又继续调用了终点observer.onSubscribe(CreateEmitter),然后又调用了自定义source的onSubscribe(CreateEmitter)方法。
- 如果我们在自定义source的onSubscribe(CreateEmitter)方法中调用了CreateEmitter.onNext(),CreateEmitter.onNext()方法中又调用了mapObserver..onNext(),mapObserver.onNext()又继续调用终点observer.onNext()。
网友评论