响应式编程小记

作者: 山东大葱哥 | 来源:发表于2019-06-15 09:36 被阅读8次

背景

同事在搭建nacos + spring-cloud-gateway-core 2.1.0.RELEASE 动态路由配置的过程中,发现spring cloud gateway不能删除路由。

问题分析

通过断点调试,使用了InMemoryRouteDefinitionRepository也就是内存级的动态路由管理,删除方法也就是delete,其中代码如下:

package org.springframework.cloud.gateway.route;

import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.cloud.gateway.support.NotFoundException;

import static java.util.Collections.synchronizedMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @author Spencer Gibb
 */
public class InMemoryRouteDefinitionRepository implements RouteDefinitionRepository {

   private final Map<String, RouteDefinition> routes = synchronizedMap(new LinkedHashMap<String, RouteDefinition>());

   @Override
   public Mono<Void> save(Mono<RouteDefinition> route) {
      return route.flatMap( r -> {
         routes.put(r.getId(), r);
         return Mono.empty();
      });
   }

   @Override
   public Mono<Void> delete(Mono<String> routeId) {
      return routeId.flatMap(id -> {
         if (routes.containsKey(id)) {
            routes.remove(id);
            return Mono.empty();
         }
         return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition not found: "+routeId)));
      });
   }

   @Override
   public Flux<RouteDefinition> getRouteDefinitions() {
      return Flux.fromIterable(routes.values());
   }
}

通过断点调试发现routes.remove(id);代码并没有执行。
相关调用代码如下:

this.routeDefinitionWriter.delete(Mono.just(id));

简化代码

通过代码了解到spring cloud gateway中使用了reactor(反应式编程)来实现异步的方法执行,所以初步判断既然是异步执行,肯定在会有一个时间差,是不是这个异步导致的看不到代码执行呢?通过反复尝试,发现不是时间的问题,不管等待多少时间,这个代码都不会执行,routes中都不会删除对应的键值。
为了更方便的测试把代码简化提取为如下:

public class MonoTest {
    private final Map<String, String> routes = synchronizedMap(new LinkedHashMap<String, String>());
    public static void main(String[] args) {
        MonoTest monoTest = new MonoTest();
        Map<String, String> routes = monoTest.routes;
        routes.put("123", "test11111");
        routes.put("561", "test11111");
        monoTest.delete(Mono.just("123"));
        System.out.println(monoTest.routes);
    }

    public void add(String routeId) {
        routes.put(routeId, "test11111");
    }

    public Mono<Void> delete(Mono<String> routeId) {
        return routeId.flatMap(id -> {
            System.out.println("准备进入判断语句 routes.containsKey(id)");
            if (routes.containsKey(id)) {
                System.out.println("routes.containsKey(id)");
                routes.remove(id);
                return Mono.empty();
            }
            return Mono.defer(() -> Mono.error(
                    new NotFoundException("RouteDefinition not found: " + routeId)));
        });
    }
}

需要在pom中引入reactor依赖:

  <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>

Reactor相关知识

既然代码中有Mono这个鬼东西,所以需要对其进行了解。
Project Reactor(以下简称“Reactor”)与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。
Reactor中的发布者(Publisher)由FluxMono两个类定义,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0..N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0..1)元素的结果。

既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。

  • Flux类型
    下图所示就是一个Flux类型的数据流,黑色箭头是时间轴。它连续发出“1” - “6”共6个元素值,以及一个完成信号(图中⑥后边的加粗竖线来表示),完成信号告知订阅者数据流已经结束。


    Flux类型
  • Mono类型
    下图所示是一个Mono类型的数据流,它发出一个元素值后,又发出一个完成信号。


    Mono类型

既然Flux具有发布一个数据元素的能力,为什么还要专门定义一个Mono类呢?举个例子,一个HTTP请求产生一个响应,所以对其进行“count”操作是没有多大意义的。表示这样一个结果的话,应该用Mono<HttpResponse>而不是 Flux<HttpResponse>,对于的操作通常只用于处理 0/1 个元素。它们从语义上就原生包含着元素个数的信息,从而避免了对Mono对象进行多元素场景下的处理。

  • just
    我们可以用如下代码声明上边两幅图所示的Flux和Mono:
Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);
  • 三种信号 元素值、错误信号、完成信号
    这三种信号都不是一定要具备的:
    首先,错误信号和完成信号都是终止信号,二者不可能同时共存;
    如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流;
    如果没有错误信号和完成信号,那么就是一个无限数据流。

  • 订阅前什么都不会发生

// 订阅并触发数据流
subscribe(); 
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer); 
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

这里需要注意的一点是,Flux.just(1, 2, 3, 4, 5, 6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流。所以,订阅前什么都不会发生。

问题解决

看完上文就明白为什么没有删除了,订阅之前什么都不会发生,所以想要执行对应的方法,必须手动调用subscribe()方法。

 monoTest.delete(Mono.just("123")).subcribe();

详细学习reactor可以看下面的文章:
https://blog.51cto.com/liukang/2090191

相关文章

网友评论

    本文标题:响应式编程小记

    本文链接:https://www.haomeiwen.com/subject/jbjpfctx.html