美文网首页
(Summer vs Winter Observable)Rx

(Summer vs Winter Observable)Rx

作者: 叛逆的曾小砂 | 来源:发表于2017-05-19 18:38 被阅读57次

这个是Rx Observable和开发者对话的第六部,前面5部已经由掘金翻译计划翻译完毕,我也是看到前五篇,意犹未尽,所以找到第六部分,但是掘金计划还没翻译出来,所以我就自行看了一下,想翻译出来(但是很长很长),小弟英语水平比较低,如果有什么错误,请大家不要介怀,可以等“掘金翻译计划”出比较准确的翻译版,谢谢大家理解。之前五部分可以查看这里掘金翻译计划 后面的各部分链接我就不另外给链接了
原文:Continuation (Summer vs Winter Observable) of Dialogue between Rx Observable and a Developer (Me) [ Android RxJava2 ] ( What the hell is this ) Part6

WOW,我们有一天的时间学习新的东西让这一天过得非常棒的。
朋友们,希望你们过得不错,这是我们RxJava2 Android 的第六部分。在这部分,我们将继续和Rx进行对话,清楚一件重要的事情,基本上Summer vs Winter Observable 是 Hot (热)vs Cold(冷) 的意思
动机
动机和我在第一部分part1中分享给大家的一样。
介绍:
这部分没有介绍,因为这是我们上一篇准备要打算讲的,在开始之前,我们回顾一下上一篇的内容。在上一篇我们约见了Rx Observable,他给了我们一些学习Rx的建议,随后又给我们分享了一些创建Observable的方法,当我们准备谈到Hot and Cold Observable的时候,我们暂停了谈话
续集:
Observable:这就很多了,但我想我可以解释一下这两种Observables。第一种叫Cold Observable ,第二种叫Hot Observable。有时候开发者们经常称作Hot vs Cold Observable。这两个都是比较简单的概念,为此,我会通过几个小例子让你理解这个概念,然后我会教你怎么在代码里使用这些概念。最后,我想【Me】会给你提供一些例子,你觉得呢【Me】?
Me:必须的,我会尝试走在你前面,这样你就能看到我是对还是错了
Observable:哈哈,可以。那现在有多少人知道销售员里,哪些是超市里面最有价值的的,在超市门口又有谁试着通过一些口号去争取客户。
Me:我是这样想的,没有多少人知道像在巴基斯坦、印度等亚洲国家里面最有价值的文化是什么。你可以找来一些更加普遍的例子,好让人们都能更容易的抓住到这个概念。
Observable:可以,没问题啊,那有多少人知道咖啡店?
Me:我想所有人都知道。
Observable:很好,这里有两家咖啡店,分别叫做Cold Music 咖啡店和Hot Music 咖啡点。一个人他去Cold Music 咖啡店买了一杯咖啡,然后他可以坐在店里的任何地方,在咖啡店的座位里,提供这一些耳机,耳机里有个播放列表,播放这三首诗歌。非常好的是所有人都能带上这些耳机。而耳机里都是从第一首开始播放,如果有人期间放下再带上耳机,也是重新从第一首开始播放。同样,如果有人放下了耳机,就会停下播放。
与此相反,Hot Music咖啡点有一套完整的音乐系统,当你进入这咖啡店你就能来时听到诗歌,这是因为他们有一套很好音响设备和公放,他们也不限制歌曲,当第一个店员开门营业便开始这个播放。这是一个相对独立于客人外的系统,不管是谁进入这个咖啡店,他就会在当前时间点听到歌曲,但他不知道他进来之前已经有多少歌曲已经播放完了,这就等同于observables里面的概念.
就像ColdMusic咖啡店的耳机,cold observable 是懒惰的,就像一个耳机,在你需要用到的时候,才通过Observable.fromArray()或其他方法的去创建一个Observable。当你(订阅者)订阅这个Observable,你会开始接受数据就像一个人带上耳机,然后歌曲播放。现在订阅者(你)从Observable取消订阅,这样你就不能获取到任何新的数据,就像放下耳机停止播发歌曲。
最后的重点是ColdMusic咖啡店有很多耳机,但每个耳机只要带上就会开始播放。如果一个人已经听到了第二首,另外一个人带上另外一个耳机,他也会从第一首开始听。这就意味着每个人都会得到一个单独的歌曲播放列表。同理,如果我们又三个订阅者,他们同时订阅了Cold Observable,他们都会获得一个独立的数据流(stream),意味着观察者都能在订阅的时候单独调用onNext方法。在这里,我们可以说Cold Observables 就像耳机一样,依赖上面这些订阅者。
现在HotObservables 就像一间HotMusic 咖啡店的音乐系统,一旦咖啡店营业,音乐系统就不需要任何人照看开始播放歌曲。一直在播放歌曲,一旦有人进入店里,他就会开始听到那个时间点的歌曲。这样同样发生在Hot Observables,一但他们被创建,他们就开始发送数据,任何订阅者会订阅这个Observable 并开始接收这个时间点的数据,但他们获取不到旧的数据。这就意味着Hot Observable 是独立于订阅者,他们是不会介意任何之前的数据。无论什么时候,任何一个订阅者会都在她订阅的点上开始接收数据。我想我会通过代码用同样的例子说明,等下【Me】会给你一些真实的例子。
Cold Observable:

    public class HotVsCold {

    public static void main(String[] args) throws InterruptedException {

        List<String > poemsPlayList = Arrays.asList("Poem 1", "Poem 2", "Poem 3");
        Observable coldMusicCoffeCafe = Observable.fromArray(poemsPlayList);

        Consumer client1 = poem-> System.out.println(poem);
        Consumer client2 = poem-> System.out.println(poem);
        Consumer client3 = poem-> System.out.println(poem);
        Consumer client4 = poem-> System.out.println(poem);

        coldMusicCoffeCafe.subscribe(client1);
        coldMusicCoffeCafe.subscribe(client2);
        System.out.println(System.currentTimeMillis());
        Thread.sleep(2000);
        System.out.println(System.currentTimeMillis());
        coldMusicCoffeCafe.subscribe(client3);
        coldMusicCoffeCafe.subscribe(client4);
    }
}

