美文网首页Android 开发相关文章收集
rxjava 如何和传统回调函数结合

rxjava 如何和传统回调函数结合

作者: 017830d97951 | 来源:发表于2016-11-20 02:13 被阅读0次

    今天看到一个 Observable.fromEmitter 的函数,这里是这个函数的 javadoc

    Provides an API (via a cold Observable) that bridges the reactive world with the callback-style, generally non-backpressured world.
    Example:
    You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The rest of its methods are thread-safe.

    Observable.<Event>fromEmitter(emitter -> {
    Callback listener = new Callback() {
    @Override
    public void onEvent(Event e) {
    emitter.onNext(e);
    if (e.isLast()) {
    emitter.onCompleted();
    }
    }
    
    @Override
    public void onFailure(Exception e) {
    emitter.onError(e);
    }
    };
    
    AutoCloseable c = api.someMethod(listener);
    
    emitter.setCancellation(c::close);
    
    }, BackpressureMode.BUFFER);
    

    这是一个实验性的功能,用于和传统回调模式的程序对接。为了理解这个机制,我写了一个完整的例子。

    package org.wcy123.rxjava1;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.IntStream;
    
    import org.junit.Test;
    
    import lombok.extern.slf4j.Slf4j;
    import rx.AsyncEmitter;
    import rx.Observable;
    
    @Slf4j
    public class FromEmitterTest {
    
        @Test
        public void main1() throws Exception {
            final ExecutorService service = Executors.newCachedThreadPool();
            final CountDownLatch latch = new CountDownLatch(3 * 4);
            Observable.fromEmitter(
                     emitter -> IntStream.range(0, 3).boxed().forEach(
                            threadIndex -> service.submit(
                                    () -> {
                                        for (int i = 0; i < 4; ++i) {
                                            emitter.onNext("thread + " + threadIndex
                                                    + " i = " + i);
                                            Utils.sleep(1000);
                                            latch.countDown();
                                        }
                                        if (threadIndex == 2) {
                                            emitter.onCompleted();
                                        }
                                    })),
                    AsyncEmitter.BackpressureMode.BUFFER)
                    .subscribe(s -> log.info("item {}", s));
            log.info("提前打印这里, subscribe 没有阻塞住");
            log.info("开始等待解锁");
            latch.await();
            log.info("解锁完毕");
        }
    }
    

    这个例子的执行结果是

    02:12:24.244 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 提前打印这里, subscribe 没有阻塞住
    02:12:24.244 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 0
    02:12:24.250 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 开始等待解锁
    02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 0
    02:12:24.251 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 0
    02:12:25.245 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 1
    02:12:25.255 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 1
    02:12:26.248 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 2
    02:12:26.249 [pool-1-thread-3] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 1 i = 2
    02:12:26.257 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 2
    02:12:27.252 [pool-1-thread-2] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 2 i = 3
    02:12:27.258 [pool-1-thread-1] INFO org.wcy123.rxjava1.FromEmitterTest - item thread + 0 i = 3
    02:12:28.264 [main] INFO org.wcy123.rxjava1.FromEmitterTest - 解锁完毕
    

    注意到 log.info("item") 是运行在三个不同的线程中。fromEmitter 的第一个参数是一个函数,即 f, 该函数的第一个参数是 emitter ,类型是 AsyncEmitter。fromEmitter 返回一个 Observable , 这个Obverable 被订阅的时候,就会运行函数 f 。f 运行时,创建了 3 个线程,每个线程里面,都会调用 emitter 来发布数据,emitter.onNext(...) ,一旦调用这个函数,会触发后面所有的 Observable 定义的行为,触发 s->log.info("iterm{}", s) 的执行。

    注意到 thread 0 并没有机会打印出来最后一个 i = 3, 因为 thread 2 提前调用了 emitter.onComplet()

    latch.await() 等待所有线程结束。

    相关文章

      网友评论

        本文标题:rxjava 如何和传统回调函数结合

        本文链接:https://www.haomeiwen.com/subject/dgwepttx.html