响应式编程入门之 Project Reactor

作者: 殷天文 | 来源:发表于2021-11-29 22:05 被阅读0次

    本文目标

    • 理解响应式编程

    前言

    之前的《聊聊 IO 多路复用》中,我们理解了非阻塞 IO 的意义。但是 Spring MVC 并不能完美的应用非阻塞编程,于是 Spring 团队开发了 WebFlux,而 WebFlux 的基础正是本文要讲到的 Project Reactor(下文简称为 Reactor)

    本文以 Reactor 为例带大家入门响应式编程

    版本

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.6</version>
        </dependency>
    

    什么是 Reactor

    Reactor 是 JVM 的非阻塞响应式编程基础,支持背压。 它直接与 Java 8 函数式 API 集成,特别是 CompletableFuture、Stream 和 Duration。 它提供了可组合的异步序列 API — Flux(用于 [N] 个元素)和 Mono(用于 [0|1] 个元素),并实现了 Reactive Streams 规范。
    在 Reactor 的基础上还演化出了适合微服务架构的 Reactor Netty 。为 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背压和响应式的网络引擎。

    上面是对于官方文档的翻译。下面来说说我自己对 Reactor 和响应式编程的理解。

    回想一下之前的非阻塞 IO 编程,例如我们现在要用非阻塞的方式调用一个远程服务,当远程接口数据可用时去做一些业务处理。这时候代码怎么写呢?我们需要提供一个回调函数,然后在响应就绪的时候,去调用我们的回调函数。

    从逻辑上来看,这完全没有问题。但是如果我们的回调很复杂,代码看起来会是什么样呢?

    // 以下案例来自 Reactor 官网
    userService.getFavorites(userId, new Callback<List<String>>() { 
      public void onSuccess(List<String> list) { 
        if (list.isEmpty()) { 
          suggestionService.getSuggestions(new Callback<List<Favorite>>() {
            public void onSuccess(List<Favorite> list) { 
              UiUtils.submitOnUiThread(() -> { 
                list.stream()
                    .limit(5)
                    .forEach(uiList::show); 
                });
            }
    
            public void onError(Throwable error) { 
              UiUtils.errorPopup(error);
            }
          });
        } else {
          list.stream() 
              .limit(5)
              .forEach(favId -> favoriteService.getDetails(favId, 
                new Callback<Favorite>() {
                  public void onSuccess(Favorite details) {
                    UiUtils.submitOnUiThread(() -> uiList.show(details));
                  }
    
                  public void onError(Throwable error) {
                    UiUtils.errorPopup(error);
                  }
                }
              ));
        }
      }
    
      public void onError(Throwable error) {
        UiUtils.errorPopup(error);
      }
    });
    

    这个代码说实话已经有点回调地狱那味儿了,让一段不是很复杂的逻辑变得很难读了。但是如果用 Reactor 写呢?

    // 以下案例来自 Reactor 官网
    userService.getFavorites(userId) 
               .flatMap(favoriteService::getDetails) 
               .switchIfEmpty(suggestionService.getSuggestions()) 
               .take(5) 
               .publishOn(UiUtils.uiThreadScheduler()) 
               .subscribe(uiList::show, UiUtils::errorPopup);
    

    可以看到,代码变得非常的简洁。唯一带来的困扰就是,我们不知道这些函数到底是啥意思 😂

    响应式编程虽然有非常多的特性,但是它并不是什么神奇的技术,它也是建立在传统命令式编程的基础上。只不过它所提供的 API 以及规范更适合在非阻塞 IO 中使用。虽然在非阻塞 IO 框架中几乎只使用响应式编程(Vertx,WebFlux),只是因为这样做更合适,并不是说没了响应式编程,就玩不了非阻塞 IO 了。

    响应式编程内幕

    Reactor 实现了 org.reactivestreams 提供的 Java 响应式编程规范,我们只要了解 reactivestreams 中代码是如何运转的,再看 Reactor 相关的代码就容易多了。

    下图展示了 reactivestreams 中的核心接口

    reactivestreams 核心接口
    • Publisher:发布者

    • Subscriber:订阅者

    • Subscription:这个单词中文翻译为名词的订阅,在代码中它是发布者和订阅者之间的媒介

    • Processor:该接口继承了发布者和订阅者,可以理解为发布者和订阅者的中间操作(但是 Reactor 的中间操作并没有实现 Processor,在最新版本的 Reactor 中,Processor 的相关实现接口已经被弃用)

    在了解了响应式编程的核心接口之后,我们来看下响应式编程是如何运作的

    响应式编程执行逻辑

    在 Reactor 中大部分实现都是按照上图的逻辑来执行的

    1. 首先是Subscriber(订阅者)主动订阅 Publisher(发布者),通过调用 Publisher 的 subscribe 方法
    2. Publisher 在向下游发送数据之前,会先调用 Subscriber 的 onSubscribe 方法,传递的参数为 Subscription(订阅媒介)
    3. Subscriber 通过 Subscription#request 来请求数据,或者 Subscription#cancel 来取消数据发布(这就是响应式编程中的背压,订阅者可以控制数据发布)
    4. Subscription 在接收到订阅者的调用后,通过 Subscriber#onNext 向下游订阅者传递数据。
    5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流,如果数据发布或者处理遇到错误会调用 Subscriber#onError

    调用 Subscriber#onNext,onComplete,onError 这三个方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根据不同的场景有不同的实现方式,并没有什么严格的要求。可以认为 Publisher 和 Subscription 共同配合完成了数据发布

    其实 Reactor 中 API 实现原理也都是这个套路,我这边也自己写了个例子便于让读者加深对响应式编程的理解

    import org.reactivestreams.Publisher;
    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    
    /**
     * @author tianwen.yin
     */
    public class SimpleReactiveStream {
    
        /**
         * 实现一个简单的响应式编程发布者
         * 逻辑:当订阅者发起订阅时,像下游发送一个 HelloWorld,发布逻辑由 SimpleSubscription 完成
         */
        static class SimplePublisher implements Publisher {
            @Override
            public void subscribe(Subscriber s) {
                // 2. Publisher 发布数据之前,调用 Subscriber 的 onSubscribe
                s.onSubscribe(new SimpleSubscription(data(), s));
            }
    
            private String data() {
                return "Hello World";
            }
        }
    
        static class SimpleSubscriber implements Subscriber {
            @Override
            public void onSubscribe(Subscription s) {
                // 3. Subscriber 通过 Subscription#request 来请求数据
                // 或者 Subscription#cancel 来取消数据发布
                s.request(Long.MAX_VALUE);
            }
    
            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }
    
            @Override
            public void onError(Throwable t) {
                System.out.println("error");
            }
    
            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        }
    
        static class SimpleSubscription implements Subscription {
            String data;
            Subscriber actual;
            boolean isCanceled;
    
            public SimpleSubscription(String data, Subscriber actual) {
                this.data = data;
                this.actual = actual;
            }
    
            @Override
            public void request(long n) {
                if (!isCanceled) {
                    try {
                        // 4. Subscription 在接收到订阅者的调用后
                        // 通过 Subscriber#onNext 向下游订阅者传递数据
                        actual.onNext(data);
                        // 5. 在数据发布完成后,调用 Subscriber#onComplete 结束本次流
                        actual.onComplete();
                    } catch (Exception e) {
                        // 如果数据发布或者处理遇到错误会调用 Subscriber#onError
                        actual.onError(e);
                    }
                }
            }
    
            @Override
            public void cancel() {
                isCanceled = true;
            }
        }
    
        public static void main(String[] args) {
            // 1. Subscriber ”订阅“ Publisher
            new SimplePublisher().subscribe(new SimpleSubscriber());
        }
    
    }
    
    

    响应式编程思想

    响应式编程,就像装配一条流水线。Publisher 规定了数据如何生产,中间会有 Operators(操作符)对流水线的数据进行解析,校验,转换等等操作,最终处理好的数据流转到 Subscriber。

    image.png

    这条流水线还有一个特点。大部分情况下当 Publisher 的 subscribe 方法被调用之前,什么都不会发生。在被订阅之前我们只是在定义流水线该如何工作,直到真正有人需要的时候,流水线才会启动。

    Reactor 中的 Operator

    Operators 怎么理解呢?对于上游来说,Operators 像一个订阅者,而对于它的下游来说,它像一个发布者(我们上文说过了 Reactor 中的中间操作并没有实现 Processor 接口)

        Mono.just("hello")
                .map(a -> a + "world")
                .subscribe(System.out::println);
    

    举个简单的例子,在上面的代码中,map 就是一个 Operator,它的实现思路是什么?来看下面的代码

        // 注意,这是我基于 Reactor API 实现的伪代码!
        public static class MonoMap implements Publisher {
            // 我们自定义的转换逻辑
            private Function mapper;
            // source 代表当前操作符的上游发布者
            private Publisher source;
    
            public MonoMap(Publisher source, Function mapper) {
                this.source = source;
                this.mapper = mapper;
            }
    
            @Override
            public void subscribe(Subscriber actual) {
                source.subscribe(new MonoMapSubscriber(mapper, actual));
            }
        }
    
        public static class MonoMapSubscriber implements Subscriber {
            // 我们自定义的转换逻辑
            private Function mapper;
            // 真正的下游
            private Subscriber actual;
    
            public MonoMapSubscriber(Function mapper, Subscriber actual) {
                this.mapper = mapper;
                this.actual = actual;
            }
    
            @Override
            public void onSubscribe(Subscription s) {
                actual.onSubscribe(s);
            }
    
            @Override
            public void onNext(Object o) {
                // 当上游数据发送过来时,先进行转换再发送给下游
                Object result = mapper.apply(o);
                actual.onNext(result);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
        }
    

    上述代码是我自己实现的一个伪代码,用于让大家理解操作符的实现思路,实际 Reactor 代码也是这个思路,只不过实现的更加巧妙和严谨

    我们首先来分析一下 Mono.just("hello").map(a -> a + "world") 这句话

    1. 当执行到 Mono.just 时,会新建一个 MonoJust 对象作为当前的 Publisher。该发布者的逻辑是,当订阅时,向下游发送数据 "hello"

    2. 当执行到 map 方法时,会新建一个 MonoMap 对象替作为当前的 Publisher,MonoJust 成为了 MonoMap 中的一个属性 source(实际的上游)

      • 当 MonoMap 被订阅时,会先将它的下游 actual 做一层包装,也就是我们上面的 MonoMapSubscriber。然后去调用 source 的 subscribe 方法。上游发布数据时,MonoMapSubscriber 先对数据进行转换(我们上面的拼接字符串操作),然后再发送给 actual(它的下游)

      • 当 MonoMap 被再次转换时,MonoMap 就变成了下游操作符的 source...

    最后通过一张图来总结一下

    Operator 实现原理

    Reactor 该如何学习

    本文并没有介绍太多 Reactor 的细节,因为这些东西实在是太多了。我想聊聊我自己是如何学习 Reactor 的

    如果你已经通过本文理解了响应式编程的核心接口是如何工作的了,那恭喜你已经迈向了成功的第一步了。接下来就是阅读官方文档,不断的练习和阅读 Reactor 的源码。源码追踪的方向已经很明确了,当我们想了解一个发布者的实现原理是什么,我就要去关注这个发布者的 subscribe 方法和 Subscription 都做了什么。想了解消费者的逻辑,就看它的 onNext,onComplete,onError。

    最后

    如果觉得我的文章对你有帮助,动动小手点下关注,你的支持是对我最大的帮助

    相关文章

      网友评论

        本文标题:响应式编程入门之 Project Reactor

        本文链接:https://www.haomeiwen.com/subject/eycaoltx.html