这是一个很简单的例子的代码,我有4个客人和一个放在ColdMusic咖啡店的播放列表Observable,程序一开始,先是两个客户订阅cold Observabler,我等2秒钟,第三第四个客户订阅这cold observable ,然后最后当我们看到输出的时候,我们就能简单看到所有订阅者或者客户会获取所有开始到结束的歌曲

Output:

[Poem 1, Poem 2, Poem 3]

[Poem 1, Poem 2, Poem 3]

1494142518697

1494142520701

[Poem 1, Poem 2, Poem 3]

[Poem 1, Poem 2, Poem 3]

Hot Observable:

public static void main(String[] args) throws InterruptedException {

    Observable<Long> hotMusicCoffeeCafe = Observable.interval(1000, TimeUnit.MILLISECONDS);
    ConnectableObservable<Long> connectableObservable = hotMusicCoffeeCafe.publish();
    connectableObservable.connect(); //  Cafe open on this line and cafe boy start the system

    Consumer client1 = poem-> System.out.println("Client 1 poem"+poem);
    Consumer client2 = poem-> System.out.println("Client 2 poem"+poem);
    Consumer client3 = poem-> System.out.println("Client 3 poem"+poem);
    Consumer client4 = poem-> System.out.println("Client 4 poem"+poem);

    Thread.sleep(2000); // After two poems already played client 1 enter. So he should listens from poem 2.
    connectableObservable.subscribe(client1);
    Thread.sleep(1000); // Client two should start listening poem 3 
    connectableObservable.subscribe(client2);

    Thread.sleep(4000); // Client 3 and 4 enter will start from poem 9.
    connectableObservable.subscribe(client3);
    connectableObservable.subscribe(client4);

    while (true);
}

HotMusic咖啡点营业,服务业开启播放系统,当我们调用connect 方法时,歌曲如上所示开始播放,暂时不要只专注于connect方法,而是把握的概念。两首诗或秒之后的第一个客户进入咖啡馆,所以他将从诗2开始倾听。一秒后下一个顾客进入所以他开始从诗3开始倾听。在顾客2的4秒后,第三第四个顾客进入咖啡店,现在,他们开始听到诗9.你可以看到Hot Observable是依赖于订阅者的。一旦他开始发射数据,他不会理会任何人有没有订阅,在另一方面所有订阅者当他们订阅的时候获取到对应时间的数据,但他们获取不到已经发射了的历史和事件数据。
现在我已经感觉到你掌握了Hot vs Cold Observable 的概念了,是时候根据上面这些要点,看看怎么创建这些Observable。

