RxJava2讲解

作者: Simplelove_f033 | 来源:发表于2019-03-06 14:52 被阅读3次

RxJava2 主要类关系图

如下图所示,为RxJava2中的主要类关系图,可清晰知道各响应式类的联系和区别。后面无特别说明均以Flowable说明。

image

Flowable & Observable

  • Observable: 不支持背压;

  • Flowable : Observable新的实现,支持背压,同时实现Reactive Streams 的 Publisher 接口。

  • 什么时候用 Observable:

    • 一般处理最大不超过1000条数据,并且几乎不会出现内存溢出;
    • 如果式GUI 鼠标事件,频率不超过1000 Hz,基本上不会背压(可以结合 sampling/debouncing 操作);
    • 如果处理的式同步流而你的Java平台又不支持Java Stream(如果有异常处理,Observable 比Stream也更适合);
  • 什么时候用 Flowable:

    • 处理以某种方式产生超过10K的元素;
    • 文件读取与分析,例如 读取指定行数的请求;
    • 通过JDBC 读取数据库记录, 也是一个阻塞的和基于拉取模式,并且由ResultSet.next() 控制;
    • 网络IO流;
    • 有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的。

Single & Completable & Maybe

  • Single: 可以发射一个单独onSuccess 或 onError消息。它现在按照Reactive-Streams规范被重新设计,并遵循协议 onSubscribe (onSuccess | onError)? .SingleObserver改成了如下的接口;
interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}

  • Completable: 可以发送一个单独的成功或异常的信号,按照Reactive-Streams规范被重新设计,并遵循协议onSubscribe (onComplete | onError)?
    Completable.create(new CompletableOnSubscribe()
    {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception
        {
            Path filePath = Paths.get("build.gradle");
            Files.readAllLines(filePath);
            e.onComplete();
        }
    }).subscribe(() -> System.out.println("OK!"),
            Throwable::printStackTrace);

  • Maybe:从概念上来说,它是Single 和 Completable 的结合体。它可以发射0个或1个通知或错误的信号, 遵循协议 onSubscribe (onSuccess | onError | onComplete)?。
Maybe.just(1)
        .map(v -> v + 1)
        .filter(v -> v == 1)
        .defaultIfEmpty(2)
        .test()
        .assertResult(21);
//        java.lang.AssertionError: Values at position 0 differ; Expected: 21 (class: Integer), Actual: 2 (class: Integer) (latch = 0, values = 1, errors = 0, completions = 1)
//
//        at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:133)
//        ....

RxJava2 的主要操作

我们已经知道 RxJava主要特性为为一个扩展的观察者模式、流式操作和异步编程,支持ReactiveX 规范给出的一些操作, 同时RxJava2 符合响应式流规范,接下来以Flowable为例,按照功能分类讲解RxJava2中的重要操作[9];

