美文网首页
RxJava(1):使用和原理

RxJava(1):使用和原理

作者: 壹元伍角叁分 | 来源:发表于2021-06-29 11:17 被阅读0次

    RxJava的核心思想

    有一个起点和终点,起点开始流向我们的“事件”,把事件流向到终点,只不过在流向的过程中,可以增加拦截,
    拦截时可以对“事件进行改变”,终点只关心它上一个拦截

    利用Retrofit+OkHttp+RxJava进行登录操作

    API:WangAndroidApi

    注册
    https://www.wanandroid.com/user/register
    
    方法:POST
    参数:username,password,repassword
    
    登录
    https://www.wanandroid.com/user/login
    
    方法:POST
    参数:username,password
    
    public interface IReqUrl {
        public static String BASE_URL = "https://www.wanandroid.com/";
    
        /**
         * 注册
         * https://www.wanandroid.com/user/register
         * 参数:username,password,repassword
         *
         * @return
         */
        @POST("user/register")
        Observable<RegisterResBean> postRegister(@Query("username") String username, @Query("password") String password, @Query("repassword") String repassword);
    
        /**
         * 登录
         * https://www.wanandroid.com/user/login
         * 参数:username,password
         *
         * @return
         */
        @POST("user/login")
        Observable<LoginResBean> postLogin(@Query("username") String username, @Query("password") String password);
    
    }
    
    public class MyRetrofit {
        public static Retrofit createRetrofit() {
            // OKHttp客户端
            OkHttpClient.Builder builder = new OkHttpClient.Builder();
            
            // 各种参数配置
            builder.readTimeout(10, TimeUnit.SECONDS);
            builder.connectTimeout(10, TimeUnit.SECONDS);
            if (BuildConfig.DEBUG) {
                HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                builder.addInterceptor(interceptor);
            }
    
            return new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .client(builder.build())// 请求用 OKhttp
                    .addConverterFactory(GsonConverterFactory.create()) // 添加一个json解析的工具
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())// 添加rxjava处理工具
                    .build();
        }
    }
    
      /**
        * 一行代码 实现点击注册并登录的需求
        * 需求:
        * 1.响应点击事件
        * 2.请求服务器注册操作
        * 3.注册完成之后,更新注册UI
        * 4.马上去登录服务器操作
        * 5.登录完成之后,更新登录的UI
        */
    private void register() {
        RxView.clicks(mButton)
                .throttleFirst(2, TimeUnit.SECONDS)//设置2s内不可重复点击
                .subscribeOn(AndroidSchedulers.mainThread())//点击事件运行在主线程
                .flatMap(new Function<Object, ObservableSource<RegisterResBean>>() {
                    @Override
                    public ObservableSource<RegisterResBean> apply(Object o) throws Exception {
                        return MyRetrofit.createRetrofit()
                                .create(IReqUrl.class)
                                .postRegister(mEtUsername.getText(), mEtPassword.getText(), mEtPasswordRe.getText())
                                .subscribeOn(Schedulers.io());//在子线程进行注册操作
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//需要更新ui操作,再次切换到主线程
                .flatMap(new Function<RegisterResBean, ObservableSource<LoginResBean>>() {
                    @Override
                    public ObservableSource<LoginResBean> apply(RegisterResBean registerResBean) throws Exception {
                        Log.d(TAG, "onNext: registerResBean=" + registerResBean);
    
                        int errorCode = registerResBean.getErrorCode();
                        if (errorCode == -1) {
                            Log.d(TAG, "onNext: errMsg=" + registerResBean.getErrorMsg());
                            Toast.makeText(MainActivity.this, registerResBean.getErrorMsg(), Toast.LENGTH_SHORT).show();
                            return MyRetrofit.createRetrofit()
                                    .create(IReqUrl.class)
                                    .postLogin(mEtUsername.getText(), mEtPassword.getText())
                                    .subscribeOn(Schedulers.io());//已经注册,直接登录。子线程请求
                        } else if (errorCode == 0) {
                            Log.d(TAG, "onNext: 用户名成功注册");
                            mEtPasswordRe.setVisibility(View.GONE);
                            mButton.setText("登录");
                            RegisterResBean.DataBean data = registerResBean.getData();
                            return MyRetrofit.createRetrofit()
                                    .create(IReqUrl.class)
                                    .postLogin(mEtUsername.getText(), mEtPassword.getText())
                                    .subscribeOn(Schedulers.io());//注册成功,直接登录。子线程请求
                        } else {
                            Log.d(TAG, "onNext: errorCode=" + errorCode);
                        }
                        return null;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//登录成功,再次切换到主线程,更新ui
                .subscribe(new Observer<LoginResBean>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }
    
                    @Override
                    public void onNext(LoginResBean loginResBean) {
                        Log.d(TAG, "onNext: loginResBean=" + loginResBean);
                        int errorCode = loginResBean.getErrorCode();
                        if (errorCode == 0) {
                            Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                            mButton.setText("登录成功");
                        }
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: e=" + e);
                    }
    
                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }
    

    RxJave中的hook

    在分析RxJave源码过程中,经常可以看到类似这样的代码:

    RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
    
    RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper))
    

    在new出一个对象时,执行RxJavaPlugins.onAssembly这样的一个代码,那进去看看代码做了什么?

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    很简单的代码,判断onObservableAssembly变量是否为空,如果为空则直接将new出来的对象返回回去。

    翻看源码,onObservableAssembly变量只在一个地方定义了,并且是个静态方法。

    public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        RxJavaPlugins.onObservableAssembly = onObservableAssembly;
    }
    

    说明只要调用了以下这个方法,那在执行create、map等方法的时候,都会先执行下面apply中的代码

    RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
        @Override
        public Observable apply(Observable observable) throws Exception {
            //这里可以写入我们自己的代码,进行一些统计,如方法调用了多少次等
            //return null;//不能返回null,运行起来会空指针异常
            return observable;//直接返回
        }
    });
    

    分析RxJava原理

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "subscribe: 自定义source 执行subscribe");
    
            //手动调用next
            emitter.onNext("检测到变化了...");
        }
    })
            .map(new Function<String, Boolean>() {
                @Override
                public Boolean apply(String s) throws Exception {
                    Log.d(TAG, "apply: map  执行apply s=" + s + " 返回boolean值true");
                    return true;
                }
            })
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
                }
    
                @Override
                public void onNext(Boolean aBoolean) {
                    Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
                }
            });
    

    以上的代码执行,打印的日志顺序:

    2021-06-30 13:25:59.048 15124-15124: onSubscribe: 终点的监听执行 onSubscribe
    2021-06-30 13:25:59.048 15124-15124: subscribe: 自定义source 执行subscribe
    2021-06-30 13:25:59.049 15124-15124: apply: map 执行apply s=检测到变化了... 返回boolean值true
    2021-06-30 13:25:59.049 15124-15124: onNext: 终点的监听执行 onNext aBoolean=true

    源码分析

    1. 订阅流程
      1. Observable.create(ObservableOnSubscribe<T> source),传入一个自定义source

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            }
        })
        

        自定义的source作为参数,创建了一个ObservableCreate对象,并ObservableCreate对象返回

        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
        

        重点标注:ObservableCreate保存的自定义的source对象

        public final class ObservableCreate<T> extends Observable<T> {
         final ObservableOnSubscribe<T> source;//这个就是自定义的source,后面要用到!!!!
        
            public ObservableCreate(ObservableOnSubscribe<T> source) {
                this.source = source;
            }
        }
        
      2. ObservableCreate.map(Function<? super T, ? extends R> mapper)

        .map(new Function<String, Boolean>() {
            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply: map  执行apply s=" + s + " 返回boolean值true");
                return true;
            }
        })
        

        将ObservableCreate作为第一个参数,创建了一个ObservableMap对象,并ObservableMap对象返回;

        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
        

        ObservableMap的父类AbstractObservableWithUpstream保存着ObservableCreate对象。

        public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
            final Function<? super T, ? extends U> function;
        
            public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
                super(source);
                this.function = function;
            }
        }
        
        
        abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        
            /** The source consumable Observable. */
            protected final ObservableSource<T> source;
        }
        
      3. ObservableMap.subscribe(Observer<? super T> observer)

        .subscribe(new Observer<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: 终点的监听执行 onSubscribe");
            }
        
            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "onNext: 终点的监听执行 onNext aBoolean=" + aBoolean);
            }
        
            @Override
            public void onError(Throwable e) {
        
            }
        
            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: 终点的监听执行 onComplete");
            }
        });
        

        将终点的监听作为参数传入,subscribe()是ObservableMap父类Observable的方法,ObservableMap没有重写subscribe()。而实际调用的是subscribe(),ObservableMap重写了,所以走的是ObservableMap.subscribeActual(observer)方法

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
        
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        
                subscribeActual(observer);//这句是重点
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
        
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
        

        再来看下ObservableMap中subscribeActual的具体实现,实际调用的又是source.subscribe(),这个source就是前面保存的ObservableCreate对象。所以实际调用的是ObservableCreate.subscribe()。

        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
        

        ObservableCreate没有重写subscribe(),走的父类Observable.subscribe()方法,还记得调用Observable.subscribe(),实际走的是哪个方法吗?对,是ObservableCreate..subscribeActual(observer)。

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);//observer是终点的监听,这边封了一层箱
            observer.onSubscribe(parent);//调用了终点的监听的onSubscribe方法
        
            try {
                source.subscribe(parent);//这边调用的是我们自定义source的subscribe方法
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        
    订阅流程代码执行流程图.jpg
    1. 事件监听流程
      new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(ObservableEmitter<String> emitter) throws Exception {
              
              //这里手动调用next
              emitter.onNext("检测到变化了...");
          }
      }
      

      emitter就是上面定义的CreateEmitter。CreateEmitter中

      final Observer<? super T> observer;//这个就是终点的监听
      
      @Override
      public void onNext(T t) {
          if (t == null) {
              onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
              return;
          }
          if (!isDisposed()) {//如果没有被中止,就执行终点监听的onNext(t)
              observer.onNext(t);
          }
      }
      
    事件监听代码执行流程图.jpg

    代码执行流程:

    1. observable.create传入了自定义source,返回一个obserableCreate对象;
    2. obserableCreate调用.map(),传入funcation自定义对象,返回一个obserableMap对象,obserableMap对象有两个参数,一个是obserableCreate,一个是传入funcation自定义对象;
    3. obserableMap调用.subscribe(observer),传入一个自定义的终点监听observer。在subscribe(observer)方法中,将传入的终点监听封装了一层,将终点observer对象和传入funcation自定义对象作为参数,定义了一个mapObserver对象。调用自己的subscribeActual(observer);
    4. obserableMap的subscribeActual(observer)中调用了obserableCreate的subscribe(observer)。obserableCreate实际调用的同样是subscribeActual(observer);
    5. 而obserableCreate的subscribeActual(observer)中又将mapObserver封装了一层,定义了一个CreateEmitter。然后先是调用了mapObserver.onSubscribe(CreateEmitter),mapObserver父类的onSubscribe(CreateEmitter)中又继续调用了终点observer.onSubscribe(CreateEmitter),然后又调用了自定义source的onSubscribe(CreateEmitter)方法。
    6. 如果我们在自定义source的onSubscribe(CreateEmitter)方法中调用了CreateEmitter.onNext(),CreateEmitter.onNext()方法中又调用了mapObserver..onNext(),mapObserver.onNext()又继续调用终点observer.onNext()。

    相关文章

      网友评论

          本文标题:RxJava(1):使用和原理

          本文链接:https://www.haomeiwen.com/subject/jzbhultx.html