美文网首页
RxJava(1):使用和原理

RxJava(1):使用和原理

作者: 壹元伍角叁分 | 来源:发表于2021-06-29 11:17 被阅读0次

RxJava的核心思想

有一个起点和终点,起点开始流向我们的“事件”,把事件流向到终点,只不过在流向的过程中,可以增加拦截,
拦截时可以对“事件进行改变”,终点只关心它上一个拦截

利用Retrofit+OkHttp+RxJava进行登录操作

API:WangAndroidApi

注册
https://www.wanandroid.com/user/register

方法:POST
参数:username,password,repassword
登录
https://www.wanandroid.com/user/login

方法:POST
参数:username,password
public interface IReqUrl {
    public static String BASE_URL = "https://www.wanandroid.com/";

    /**
     * 注册
     * https://www.wanandroid.com/user/register
     * 参数:username,password,repassword
     *
     * @return
     */
    @POST("user/register")
    Observable<RegisterResBean> postRegister(@Query("username") String username, @Query("password") String password, @Query("repassword") String repassword);

    /**
     * 登录
     * https://www.wanandroid.com/user/login
     * 参数:username,password
     *
     * @return
     */
    @POST("user/login")
    Observable<LoginResBean> postLogin(@Query("username") String username, @Query("password") String password);

}
public class MyRetrofit {
    public static Retrofit createRetrofit() {
        // OKHttp客户端
        OkHttpClient.Builder builder = new OkHttpClient.Builder();
        
        // 各种参数配置
        builder.readTimeout(10, TimeUnit.SECONDS);
        builder.connectTimeout(10, TimeUnit.SECONDS);
        if (BuildConfig.DEBUG) {
            HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
            interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
            builder.addInterceptor(interceptor);
        }

        return new Retrofit.Builder()
                .baseUrl(BASE_URL)
                .client(builder.build())// 请求用 OKhttp
                .addConverterFactory(GsonConverterFactory.create()) // 添加一个json解析的工具
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())// 添加rxjava处理工具
                .build();
    }
}
  /**
    * 一行代码 实现点击注册并登录的需求
    * 需求:
    * 1.响应点击事件
    * 2.请求服务器注册操作
    * 3.注册完成之后,更新注册UI
    * 4.马上去登录服务器操作
    * 5.登录完成之后,更新登录的UI
    */
private void register() {
    RxView.clicks(mButton)
            .throttleFirst(2, TimeUnit.SECONDS)//设置2s内不可重复点击
            .subscribeOn(AndroidSchedulers.mainThread())//点击事件运行在主线程
            .flatMap(new Function<Object, ObservableSource<RegisterResBean>>() {
                @Override
                public ObservableSource<RegisterResBean> apply(Object o) throws Exception {
                    return MyRetrofit.createRetrofit()
                            .create(IReqUrl.class)
                            .postRegister(mEtUsername.getText(), mEtPassword.getText(), mEtPasswordRe.getText())
                            .subscribeOn(Schedulers.io());//在子线程进行注册操作
                }
            })
            .observeOn(AndroidSchedulers.mainThread())//需要更新ui操作,再次切换到主线程
            .flatMap(new Function<RegisterResBean, ObservableSource<LoginResBean>>() {
                @Override
                public ObservableSource<LoginResBean> apply(RegisterResBean registerResBean) throws Exception {
                    Log.d(TAG, "onNext: registerResBean=" + registerResBean);

                    int errorCode = registerResBean.getErrorCode();
                    if (errorCode == -1) {
                        Log.d(TAG, "onNext: errMsg=" + registerResBean.getErrorMsg());
                        Toast.makeText(MainActivity.this, registerResBean.getErrorMsg(), Toast.LENGTH_SHORT).show();
                        return MyRetrofit.createRetrofit()
                                .create(IReqUrl.class)
                                .postLogin(mEtUsername.getText(), mEtPassword.getText())
                                .subscribeOn(Schedulers.io());//已经注册,直接登录。子线程请求
                    } else if (errorCode == 0) {
                        Log.d(TAG, "onNext: 用户名成功注册");
                        mEtPasswordRe.setVisibility(View.GONE);
                        mButton.setText("登录");
                        RegisterResBean.DataBean data = registerResBean.getData();
                        return MyRetrofit.createRetrofit()
                                .create(IReqUrl.class)
                                .postLogin(mEtUsername.getText(), mEtPassword.getText())
                                .subscribeOn(Schedulers.io());//注册成功,直接登录。子线程请求
                    } else {
                        Log.d(TAG, "onNext: errorCode=" + errorCode);
                    }
                    return null;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())//登录成功,再次切换到主线程,更新ui
            .subscribe(new Observer<LoginResBean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }

                @Override
                public void onNext(LoginResBean loginResBean) {
                    Log.d(TAG, "onNext: loginResBean=" + loginResBean);
                    int errorCode = loginResBean.getErrorCode();
                    if (errorCode == 0) {
                        Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                        mButton.setText("登录成功");
                    }
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: e=" + e);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            });
}

RxJave中的hook

在分析RxJave源码过程中,经常可以看到类似这样的代码:

RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper))

在new出一个对象时,执行RxJavaPlugins.onAssembly这样的一个代码,那进去看看代码做了什么?

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

很简单的代码,判断onObservableAssembly变量是否为空,如果为空则直接将new出来的对象返回回去。

翻看源码,onObservableAssembly变量只在一个地方定义了,并且是个静态方法。

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

说明只要调用了以下这个方法,那在执行create、map等方法的时候,都会先执行下面apply中的代码

RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
    @Override
    public Observable apply(Observable observable) throws Exception {
        //这里可以写入我们自己的代码,进行一些统计,如方法调用了多少次等
        //return null;//不能返回null,运行起来会空指针异常
        return observable;//直接返回
    }
});