创建一个Flowable

  • fromArray & fromIterable & just,直接从数组或迭代器中产生;
    List<String> list = Arrays.asList(
            "blue", "red", "green", "yellow", "orange", "cyan", "purple"
    );
    Flowable.fromIterable(list).skip(2).subscribe(System.out::println);
    Flowable.fromArray(list.toArray()).subscribe(System.out::println);
    Flowable.just("blue").subscribe(System.out::println);

  • fromFuture & fromCallable:

    fromFuture, 事件从非主线程中产生; fromCallable, 事件从主线程中产生, 在需要消费时生产;

    ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println("MAIN: " + Thread.currentThread().getId());
    Callable<String> callable = () -> {
        System.out.println("callable [" + Thread.currentThread().getId() + "]: ");
        Path filePath = Paths.get("build.gradle");
        return Files.readAllLines(filePath).stream().flatMap(s -> Arrays.stream(s.split
                (""))).count() + "";
    };

    Future<String> future = executor.submit(callable);

    Consumer<String> onNext = v -> System.out
            .println("consumer[" + Thread.currentThread().getId() + "]:" + v);

    Flowable.fromCallable(callable).subscribe(onNext);
    Flowable.fromFuture(future).subscribe(onNext);
    System.out.println("END");

  • fromPublisher ,从标准(Reactive Streams)的发布者中产生;

  • 自定义创建(generate & create)

    下面以斐波那契数列产生为例说明 generate & create的使用, generate为RxJava2新增的创建方式。

    class Fib
    {
        long a;
        long b;

        public Fib(long a, long b)
        {
            this.a = a;
            this.b = b;
        }

        public long fib()
        {
            return a + b;
        }
    }

    //斐波那契数列
    Flowable.create(new FlowableOnSubscribe<Fib>()
    {
        @Override
        public void subscribe(FlowableEmitter<Fib> e) throws Exception
        {
            Fib start = new Fib(1L, 1L);

            while (!e.isCancelled()) {
                e.onNext(start);
                start = new Fib(start.b, start.fib());
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).map(x -> x.fib()).take(10).subscribe(System.out::println);

    Flowable.generate(() -> new Fib(1L, 1L), (x, y) -> {
        Fib fib = new Fib(x.b, x.fib());
        y.onNext(fib);
        return fib;
    }).ofType(Fib.class).map(x -> x.fib()).take(10).subscribe(System.out::println);

  • amb & concat & merge, 由多个Flowable产生结合;

    • amb: 给定两个或多个Flowable,只发射最先发射数据的Flowable,如下面示例中的f1被发射;

    • concat: 给定多个Flowable, 按照Flowable数组顺序,依次发射数据,不会交错,下面示例中f1,f2中数据依次发射;

    • merge: 给定多个Flowable, 按照Flowable数组中数据发射的顺序组合成新的Flowable,各Flowable数据可能会交错(等价于转换操作中的flatMap);

    • switchOnNext:给定能发射多个Flowable的Flowable,顺序发射各子Flowable,最新发射的子Flowable覆盖当前子Flowable中还未发射的元素(由switchMap实现)。

      image
    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).map(index -> "f1-" + index);
    Flowable<String> f2 = Flowable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS).map(index -> "f2-" + index);

    Flowable.ambArray(f1, f2).map(x -> "amb: " + x).subscribe(System.out::println);
    System.out.println("----------concat-----------");
    Flowable.concat(f1, f2).map(x -> "concat: " + x).subscribe(System.out::println);

    System.out.println("----------merge-----------");
    Flowable.merge(f1, f2).map(x -> "merge: " + x).subscribe(System.out::println);

    Flowable<String>[] flowables = new Flowable[]{f1, f2};
    Flowable.switchOnNext(Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()]))
            .map(x -> "switchOnNext-" + x).subscribe(System.out::println);
    Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()])
            .switchMap((io.reactivex.functions.Function) Functions.identity())
            .map(x -> "switchMap-" + x).subscribe(System.out::println);

  • zip & combineLatest, 多Flowable中元素结合变换

    • zip :每个Flowable中的元素都按顺序结合变换,直到元素最少Flowable的已经发射完毕;

      image
    • combineLatest: 每个Flowable中的发射的元素都与其他Flowable最近发射的元素结合变换,知道所有的Flowable的元素发射完毕;

      image

转换、过滤与聚合操作

在Java8中Stream也有包含这些功能的操作,由于多了时间这个维度,在 RxJava 中操作相对更加丰富。 这里主要介绍一些重点操作。

  • buffer & groupBy & window

    buffer 和 window 都可以按时间或者元素数量窗口,buffer是直接转换成元素集,window是将元素集转换成另一个Flowable, groupBy,按照key 来分组,需要元素发射完成才能消费,如果只是对数据处理使用Java8 groupBy更方便;

    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).delay((t) ->
            Flowable.timer(t % 3 + new Random().nextLong() % 3, TimeUnit.SECONDS))
            .map(index -> index % 3 + "-f1-" + index);
    f1.buffer(5, TimeUnit.SECONDS).map(x -> "buffer-" + x).subscribe(System.out::println);

    f1.window(5, TimeUnit.SECONDS).map(x -> x.toList())
            .subscribe(x -> x.subscribe(System.out::println));

    Disposable b = f1.groupBy((x) -> x.split("-", 2)[0])
            .subscribe(x -> x.toList().subscribe(System.out::println));
    Map<String, List<String>> map = f1.toList().blockingGet().stream()
                .collect(Collectors.groupingBy((x) -> x.split
                        ("-", 2)[0]));
    System.out.println(map);

    while (!b.isDisposed()) {
    }

  • debounce & throttleFirst & sample 按照时间区间采集数据