Cold Observable:
1.所有Obesrvable 实际上都是Cold Observables,这就说明,如果我们使用Observable.create() 或者Observable.fromArray()等方法创建的Observable 都是Cold Observable.
2.所有订阅者当他订阅Cold Observable时,都会单独获取到完整的数据流
3.如果没有订阅者订阅,Cold Observable将不会做任何事,他们是懒惰的。
Hot Observable:
1.一旦Hot Observable 创建,他们就不理会所有订阅者。
2.当所有的订阅者在同一时间订阅一个Hot Observable时,将会获取到相同的数据。
Me:嗯,好的。你能告诉我们怎么将Cold Observable转化长Hot Observable.
Observable:可以,Cold 转变成Hot Observable是件相当容易的事

List<Integer> integers = new ArrayList<>();
Observable.range(0, 10000)
        .subscribe(count -> integers.add(count));

Observable<List<Integer>> listObservable = Observable.fromArray(integers);

现在上面的代码,listObservable 是一个Cold Observable,这时候看看我们怎么将这个Cold Observable 转变成Hot Observable.

Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();

上面代码所示,我们通过使用publish()方法将我的Cold Observable转变成Hot Observable.我们可以说所有Cold Observable都能通过使用publish()方法将Cold 变成Hot ,并且返回一个当前还没开始发射数据的ConnectableObservable.这是一个有趣的事情,当observable调用publish()方法,就意味着所有订阅者将订阅这个Observable,并在订阅时共享这个时间点的相同数据。正如我们知道进入HotMusic咖啡店的每个人都进入了同一个歌曲数据播放流,不同的是当他们不同时间点获取到的数据不一样。现在有趣的一点是,如果任何订阅者订阅的是conenectbleObservable,他们将获取不了任何。可能你们会困惑,主要有两点,当我们调用publish()意味着这个Observable现在将发或射一组数据,或者说这个Observabel 有一组数据将要发射给所有的订阅者,但要开始发送数据需要我调用connect()方法,就如下面所示:

Observable<List<Integer>> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
connectableObservable.connect();

就这么简单的例子,记住publish()会将Cold转换成Hot 但不开始发送数据。发送数据我们需要调用connect()方法,当我调用ConnectableObservable的connect()方法时,不管有没有订阅者,还是成千上百的订阅者,数据都将开始发送。现在也有一些其他的方法在现实生活中真正有用的项目,如:refCount(),share(),replay(),但我暂时不说,现在我们做一个回顾,[Me]会给你们一个很好的例子。好让你们真的掌握这个概念。
Me:😯,这这例子比较长但还是简单的。
Observable:哈哈哈,没关系的[Me],我只需要通过这方法解释好让所有人都能把握。
Me:同意,所以现在我要给一个可能更有助于更准确地把握这一概念的例子。考虑现在我们有一个Observable的如下所示。

Observable<String> just = Observable.just("Hello guys");

现在有两个不同的订阅者,都订阅了这个Observabel

public class HotVsCold {
    public static void main(String[] args) {
        Observable<String> just = Observable.just("Hello guys");
        just.subscribe(s-> System.out.println(s));
        just.subscribe(s-> System.out.println(s));
    }
}

Output:
Hello guys
Hello guys
现在,我有个问题?这个Observable是Cold还是Hot,我想你已经知道这里没有publish(),所以是Cold.想象一下这是Observable是我从第三方库得到的,我们不知道这个Observable的类型。现在我需要一个新的例子让大家都清晰很多东西。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
    just.subscribe(s-> System.out.println(s));
    just.subscribe(s-> System.out.println(s));
}

在这里我有一个随机值,同时用来审查程序输出,和讨论是Cold或Hot Observable?

Output:
1531768121
607951518

所示两个值是不同的,这就意味着这是个Cold Observabel,因为根据Cold Observable的定义,每次我获取到一个新的值,可见他们从来没有分享数据的。每次产生一个新的或新数据,通过简单的词语onNext()方法调用两次为两个不同的用户。
现在这是时候将同一个Cold变成Hot Observable。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
    ConnectableObservable<Integer> publish = just.publish();
    publish.subscribe(s-> System.out.println(s));
    publish.subscribe(s-> System.out.println(s));
    publish.connect();
}

在解释上面代码之前,我们先看一下这输出的结果:
Output:
1926621976
1926621976
这时候两个订阅者获到相同的数据,这就表示这是一个Hot Observable,因为Hot Observable 总是从一个源发送的数据,简单来说我们只有在调用onNext()方法时获取一组数据。我等下准备解释一下publish()和connect()方法的调用。
当我调用publish()的时候,就意味着我这个Observble独立于订阅者,只分享给所有订阅者相同的数据源信息。简单来讲,Hot Observable给所有订阅者 同一个onNext()方法来调取数据。这里可能有一点困惑,我在两个订阅者订阅后调用connect(),是因为我想展示给你看Hot Observable 是独立的,应该由调用onNext时发送数据,我们知道只有当我们调用connect(),HotObservable 才开始发送数据。所以之前我们两个订阅者订阅,然后调用connect()方法,这样我们同时接受到相同的数据。现在我要给你一个相同的例子。