分析RxJava原理

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: 自定义source 执行subscribe");

        //手动调用next
        emitter.onNext("检测到变化了...");
    }
})
        .map(new Function<String, Boolean>() {
            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply: map  执行apply s=" + s + " 返回boolean值true");
                return true;
            }
        })
        .subscribe(new Observer<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
            }
        });

以上的代码执行,打印的日志顺序:

2021-06-30 13:25:59.048 15124-15124: onSubscribe: 终点的监听执行 onSubscribe
2021-06-30 13:25:59.048 15124-15124: subscribe: 自定义source 执行subscribe
2021-06-30 13:25:59.049 15124-15124: apply: map 执行apply s=检测到变化了... 返回boolean值true
2021-06-30 13:25:59.049 15124-15124: onNext: 终点的监听执行 onNext aBoolean=true

源码分析

  1. 订阅流程
    1. Observable.create(ObservableOnSubscribe<T> source),传入一个自定义source

      Observable.create(new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
          }
      })
      

      自定义的source作为参数,创建了一个ObservableCreate对象,并ObservableCreate对象返回

      public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
          ObjectHelper.requireNonNull(source, "source is null");
          return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
      }
      

      重点标注:ObservableCreate保存的自定义的source对象

      public final class ObservableCreate<T> extends Observable<T> {
       final ObservableOnSubscribe<T> source;//这个就是自定义的source,后面要用到!!!!
      
          public ObservableCreate(ObservableOnSubscribe<T> source) {
              this.source = source;
          }
      }
      
    2. ObservableCreate.map(Function<? super T, ? extends R> mapper)

      .map(new Function<String, Boolean>() {
          @Override
          public Boolean apply(String s) throws Exception {
              Log.d(TAG, "apply: map  执行apply s=" + s + " 返回boolean值true");
              return true;
          }
      })
      

      将ObservableCreate作为第一个参数,创建了一个ObservableMap对象,并ObservableMap对象返回;

      public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
          ObjectHelper.requireNonNull(mapper, "mapper is null");
          return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
      }
      

      ObservableMap的父类AbstractObservableWithUpstream保存着ObservableCreate对象。

      public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
          final Function<? super T, ? extends U> function;
      
          public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
              super(source);
              this.function = function;
          }
      }
      
      
      abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
      
          /** The source consumable Observable. */
          protected final ObservableSource<T> source;
      }
      
    3. ObservableMap.subscribe(Observer<? super T> observer)

      .subscribe(new Observer<Boolean>() {
          @Override
          public void onSubscribe(Disposable d) {
              Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
          }
      
          @Override
          public void onNext(Boolean aBoolean) {
              Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
          }
      
          @Override
          public void onError(Throwable e) {
      
          }
      
          @Override
          public void onComplete() {
              Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
          }
      });
      

      将终点的监听作为参数传入,subscribe()是ObservableMap父类Observable的方法,ObservableMap没有重写subscribe()。而实际调用的是subscribe(),ObservableMap重写了,所以走的是ObservableMap.subscribeActual(observer)方法

      public final void subscribe(Observer<? super T> observer) {
          ObjectHelper.requireNonNull(observer, "observer is null");
          try {
              observer = RxJavaPlugins.onSubscribe(this, observer);
      
              ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
      
              subscribeActual(observer);//这句是重点
          } catch (NullPointerException e) { // NOPMD
              throw e;
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              RxJavaPlugins.onError(e);
      
              NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
              npe.initCause(e);
              throw npe;
          }
      }
      

      再来看下ObservableMap中subscribeActual的具体实现,实际调用的又是source.subscribe(),这个source就是前面保存的ObservableCreate对象。所以实际调用的是ObservableCreate.subscribe()。

      @Override
      public void subscribeActual(Observer<? super U> t) {
          source.subscribe(new MapObserver<T, U>(t, function));
      }
      

      ObservableCreate没有重写subscribe(),走的父类Observable.subscribe()方法,还记得调用Observable.subscribe(),实际走的是哪个方法吗?对,是ObservableCreate..subscribeActual(observer)。

      @Override
      protected void subscribeActual(Observer<? super T> observer) {
          CreateEmitter<T> parent = new CreateEmitter<T>(observer);//observer是终点的监听,这边封了一层箱
          observer.onSubscribe(parent);//调用了终点的监听的onSubscribe方法
      
          try {
              source.subscribe(parent);//这边调用的是我们自定义source的subscribe方法
          } catch (Throwable ex) {
              Exceptions.throwIfFatal(ex);
              parent.onError(ex);
          }
      }
      
