美文网首页
RxJava之连接操作符介绍

RxJava之连接操作符介绍

作者: 103style | 来源:发表于2019-06-26 14:11 被阅读0次

转载请以链接形式标明出处:
本文出自:103style的博客

连接相关的操作符 以及 官方介绍

RxJava连接操作符 官方介绍 :Connectable Observable Operators

  • ConnectableObservable.connect( )

    instructs a Connectable Observable to begin emitting items
    指示Connectable Observable开始发出项目

  • Observable.publish( )

    represents an Observable as a Connectable Observable
    Observable表示为可连接的Observable

  • Observable.replay( )

    ensures that all Subscribers see the same sequence of emitted items, even if they subscribe after the Observable begins emitting the items
    确保所有订阅者都看到相同的发射项目序列,即使他们在Observable开始发布项目后订阅

  • ConnectableObservable.refCount( )

    makes a Connectable Observable behave like an ordinary Observable
    使Connectable Observable的行为类似于普通的Observable


示例:

非连接操作
ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

输出:

Subscriber #1:391999
Sequence #1 complete
Subscriber #2:556663
Sequence #2 complete

publish and connect

官方示例:

ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

firstMillion.connect();

输出:

Subscriber #1:984513
Subscriber #2:984513
Sequence #1 complete
Sequence #2 complete

publish and refCount

官方示例:

ConnectableObservable firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

输出:

Subscriber #1:438899
Sequence #1 complete
Subscriber #2:684698
Sequence #2 complete

以上

相关文章

  • RxJava之连接操作符介绍

    转载请以链接形式标明出处:本文出自:103style的博客 连接相关的操作符 以及 官方介绍 RxJava 之 连...

  • RxJava详解之操作符执行原理(五)

    RxJava详解之操作符执行原理(五) 上一篇文章介绍了RxJava的执行原理。下面继续介绍一下操作符的执行原理,...

  • RxJava 操作符第二波

    RxJava操作符第二波啦,上篇RxJava 操作符第一波和本篇都只是简单介绍rxjava操作符的使用,有哪里写的...

  • RxJava操作符系列四

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列五

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列六

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列二

    RxJava操作符源码传送门 在上篇文章RxJava操作符系列一我们介绍的操作符几乎都是创建被观察者的操作符,那么...

  • Android拾萃 - RxJava2之变换操作符及其demo

    Android拾萃 - RxJava2操作符汇总Android拾萃 - RxJava2之创建操作符及其demo 一...

  • Android Rxjava系列(简介)

    RxJava系列1(简介) RxJava系列2(基本概念及使用介绍) RxJava系列3(转换操作符) RxJav...

  • RxJava操作符系列三

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二 前言 在之前...

网友评论

      本文标题:RxJava之连接操作符介绍

      本文链接:https://www.haomeiwen.com/subject/podazqtx.html