debounce 防抖动,元素发射后在设定的超时时间内没有其它元素发射,则将此元素用于后续处理, 在前端APP应用较多。 如果是空间上的防抖动可以利用distinctUntilChanged操作符。

image

throttle 限流操作,对于 throttleFirst是 取发射后元素,经过间隔时间后的第一个元素进行发射。

image

sample 数据采样, 对于源数据,发射间隔时间内的最后出现的元素。

image
  • take & skip & first & emlmentAt,精确获取数据(集)

    take, 类似java8 limit 操作,但是这里支持更多的操作(take/takeLast/takeUntil/takeWhen),同时支持在时间区间上获取数据集; skip, 类似java8 skip 操作,但是这里的可以扩展到时间区间上 first/firstElement/last/lastElement, 由 Flowable -> Single/Maybe.

        Flowable<String> f1 = Flowable
                .fromArray("blue", "red", "green", "yellow11", "orange", "cyan", "purple"
                );

        f1.elementAt(4, "hello").subscribe(System.out::println);
        //out: orange
        f1.takeUntil(x -> x.length() > 5).map(x -> "takeUntil-" + x).toList()
                .subscribe(System.out::println);
        //out: [takeUntil-blue, takeUntil-red, takeUntil-green, takeUntil-yellow11]
        f1.takeWhile(x -> x.length() <= 5).map(x -> "takeWhile-" + x).toList()
                .subscribe(System.out::println);
        //out: [takeWhile-blue, takeWhile-red, takeWhile-green]

        f1.skipWhile(x -> x.length() <= 5).map(x -> "skipWhile-" + x).toList()
                .subscribe(System.out::println);
        //[skipWhile-yellow11, skipWhile-orange, skipWhile-cyan, skipWhile-purple]

        Disposable d = f1.delay(v -> Flowable.timer(v.length(), TimeUnit.SECONDS))
                .skipUntil(Flowable.timer(5, TimeUnit.SECONDS)).map(x -> "skipUntil-" + x)
                .subscribe(System.out::println);
//        skipUntil-green
//        skipUntil-orange
//        skipUntil-purple
//        skipUntil-yellow11
        while (!d.isDisposed()) {
        }

异步与并发(Asynchronized & Concurrency)

RxJava 通过一些操作统一了 同步和异步,阻塞与非阻塞,并行与并发编程。