Random random = new Random();
Observable<Integer> just = Observable.create(source->source.onNext(random.nextInt()));
ConnectableObservable<Integer> publish = just.publish();
publish.connect();
publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s))

这里有点不同,我在订阅者订阅之前调用connet.现在我们将会得到什么输出,有人能说得出来吗?
Output:

Process finished with exit code 0
是的,空的,你们困惑吗?好,我来解释一下,正如你看到的,我创建一个产生随机int 值的Observable ,只调用一次。当我创建的Cold通过调用publish()变成Hot.转换完后我调用connect().现在,因为这个是HotObservable ,而且我们知道这个是独立于订阅者的,所以开始发送随机数,而且我们只会生成一个随机数。当调用connect后,我们订阅者订阅,但这个时候,我们已经获取不到任何数据,因为Hot Observable已经发射了一个值了。我相信大家都明白的,现在我们可以添加日志观察内部的发射。这样我们就可以确认,我说的是真的。

public static void main(String[] args) {
    Random random = new Random();
    Observable<Integer> just = Observable.create(source -> {
                int value = random.nextInt();
                System.out.println("Emitted data: " + value);
                source.onNext(value);
            }
    );
    ConnectableObservable<Integer> publish = just.publish();
    publish.connect();
    publish.subscribe(s -> System.out.println(s));
    publish.subscribe(s -> System.out.println(s));
}

Output:

Emitted data: -690044789

Process finished with exit code 0
你可以看到上面的输出显示,我的HotObservable在调用connect()后开始发送数据,但订阅者订阅慢了。这就是为什么我们获取到空的值。现在我打算进入下一节前回顾一下;
1、所有的Observables 都默认是冷Observable.
2、Cold 转换成Hot,只需要调用publish(),然后会返回一个ConnectableObservable.这是一个并不马上发送数据的Hot Observable。
3、ConnectableObservable 开始发送数据需要我们调用connect()方法。
Observable:抱歉打断一下,进入下一节前,你可以写一个带时间间隔的HotObservable代码会更好一些。
Me:当然。

public static void main(String[] args) throws InterruptedException {
    Random random = new Random();
    Observable<Integer> just = Observable.create(
            source -> {
                Observable.interval(1000, TimeUnit.MILLISECONDS)
                        .subscribe(aLong -> {
                            int value = random.nextInt();
                            System.out.println("Emitted data: " + value);
                            source.onNext(value);
                        });
            }
    ); // Simple same Observable which we are using only I added a one thing now this will produce data after every one second.
    ConnectableObservable<Integer> publish = just.publish();
    publish.connect();

    Thread.sleep(2000); // Hot observable start emitting data and our new subscribers will subscribe after 2 second.
    publish.subscribe(s -> System.out.println(s));
    publish.subscribe(s -> System.out.println(s));

    while (true);

Output:

Emitted data: -918083931
Emitted data: 697720136
Emitted data: 416474929
416474929
416474929
Emitted data: -930074666
-930074666
-930074666
Emitted data: 1694552310
1694552310
1694552310
Emitted data: -61106201
-61106201
-61106201
现在我们可以清楚的看到, 根据定义我们对Hot进行了100%的讨论了。当Hot开始发送数据,我们获取到三个数据,但还没有订阅者。2秒后,我们两个订阅者订阅Hot,他们开始获得并相同的数字。
是时候到下一节。我们已经掌握了Cold 和Hot的概念。关于Hot的下一个节,我要解释的使用场景。
场景1:
我要一个HotObservable,任何订阅者订阅能获取所有之前的值,这HotObservable已经发出的值和所有的值应该是同步的。为了解决这种情况下我们有一个非常简单的方法。这叫replay()。只有我们需要调用该方法。

public static void main(String[] args) throws InterruptedException {

    Random random = new Random();
    Observable<Integer> just = Observable.create(
            source -> {
                Observable.interval(500, TimeUnit.MILLISECONDS)
                        .subscribe(aLong -> {
                            int value = random.nextInt();
                            System.out.println("Emitted data: " + value);
                            source.onNext(value);
                        });
            }
    );
    ConnectableObservable<Integer> publish = just.replay();
    publish.connect();

    Thread.sleep(2000);
    publish.subscribe(s -> System.out.println("Subscriber 1: "+s));
    publish.subscribe(s -> System.out.println("Subscriber 2: "+s));

    while (true);

}

Output:
Emitted data: -1320694608
Emitted data: -1198449126
Emitted data: -1728414877
Emitted data: -498499026
Subscriber 1: -1320694608
Subscriber 1: -1198449126
Subscriber 1: -1728414877
Subscriber 1: -498499026
Subscriber 2: -1320694608
Subscriber 2: -1198449126
Subscriber 2: -1728414877
Subscriber 2: -498499026
Emitted data: -1096683631
Subscriber 1: -1096683631
Subscriber 2: -1096683631
Emitted data: -268791291
Subscriber 1: -268791291
Subscriber 2: -268791291
在这里如果你一起查看我们输出和代码。你能比较简单掌握Hot Observable的replay()概念。首先,我们创建HotObservable后开始发送数据。2秒钟,我们第一第二个订阅者订阅这个Hot Observable,但这时候,我们已经发送了4个值。所以我们能看到输出,我们的订阅者先是获取到已经发送了的值,随后他们同步获取到Hot Observable 发送的值。
场景2:
我要一个Hot Observable,当第一个订阅者订阅时开始数据发射,并且在所有订订阅者取消订阅时取消发射。这一个是很简单实现的。

public static void main(String[] args) throws InterruptedException {

    Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS).publish().refCount();

    Consumer<Long > firstSubscriber = s -> System.out.println("Subscriber 1: "+s);
    Consumer<Long > secondSubscriber = s -> System.out.println("Subscriber 2: "+s);

    Disposable subscribe1 = observable.subscribe(firstSubscriber);
    Disposable subscribe2 = observable.subscribe(secondSubscriber);

    Thread.sleep(2000);
    subscribe1.dispose();
    Thread.sleep(2000);
    subscribe2.dispose();

    Consumer<Long > thirdSubscriber = s -> System.out.println("Subscriber 3: "+s);
    Disposable subscribe3 = observable.subscribe(thirdSubscriber);

    Thread.sleep(2000);
    subscribe3.dispose();

    while (true);
}

