JAVA之RxJava

作者: isLJli | 来源:发表于2021-08-03 17:30 被阅读0次

    RxJava概述

    RxJava的基本使用

    RxJava有三个基本的元素:

    1. 被观察者(Observable)
    2. 观察者(Observer)
    3. 订阅(subscribe)

    首先在gradle文件中添加依赖:

    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    

    最新的版本可查看官网 :
    RxJava: https://github.com/ReactiveX/RxJava
    RxAndroid: https://github.com/ReactiveX/RxAndroid

    1. 创建被观察者
    Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
          emitter.onNext(1);
          emitter.onComplete();
      }
    });
    
    1. 创建观察者
    Observer observer = new Observer<Integer>
       @Override
       public void onSubscribe(@NonNull Dispo
           Log.d("数据", "onSubscribe");
       }
       @Override
       public void onNext(@NonNull Integer in
           Log.d("数据", "onNext" + integer);
       }
       @Override
       public void onError(@NonNull Throwable
           Log.d("数据", "onError");
       }
       @Override
       public void onComplete() {
           Log.d("数据", "onComplete");
       }
    };
    
    1. 订阅
    observable.subscribe(observer);
    

    被观察者只有通过subscribe订阅绑定观察者后,才可以发送数据给观察者。

    RxJava核心思想

    RxJava的核心思路是观察者模式和响应式编程。

    1. 观察者模式: Observable被观察者就像一个生产者,Observer观察者像是一个消费者,生产者通过subscribe订阅消费者,并开始把数据发送给消费者。
    2. 响应式编程:把Observable当作起点Observer是终点,其中间可以添加各式的操作符。
      img

    RxJava配合Retrofit

    Retrofit本身并没有如OkHttp请求网络和RxJav发送数据的功能,但它对这些做了封装管理,所以项目中经常使用OkHttp和RxJava配合Retrofit请求数据。下面做一个demo:

    1. 请求API
    public interface NetApi {
    
      @POST("register")
      @FormUrlEncoded
      Observable<RegisterResponse> registerAction(@Body RegisterRequest request);
    
    }
    
    1. 创建Retrofit配合OkHttp、RxJava、Gson
    public class RetrofitUtil {
    
      public static <T> T create(Class<T> tClass) {
          return new Retrofit.Builder()
                  .baseUrl("http://xxxxxxx")
                  .client(getHttpClient())
                  .addConverterFactory(GsonConverterFactory.create())
                  .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                  .build()
                  .create(tClass);
      }
    
      private static OkHttpClient getHttpClient() {
          OkHttpClient.Builder builder = new OkHttpClient.Builder()
                  .readTimeout(10, TimeUnit.SECONDS)
                  .connectTimeout(9, TimeUnit.SECONDS);
          if (BuildConfig.DEBUG) {
              HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
              interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
              builder.addInterceptor(interceptor);
          }
          return builder.build();
      }
    
    }
    
    
    1. RxJava获取数据流
    RetrofitUtil.create(NetApi.class)
          .registerAction(new RegisterRequest())
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<RegisterResponse>() {
              @Override
              public void onSubscribe(@NonNull Disposable d) {
              }
              @Override
              public void onNext(@NonNull RegisterResponse registerRe
              }
              @Override
              public void onError(@NonNull Throwable e) {
              }
              @Override
              public void onComplete() {
              }
          });
    

    通过Retrofit的配合,我们不用写OkHttp的请求网络执行代码,也不用把Json字符串转换成对象。只需定义RxJava的操作和接收。

    RxJava模式与原理

    RxJava的操作符应用

    Rxjava操作符应用.png

    我们可以通过create、just、fromIterable等操作符去请求发送数据;通过map操作符去操作转换发送的数据;通过flatMap操作符重新创建Observable发送数据;通过concat、merge操作符组合多个事件的发送;通过subscribeOn操作符决定上面代码的执行线程环境,通过observeOn操作符决定下面代码的执行线程环境。
    更多的操作符号使用可参考:RxJava2 只看这一篇文章就够了

    RxJava之create原理分析

    wecom-temp-84d497613df1cb553b4478c5abce56d1.png
    上面这段代码可以分成两部分,也是最简单的起点和终点

    1. Observable.create()返回ObservableCreate

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
       Objects.requireNonNull(source, "source is null");
       return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }
    

    并把ObservableOnSubscribe作为 source参数传入ObservableCreate类中:

    public final class ObservableCreate<T> extends Observable<T> {
     // 把`ObservableOnSubscribe`传进来
      final ObservableOnSubscribe<T> source;
      public ObservableCreate(ObservableOnSubscribe<T> source) {
          this.source = source;
      }
    ....
    

    2. 通过subscribe订阅观察者,执行ObservableCreate.subscribe()方法:

    subscribe()
    • ObservableCreate先执行父类Observable的.subscribe()方法,最后把Observer实例传给ObservableCreate类中的subscribeActual方法执行:
      subscribeActual
    • 首先把observer包装了一层,并调用了observer.onSubscribe()方法
    • 执行source接口实例的subscribe,并把包裹的parent传了过去。

    3. 往下执行数据给Observer
    source的实例的subscribe方法

    subscribe

    通过包裹了Observer的ObservableEmitter类给Observer发送数据

    onNext

    整体流程图如下:


    create流程图

    RxJava之map原理分析

    map代码

    1. Observable.map返回ObservableMap对象

    ObservableMap

    2. 往上执行Observable的subscribeActual方法

    ObservableMap.subscribeActual
    ObservableCreate.subscribeActual

    3. 执行各操作符的方法,并往下发送数据给Observer

    • create往下发送数据


      create发送数据
      create发送数据
    • map往下发送数据


      map发送数据
      map发送数据

    相关文章

      网友评论

        本文标题:JAVA之RxJava

        本文链接:https://www.haomeiwen.com/subject/ohcevltx.html