订阅流程代码执行流程图.jpg
  1. 事件监听流程
    new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            
            //这里手动调用next
            emitter.onNext("检测到变化了...");
        }
    }
    

    emitter就是上面定义的CreateEmitter。CreateEmitter中

    final Observer<? super T> observer;//这个就是终点的监听
    
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {//如果没有被中止,就执行终点监听的onNext(t)
            observer.onNext(t);
        }
    }
    
事件监听代码执行流程图.jpg

代码执行流程:

  1. observable.create传入了自定义source,返回一个obserableCreate对象;
  2. obserableCreate调用.map(),传入funcation自定义对象,返回一个obserableMap对象,obserableMap对象有两个参数,一个是obserableCreate,一个是传入funcation自定义对象;
  3. obserableMap调用.subscribe(observer),传入一个自定义的终点监听observer。在subscribe(observer)方法中,将传入的终点监听封装了一层,将终点observer对象和传入funcation自定义对象作为参数,定义了一个mapObserver对象。调用自己的subscribeActual(observer);
  4. obserableMap的subscribeActual(observer)中调用了obserableCreate的subscribe(observer)。obserableCreate实际调用的同样是subscribeActual(observer);
  5. 而obserableCreate的subscribeActual(observer)中又将mapObserver封装了一层,定义了一个CreateEmitter。然后先是调用了mapObserver.onSubscribe(CreateEmitter),mapObserver父类的onSubscribe(CreateEmitter)中又继续调用了终点observer.onSubscribe(CreateEmitter),然后又调用了自定义source的onSubscribe(CreateEmitter)方法。
  6. 如果我们在自定义source的onSubscribe(CreateEmitter)方法中调用了CreateEmitter.onNext(),CreateEmitter.onNext()方法中又调用了mapObserver..onNext(),mapObserver.onNext()又继续调用终点observer.onNext()。

相关文章

  • RxJava(1):使用和原理

    RxJava的核心思想 有一个起点和终点,起点开始流向我们的“事件”,把事件流向到终点,只不过在流向的过程中,可以...

  • RxJava

    使用RxJava:添加依赖: 走进RxJava:RxJava实质上就是一个异步操作库。API介绍和原理解析:1.扩...

  • RxJava 原理和封装使用<1>

    前言:RxJava 本质压缩一句话,异步操作库 好用并且流行的原因:简洁(针对思维和处理业务) RxJava扩展的...

  • RxJava

    其它文章 RxJava操作符大全 1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用...

  • 学习清单

    HTTP原理解析 RxJava使用和原理,并应用到BaseLib库,还有Dragger MVVM 多线程和线程池使...

  • RxJava2初探

    1.RxJava概念及原理 RxJava – Reactive Extensions for the JVM – ...

  • RxJava学习系列(二)--原理

    原文链接 RxJava学习系列(一)--使用RxJava学习系列(二)--原理学习了 RxJava有一段时间了,也...

  • RxJava详解之执行原理(四)

    RxJava详解之执行原理(四) 前面几篇文章介绍了RxJava的基本使用,也说了RxJava的优缺点。下面我们就...

  • Rxjava2 操作符原理(2)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 线程切换(3)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

网友评论

      本文标题:RxJava(1):使用和原理

      本文链接:https://www.haomeiwen.com/subject/jzbhultx.html