该章节说明可连接的Obseravble的子类和它的操作符。
- ConnectableObservable.connect( )— 指示一个可连接的Obseravble开始发射item。
- Observable.publish( )— 表示一个Obseravble是可连接的
- Observable.replay( )— 确保所有的订阅者看到相同的发射item的序列,即使他们在Observable开始发射item之后订阅。
- ConnectableObservable.refCount( )— 让一个可连接的Obseravble表现的像一个普通的Obseravble
一个可连接的Obseravble与一个普通的Obseravble相似,除了它不会在被订阅后发射item,而是在connect()被调用后。 在这种方式下你可以在该Obseravble发射item之前等待所有打算订阅Observable的Subscriber 。
image.png如下例子代码展示了两个观察者如何订阅同一个Obseravble。第一个例子,他们订阅了一个普通Obseravble。 第二个例子,他们订阅两个可连接的Observable,该Observable仅仅在两个订阅者订阅之后才开始连接。 注意输出结果的不同。
Example #1:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS);
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
Subscriber #1:211128
Subscriber #1:411633
Subscriber #1:629605
Subscriber #1:841903
Sequence #1 complete
Subscriber #2:244776
Subscriber #2:431416
Subscriber #2:621647
Subscriber #2:826996
Sequence #2 complete
Example #2:
def firstMillion = Observable.range( 1, 1000000 ).sample(7, java.util.concurrent.TimeUnit.MILLISECONDS).publish();
firstMillion.subscribe(
{ println("Subscriber #1:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #1 complete"); } // onCompleted
);
firstMillion.subscribe(
{ println("Subscriber #2:" + it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence #2 complete"); } // onCompleted
);
firstMillion.connect();
Subscriber #2:208683
Subscriber #1:208683
Subscriber #2:432509
Subscriber #1:432509
Subscriber #2:644270
Subscriber #1:644270
Subscriber #2:887885
Subscriber #1:887885
Sequence #2 complete
Sequence #1 complete
另请参阅:
网友评论