美文网首页Android开发Android技术知识Android技术进阶
深入Rxjava原理——带你实现一个基本的rxjava框架

深入Rxjava原理——带你实现一个基本的rxjava框架

作者: 谁动了我的代码 | 来源:发表于2022-08-31 21:58 被阅读0次

Rxjava 本质上是 观察者模式框架。被观察者(Observable)->传递数据-> 观察者(observer)在调用subscribe ()方法进行订阅时,会把observer 层层往上构造出新的observer。

例如:Observable.create().map().subscribe(observer) ,observer会先被new MapObserver(observer),用MapObserver接收事件源,再传递到原始的observer。

RxJava源码分析

我们先来分析一下RxJava的工程目录

从源码结构上看无非是观察者Observer/Subscriber、被观察者Observerable、以及与订阅/观察相关的功能类:

  • annotations是相关注解

  • exceptions

  • functions 主要是订阅相关的接口类,比如Action1、Func0等

  • internal是内部使用的util、operaters的综合,方便将订阅关系捋顺,比较重要

  • observerable包主要是专门为某种场景定制的Oberverable类

  • observer包特定的订阅者及集合

  • schedulers包有关异步逻辑的线程关系

  • subjects包订阅中间产生的对象

  • subscriptions包是订阅集合,网络调用常用到

上述最最最核心的就是Observerable类了,光代码就有上万行,不过先不要慌,内部是很有条理的,我们下面继续看。

Observerable类包含三类方法
  • a.产生Observerable被观察者的方法,主要的有create、just、list等

  • b.对Observerable进行中间变换的方法,我们比较熟悉的是map、flatMap、lift、merge、zip、startwith、takeab类方法的特征是都返回Observerable<T>对象

  • c.订阅方法,返回subscriptiond订阅对象

Observerable类是所有异步处理的开始、进行和结束,是核心类,理解了这个类就理解了RxJava。

Observerable类的众多方法中总有一款适合用来处理你的异步逻辑,有兴趣的可以深入的研究一下这些方法。

实战一个简易rxjava

为学习rxjava的基本流程,写一个精简版的rxjava

Subscriber观察者

Observer 接口

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable t);
    void onNext(T var1);
}

SubScriber 简化:

public abstract class Subscriber<T> implements Observer<T> {
    public void start() {
    }
}

Observable订阅源

Observable(订阅源)

在RxJava里面是一个大而杂的类,拥有很多工厂方法和各式各样的操作符。每个Observable里面有一个OnSubscribe对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。

public class Observable<T> {
    final OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<T>(onSubscribe);
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        subscriber.start();
        onSubscribe.call(subscriber);
    }

    public interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }
}

这样一个大致的框架就出来了

测试

        Observable.create(new Observable.OnSubscribe<Integer>() {
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 10; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
                public void onCompleted() {
                  System.out.println("complete");
                }

                public void onError(Throwable r) {

                }

                public void onNext(String string) {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println(string);
                }
              });

下面实现map,起始map是就是对结果再包装一层Observe.

实现结果测试

Observable.create(new Observable.OnSubscribe<Integer>() {
      public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 10; i++) {
          subscriber.onNext(i);
        }
        subscriber.onCompleted();
      }
    })
              .map(new Observable.Transformer<Integer, String>() {
                public String call(Integer from) {
                  System.out.println("subsc1@ " + Thread.currentThread().getName());
                  return "maping " + from;
                }
              })
              .map(new Observable.Transformer<String, String>() {
                public String call(String from) {
                  System.out.println("subsc2@ " + Thread.currentThread().getName());
                  return "maping2 " + from;
                }
              })
              .subscribe(new Subscriber<String>() {
                public void onCompleted() {
                  System.out.println("complete");
                }

                public void onError(Throwable r) {

                }

                public void onNext(String string) {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println(string);
                }
              });

至于线程切换,就是在指定的线程调用call 函数、或调用subscriber里的onNext()等函数

小结一下,文章到这里主要简单说明了它的原理,以及源码分析。再到一个简单的实战演练。很好的理解rxjava的使用原理。

文末

总体来说RxJava主要作用帮你优雅的处理异步逻辑。RxJava是处理异步逻辑的利器,以往我们处理异步时,需要创建一个线程,传入callback或者listener,线程处理完任务后通过callback、listener、notify或者发送广播去通知UI线程和其他线程。使用RxJava可以在一个方法体内完成这所有逻辑。

相关文章

网友评论

    本文标题:深入Rxjava原理——带你实现一个基本的rxjava框架

    本文链接:https://www.haomeiwen.com/subject/qlcjnrtx.html