observeOn & subscribeOn & Scheduler

  • subscribeOn 和 observeOn 都是用来切换线程用的,都需要参数 Scheduler.

  • Scheduler ,调度器, 是RxJava 对线程控制器 的 一个抽象,RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    • trampoline, 直接在当前线程运行(继续上一个操作中,最后处理完成的数据源所处线程,并不一定是主线程),相当于不指定线程;

    • computation, 这个 Scheduler 使用的固定的线程池(FixedSchedulerPool),大小为 CPU 核数, 适用于CPU 密集型计算。

    • io,I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率;

    • newThread, 总是启用新线程,并在新线程中执行操作;

    • single, 使用定长为1 的线程池(newScheduledThreadPool(1)),重复利用这个线程;

    • Schedulers.from, 将java.util.concurrent.Executor 转换成一个调度器实例。

      java.util.function.Consumer<Object> pc = x -> System.out
             .println("Thread[" + Thread.currentThread().getName() + " ," + Thread
                     .currentThread().getId() + "] :" + x);
      Executor executor = Executors.newFixedThreadPool(2);
      Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor one"));
      Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor two"));
      Schedulers.trampoline().scheduleDirect(() -> pc.accept("trampoline"), 1, TimeUnit.SECONDS);
      Schedulers.single().scheduleDirect(() -> pc.accept("single one DONE"));
      Schedulers.single().scheduleDirect(() -> pc.accept("single two DONE"));
      Schedulers.computation()
             .scheduleDirect(() -> pc.accept("computation one DONE"), 1, TimeUnit.SECONDS);
      Schedulers.computation()
             .scheduleDirect(() -> pc.accept("computation two DONE"), 1, TimeUnit.SECONDS);
      Schedulers.io().scheduleDirect(() -> pc.accept("io one DONE"));
      Schedulers.io().scheduleDirect(() -> pc.accept("io two DONE"), 1, TimeUnit.SECONDS);
      Schedulers.io().scheduleDirect(() -> pc.accept("io tree DONE"), 1, TimeUnit.SECONDS);
      Schedulers.newThread().scheduleDirect(() -> pc.accept("newThread tree DONE"));
      
      
  • subscribeOn 将Flowable 的数据发射 切换到 Scheduler 所定义的线程, 只有第一个 subscribeOn 操作有效 ;

  • observeOn 指定 observeOn 后续操作所在线程,可以联合多个 observeOn 将切换多次 线程 ;

    示例 Schedulers.newThread() 定义的线程发送数据;

    Schedulers.computation() 定义的线程 执行doOnNext;

    Schedulers.single() 执行 subscribe

      Consumer<Object> threadConsumer = x -> System.out
              .println("Thread[" + Thread.currentThread().getName() + " ," + Thread
                      .currentThread().getId() + "] :" + x);
    
      Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
          Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
          try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
              Iterator<Path> iter = dirStream.iterator();
              while (iter.hasNext() && !e.isCancelled()) {
                  e.onNext(iter.next());
              }
              e.onComplete();
          }
      }, BackpressureStrategy.BUFFER);
      f1.subscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io())
              .observeOn(Schedulers.computation()).take(5).doOnNext(consumer).observeOn(Schedulers
              .single()).subscribe(consumer);
    
    

多线程并发示例

上小节给出示例 发射元素都会经过同样的线程切换,元素间不会产生并行执行的效果。 如果需要达到 类似 Java8 parallel 执行效果。可以采用FlatMap 变换 自定义并发操作,在返回的Flowable进行线程操作,如下示例所示:

  • f1 中元素会在Schedulers.newThread()中发射;

  • 读取文本内容的操作(Files::readAllLines, Collection::size) 会在 Schedulers.io() 所指定的线程池执行;

  • sorted 操作会在 Schedulers.computation() 所指定的线程池中执行;

  • subscribe() 同样会在 Schedulers.computation() 所指定的线程池中执行;

    f1.filter(Files::isRegularFile).doOnNext(consumer).subscribeOn(Schedulers.newThread())
            .flatMap(y -> Flowable.just(y).subscribeOn(Schedulers.io())
                    .map(Files::readAllLines)).map(Collection::size)
            .observeOn(Schedulers.computation()).doOnNext(consumer)
            .sorted(Comparator.naturalOrder())
            .observeOn(Schedulers.trampoline()).subscribe(consumer);

阻塞与非阻塞示例

  • 从阻塞到非阻塞 我们可以通过 subscribeOn() 来达到;

     //f1 为 主线程发射数据的Flowable
     //会阻塞主线程知道消费完成
     f1.subscribe(System.out::println);
    
     // d会理解返回.
     Disposable d = f1.subscribeOn(Schedulers.newThread())
             .subscribe(System.out::println);
     while (!d.isDisposed()) {
     }
    
    
    • 从非阻塞到阻塞,可以通过blocking* 相关操作来实现
    Flowable<Path> f2 = f1.subscribeOn(Schedulers.newThread());
    //f2 为非阻塞flowable
    // 可以通过 blockingSubscribe 变为在主线程上消费
    f2.blockingSubscribe(System.out::println);
    // 也可以通过下面操作返回结果。
    List<Path> list = f2.toList().blockingGet();
    Iterable<Path> iterator = f2.blockingIterable();

