今天看到一个 Observable.fromEmitter 的函数,这里是这个函数的 javadoc
Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, generally non-backpressured world.
Example:
You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The rest of its methods are thread-safe.
Observable.<Event>fromEmitter(emitter -> {
Callback listener = new Callback() {
@Override
public void onEvent(Event e) {
emitter.onNext(e);
if (e.isLast()) {
emitter.onCompleted();
}
}
@Override
public void onFailure(Exception e) {
emitter.onError(e);
}
};
AutoCloseable c = api.someMethod(listener);
emitter.setCancellation(c::close);
}, BackpressureMode.BUFFER);
这是一个实验性的功能,用于和传统回调模式的程序对接。为了理解这个机制,我写了一个完整的例子。
package org.wcy123.rxjava1;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
import rx.AsyncEmitter;
import rx.Observable;
@Slf4j
public class FromEmitterTest {
@Test
public void main1() throws Exception {
final ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(3 * 4);
Observable.fromEmitter(
emitter -> IntStream.range(0, 3).boxed().forEach(
threadIndex -> service.submit(
() -> {
for (int i = 0; i < 4; ++i) {
emitter.onNext("thread + " + threadIndex
+ " i = " + i);
Utils.sleep(1000);
latch.countDown();
}
if (threadIndex == 2) {
emitter.onCompleted();
}
})),
AsyncEmitter.BackpressureMode.BUFFER)
.subscribe(s -> log.info("item {}", s));
log.info("提前打印这里, subscribe 没有阻塞住");
log.info("开始等待解锁");
latch.await();
log.info("解锁完毕");
}
}
这个例子的执行结果是
02:12:24.244 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 提前打印这里, subscribe 没有阻塞住
02:12:24.244 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 0
02:12:24.250 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 开始等待解锁
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 0
02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 0
02:12:25.245 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 1
02:12:25.255 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 1
02:12:26.248 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 2
02:12:26.249 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 2
02:12:26.257 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 2
02:12:27.252 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 3
02:12:27.258 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 3
02:12:28.264 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 解锁完毕
注意到 log.info("item")
是运行在三个不同的线程中。fromEmitter 的第一个参数是一个函数,即 f, 该函数的第一个参数是 emitter ,类型是 AsyncEmitter。fromEmitter 返回一个 Observable , 这个Obverable 被订阅的时候,就会运行函数 f 。f 运行时,创建了 3 个线程,每个线程里面,都会调用 emitter
来发布数据,emitter.onNext(...) ,一旦调用这个函数,会触发后面所有的 Observable 定义的行为,触发 s->log.info("iterm{}", s)
的执行。
注意到 thread 0 并没有机会打印出来最后一个 i = 3, 因为 thread 2 提前调用了 emitter.onComplet()
latch.await()
等待所有线程结束。
网友评论