Output:
Subscriber 1: 0
Subscriber 2: 0
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
Subscriber 1: 3
Subscriber 2: 3
Subscriber 2: 4
Subscriber 2: 5
Subscriber 2: 6
Subscriber 2: 7
Subscriber 3: 0
Subscriber 3: 1
Subscriber 3: 2
首先也是最重要的一点,这里的Observable 是一个Hot Observable,但只有在第一个订阅者订阅后才发射数据。并在所有订阅者取消订阅的时候停止发射。可以从上面的输出。当首先两个订阅者订阅HotObservable,开始发送数据,然后一个订阅者取消,但Hot Observable 没有停止,因为当前还有一个订阅者订阅着但随后就取消订阅,所以Hot Observable停止发送数据。然后第三个订阅者在2秒之后订阅同一个Hot Observable,这时候Observable 又开始发送数据,但不是从0这个点而是之前停止的那个点。
Observable :哇,你真是让我[Me]惊喜,你通过一个好方式解释这个概念。
Me:谢谢,Observable
Observable:现在你还有其他问题吗?
Me:当然有啦,你可以告诉我Subject和像Publish,Behaviour等不同subject的概念吗?
Observable :恩,在这个概念,我有种感想。我会告诉你关于Observer 的API,他们是怎么工作的,在不使用一个完整的Observer接口下你要怎么使用Lambda or Functioal 接口。你怎么看?
Me:当然可以,我随你。
Observable:正如我们知道的Observables,Observe有一个概念,我们已经使用在很多例子……。
Conclusion(总结):
朋友,这个对话非常长,我需要暂停在这里,否则这篇文章会像一本很厚的书,可能是好,但就会失去主要目的。我想我们都应该学习和几乎什么都知道。所以我打算在这里暂停对话,我会在下一部份继续对话。如果可以的话,试着把你真实的项目和重构这些实践。最后我想说谢谢RxObservable,是他给我很多他/她的时间。

Happy Weekend Friends Bye.

‘’我翻译的时候,没想到第七部分已经出来了
我也要去继续学习啦 by 小砂‘’

Next post part7 (Continuation (Observable Marriage Proposal to Observer) of Dialogue between Rx Observable and a Developer (Me) [ Android RxJava2 ] ( What the hell is this ))

相关文章

网友评论

      本文标题:(Summer vs Winter Observable)Rx

      本文链接:https://www.haomeiwen.com/subject/rrktxxtx.html