错误处理 (error handling)

  • RxJava

  • 无需显示的catch 编译异常,RxJava2 已经支持所有函数接口抛出Exception;

  • 更方便在异步多线程环境下进行错误处理;

    如下示例 会打印第一个异常;

    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    Disposable d = f1.subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }

  • 异常可以被转换,源数据发射终止
    • 当出现异常时,可以通过 onErrorReturn* 转换成一个正常值返回;
    • 当出现异常时,通过 onErrorResumeNext 自定义一个Publisher返回,意味着可以转换一个异常类型;
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    f1.onErrorReturnItem(-1L).take(5)
            .subscribe(System.out::println, Throwable::printStackTrace);
    // 打印 -1 
    Disposable d = f1.onErrorResumeNext(e -> {
        if (e instanceof IOException) {
            return Flowable.error(new UncheckedIOException((IOException) e));
        }
        return Flowable.error(e);
    }).subscribe(System.out::println, Throwable::printStackTrace);
    // 打印 UncheckedIOException 异常
    while (!d.isDisposed()) {
    }

  • Flowable,map抛出异常,但数据继续发射

    暂没有找到直接方法可以达到,但可以采取如下两种方法达到

    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 2) {
            throw new IOException(x + "");
        }
        return x;
    };

    // 使用flatMap + onErrorReturnItem
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
    f1.flatMap(index -> Flowable.just(index).map(exceptionMap).onErrorReturnItem(-1L))
    .take(5).subscribe(System.out::println);

   //直接封装lift 操作
   public class ErrorResumeOperator<D, U> implements FlowableOperator<D, U>
    {
        private final Function<U, D> function;
        private final D defaultValue;

        public ErrorResumeOperator(Function<U, D> function, D defaultValue)
        {
            this.function = function;
            this.defaultValue = defaultValue;
        }

        @Override
        public Subscriber<? super U> apply(Subscriber<? super D> observer) throws Exception
        {
            Subscriber<U> subscriber = new Subscriber<U>()
            {
                @Override
                public void onSubscribe(Subscription s)
                {
                    observer.onSubscribe(s);
                }

                @Override
                public void onNext(U onNext)
                {
                    try {
                        observer.onNext(function.apply(onNext));
                    }
                    catch (Exception e) {
                        observer.onNext(defaultValue);
                    }
                }

                @Override
                public void onError(Throwable t)
                {
                    observer.onError(t);
                }

                @Override
                public void onComplete()
                {
                    observer.onComplete();
                }
            };
            return subscriber;
        }
    }
    Disposable d = f1.lift(new ErrorResumeOperator<>(exceptionMap, -1L)).take(5)
            .subscribe(System.out::println);

    while (!d.isDisposed()) {
    }

  • 出错重试(retry)

    RxJava 提供了retry以及相关的多个操作,提供出错后重新发射数据功能;


    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 3) {
            throw new IOException(x + "");
        }
        if (new Random().nextInt(6) < 1) {
            throw new SQLException(x + "");
        }
        return x;
    };
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
// 仅为 IOException 异常时最多重试3次,其它异常立即打印异常
    Disposable d = f1.map(exceptionMap).retry(3, e -> e instanceof IOException)
            .subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }

冷热数据流

ConnectableFlowable & publish & connect

  • ConnectableFlowable 可连接的Flowable, 不管是否消费,只有调用了connect, 数据就一直在发射,不受消费影响 ('冷' 的Flowable 变成'热'的)
  • publish 将 普通 Flowable,变成 ConnectableFlowable ;
image

;

    ConnectableFlowable<String> f1 = Flowable.generate(() -> new BufferedReader(new InputStreamReader(System.in))
            , (reader, e) -> {
                while (true) {
                    String line = reader.readLine();
                    if (line == null || line.equalsIgnoreCase("exit")) {
                        break;
                    }
                    e.onNext(line);
                }
                e.onComplete();
            }).ofType(String.class).subscribeOn(Schedulers.io()).publish();

    TimeUnit.SECONDS.sleep(5);
    f1.connect(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.observeOn(Schedulers.newThread()).map(x -> "s0- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.map(x -> "s1- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(50);

replay

replay 将Flowable变成 ConnectableFlowable, 在connect之后,确保每次消费都使用相同数据。

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);
    ConnectableFlowable<Long> f1 = Flowable.intervalRange(1, 100, 0, 1, TimeUnit.SECONDS)
            .onBackpressureBuffer().replay();
    m.apply("").accept("start");
    TimeUnit.SECONDS.sleep(5);
    f1.connect();
    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o1"));

    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o2"));
    TimeUnit.SECONDS.sleep(20);

cache

缓存功能,将Flowable进行缓存

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);

    Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
        Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
            Iterator<Path> iter = dirStream.iterator();
            while (iter.hasNext() && !e.isCancelled()) {
                Path path = iter.next();
                m.apply("-----create").accept(path);
                e.onNext(path);
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).cache();

    f1.count().subscribe(m.apply("count"));
    f1.filter(Files::isDirectory).subscribe(m.apply("filter"));

