美文网首页
认识 RxJava

认识 RxJava

作者: 桥头桥尾 | 来源:发表于2018-04-18 21:14 被阅读0次

RxJava 是一个针对响应式编程思想的一个框架,其本质是利用了观察者模式,充分的利用的函数式编程的思想,学习此框架,注意类的命名方式,这样有助了对其流程的理解

主要类

  • Publisher:发布者,发布数据的source

    • subscribe(Subscriber<? super T> s) 订阅观察者
  • Observer:观察者,接收来自Publisher的数据,进行处理

    • onSubscribe(Subscription s) 订阅时触发
    • onNext(T t) 接收一个消息时触发
    • onError(Throwable t) 发生错误时触发
    • onComplete() 消息全部结束(及结束)时触发
  • Subscription: 订阅,用于观察者对发布者订阅的信息

    • request(long n) 向Publisher请求n个数据
    • cancel() 告诉Publisher停止发送数据

触发流程

用一个最简单的程序例子来描述其运行流程

Flowable.just("1")
        .subscribe(System.out::println);

Flowable.just("1") 只是new了一个FlowableJust的Publisher对象

subscribe(System.out::println) 会new一个LambdaSubscriber的Subscriber的对象,

并发起数据流动操作调用subscribe(Subscriber<? super T> s)方法触发整个数据的操作

subscribe(Subscriber<? super T> s)开始解析其流程

//io.reactivex.Flowable.java
public final Disposable subscribe(Consumer<? super T> onNext) {
  return subscribe(onNext, Functions.ON_ERROR_MISSING,
                   Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
  ...
  LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
  subscribe(ls);
  return ls;
}

public final void subscribe(FlowableSubscriber<? super T> s) {
   ...
   subscribeActual(z);
}

subscribeActual将会调用具体Publiser实现的此方法

//io.reactivex.internal.operators.flowable.FlowableJust.java
protected void subscribeActual(Subscriber<? super T> s) {
  s.onSubscribe(new ScalarSubscription<T>(s, value));
}

onSubscribe 将会触发Observer此方法的实现

//io.reactivex.internal.subscribers.LambdaSubscriber
public void onSubscribe(Subscription s) {
  //注意,由于当前类extends AtomicReference<Subscription>, 此方法是将Subscription set到当前类
  //到后面再使用当前类的Subscription相关的方法时,其实使用的是s的实现方法
  if (SubscriptionHelper.setOnce(this, s)) {
    onSubscribe.accept(this);
    ...
  }
}

onSubscribe.accept(this). 将会调用Observer中的Subscription请求触发数据

//io.reactivex.internal.operators.flowable.FlowableInternalHelper.RequestMax
public void accept(Subscription t) throws Exception {
  //之前执行了在LambdaSubscriber中SubscriptionHelper.setOnce(this, s)方法,
  //所以其实调用的是ScalarSubscription.request方法
  t.request(Long.MAX_VALUE);
}

t.request(Long.MAX_VALUE) 的最终调用

//io.reactivex.internal.subscriptions.ScalarSubscription
@Override
public void request(long n) {
  //其s指的就是LambdaSubscriber,到这数据的流向处理就已经通了
  s.onNext(value);
}

相关文章

  • 认识 RxJava

    RxJava 是一个针对响应式编程思想的一个框架,其本质是利用了观察者模式,充分的利用的函数式编程的思想,学习此框...

  • RxJava使用之认识RxJava

    写在前面:由于公司拖欠工资,2月的工资都没拿到,所以最近出来面试(已收两家offer但薪资都很低),毫无疑问,每次...

  • Rxjava入门与使用

    认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observa...

  • Rxjava学习笔记

    1.认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Obser...

  • Rxjava 认识5

  • Rxjava 认识4

  • Rxjava 认识3

  • Rxjava 认识2

  • Rxjava 认识1

    /** 1 概念: Rxjava 基于观察者的 异步 事件库* 2 rx的观察者模式 Observer...

  • 初步认识RXJava

    前言:面向对象变成是命令式编程的一种,面向计算机硬件的抽象,有变量、赋值、表达式和控制语句。函数式编程是面向数学的...

网友评论

      本文标题:认识 RxJava

      本文链接:https://www.haomeiwen.com/subject/liybkftx.html