背压(Backpressure)问题

问题描述: 在rxjava中会经常遇到一种情况就是被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息。那么随之而来的就是如何处理这些未处理的消息。

如下示例: f1 比 f2 元素发射速度快一倍。而zip是按照发射顺序结合,所以出现f1的产生速度快于其消费速度,因此会有背压问题产生(当发射到一定数量时会有异常抛出)。

    Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
    Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);

    Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
    f3.subscribe(consumer);

对于出现的背压问题: - Flowable默认队列大小为128,并且规范要求,所有的操作符强制支持背压。 - 通过操作节流(Throttling)相关操作(sample 、throttleLast、throttleFirst、throttleWithTimeout、debounce等)来改变Flowable的发射数率;

  • 通过设置缓冲区和窗口(buffer,window)操作,来缓存过剩的数据,然后发送特定数据。

  • 设置背压策略(onBackpressurebuffer & onBackpressureDrop & onBackpressureLatest)

RxJava 测试

RxJava2 支持test() 操作符,将Flowable转变为 TestSubscriber,从而支持多种断言操作。

    List<String> list = Arrays.asList(
            "orange", "blue", "red", "green", "yellow", "cyan", "purple");

    Flowable.fromIterable(list).subscribeOn(Schedulers.newThread()).sorted().test().assertValues(list.stream().sorted().toArray(String[]::new));
    Flowable.fromIterable(list).count().test().assertValue(Integer.valueOf(list.size()).longValue());
    List<String> out1 = Flowable.fromIterable(list).sorted().test().values();

Reference

  1. 响应式宣言.https://github.com/reactivemanifesto/reactivemanifesto/blob/master/README.zh-cn.md

  2. RxJava 2.0 Released with Support for Reactive Streams Specification. https://www.infoq.com/news/2016/11/rxjava-2-with-reactive-streams

  3. https://www.lightbend.com/blog/7-ways-washing-dishes-and-message-driven-reactive-systems

  4. Use reactive streams API to combine akka-streams with rxJava. http://www.smartjava.org/content/use-reactive-streams-api-combine-akka-streams-rxjava

  5. What's different in 2.0. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

  6. Learning Reactive Programming with Java8. https://github.com/zouzhberk/rxjava-study/raw/master/docs/LearningReactiveProgramming.pdf

相关文章

  • RxJava2讲解

    RxJava2 主要类关系图 如下图所示,为RxJava2中的主要类关系图,可清晰知道各响应式类的联系和区别。后面...

  • Rxjava2 方法讲解

    前言 关于RxJava介绍的文章已经很多,但是关于实战的教程却不尽人意,今天就从代码的角度分析一下RxJava O...

  • RxJava2笔记(二、事件取消流程)

    在上一篇文章RxJava2笔记(一、事件订阅流程)中,我们讲解了RxJava的事件订阅流程,本文我们将继续讲解Rx...

  • Rxjava2 操作符原理(2)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 线程切换(3)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 基本用法(1)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 简析Flowable背压(4)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • 给初学者的RxJava2.0教程(二)

    Outline [TOC] 前言 上一节教程讲解了最基本的RxJava2的使用, 在本节中, 我们将学习RxJav...

  • RxJava2--Flowable与BackPress

    转载自:Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解 背压介绍 当上...

  • Rxjava2-小白入门(二)

    前言 上篇文章我们主要讲解了观察者模式。那么这节课我们主要讲解Rxjava2的基本使用和操作符。其实网上的关于Rx...

网友评论

    本文标题:RxJava2讲解

    本文链接:https://www.haomeiwen.com/subject/ahwtpqtx.html