美文网首页
Gateway源码剖析

Gateway源码剖析

作者: 王侦 | 来源:发表于2023-03-17 16:21 被阅读0次

在微服务架构中,各个服务都是独立运行来完成某个特定领域的功能,服务间通过 REST API 或 RPC 进行通信。当前端一个请求发生时,比如查看商品详情,客户端可能要调用商品服务、库存服务、评价服务等多个微服务,如果客户端直接对接各个微服务,在复杂的调用过程中存在的问题:

  • 客户端需要发起多次请求,增加网络通信成本和客户端处理的复杂性
  • 服务的鉴权会分布在每个微服务中,存在重复鉴权
  • 微服务提供的接口协议不同(REST、RPC),客户端要对不同的协议进行适配

网关的出现可以解决这些问题,网关是微服务架构体系对外提供能力的统一接口,本质上是对请求进行转发、前置和后置的过滤:

  • 对请求进行一次性鉴权、限流、熔断、日志
  • 统一协议(常见 HTTP),屏蔽后端多种不同的协议
  • 统一错误码处理
  • 请求转发,实现内外网的隔离
  • 通过请求分发规则,实现灰度发布

常见的 API 网关实现方案:OpenResty(Nginx+lua)、Zuul(Netflix)、Gateway(Spring)、Kong。

Spring Cloud 已经有了 Zuul,Spring 为什么重新研发了 Gateway?

  • 替代 Zuul 提供简单高效的 API 网关
  • Zuul 1.x 采用 thread per connection 方式处理请求(每个请求一个线程进行处理),一旦服务响应慢,线程会被阻塞不释放,性能有一定瓶颈
  • 虽然 Zuul 2.x 是适合高并发的版本,但是在 Zuul 2.x 开源前 Spring 团队启动了 Gateway

几个重要组成部分:

  • 路由(Route),网关的基本组件,由 ID、目标 URI、Predicate 集合和 Filter 集合组成。
  • Predicate,Java 8 引入的函数式接口,提供断言(assert)功能,可以匹配 HTTP 请求中的任何内容,如果 Predicate 集合判断结果是 true,表示请求会由该 Route 进行转发。
  • Filter,为请求提供前置(pre)和后置(post)过滤。

0.自动配置类

GatewayAutoConfiguration

  • PropertiesRouteDefinitionLocator,从配置文件读取路由配置信息
  • RouteDefinitionRouteLocator,将RouteDefinition转换为Route
  • RoutePredicateHandlerMapping,Gateway实现的HandlerMapping,匹配请求对应的路由Route,并返回FilteringWebHandler
  • GatewayProperties,属性配置类
  • GlobalFilter,包括ForwardedHeadersFilter,RemoveHopByHopHeadersFilter,XForwardedHeadersFilter,AdaptCachedBodyGlobalFilter,RemoveCachedBodyFilter,RouteToRequestUrlFilter,ForwardRoutingFilter,ForwardPathFilter,WeightCalculatorWebFilter,WebsocketRoutingFilter
  • RoutePredicateFactory,包括AfterRoutePredicateFactory,BeforeRoutePredicateFactory,BetweenRoutePredicateFactory,CookieRoutePredicateFactory,HeaderRoutePredicateFactory,HostRoutePredicateFactory,MethodRoutePredicateFactory,PathRoutePredicateFactory,QueryRoutePredicateFactory,ReadBodyRoutePredicateFactory,RemoteAddrRoutePredicateFactory等等
  • GatewayFilterFactory,包括AddRequestHeaderGatewayFilterFactory,MapRequestHeaderGatewayFilterFactory,AddRequestParameterGatewayFilterFactory,AddResponseHeaderGatewayFilterFactory,ModifyRequestBodyGatewayFilterFactory等等
  • NettyConfiguration
    HttpClient,基于Netty实现的HttpClient
    NettyRoutingFilter,请求下游微服务
    NettyWriteResponseFilter
    ReactorNettyWebSocketClient

GatewayReactiveLoadBalancerClientAutoConfiguration

  • @ConditionalOnClass({LoadBalancerClient.class, ReactiveLoadBalancer.class, LoadBalancerAutoConfiguration.class, DispatcherHandler.class})
  • ReactiveLoadBalancerClientFilter
  • OnNoRibbonDefaultCondition

GatewayLoadBalancerClientAutoConfiguration

  • LoadBalancerClientFilter,根据lb://前缀过滤处理,使用serviceId选择一个服务实例,从而实现负载均衡

1.NettyWebServer

ReactiveWebServerApplicationContext#onRefresh

  • ReactiveWebServerApplicationContext#createWebServer
  • this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
  • this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
    this.webServer = factory.getWebServer(this.handler);
  • NettyReactiveWebServerFactory#getWebServer
  • NettyReactiveWebServerFactory#createHttpServer

1.1 配置

NettyReactiveWebServerFactory#createHttpServer

    private HttpServer createHttpServer() {
        HttpServer server = HttpServer.create();
        if (this.resourceFactory != null) {
            LoopResources resources = this.resourceFactory.getLoopResources();
            Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
            server = server
                    .tcpConfiguration((tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress));
        }
        else {
            server = server.tcpConfiguration((tcpServer) -> tcpServer.bindAddress(this::getListenAddress));
        }
        if (getSsl() != null && getSsl().isEnabled()) {
            SslServerCustomizer sslServerCustomizer = new SslServerCustomizer(getSsl(), getHttp2(),
                    getSslStoreProvider());
            server = sslServerCustomizer.apply(server);
        }
        if (getCompression() != null && getCompression().getEnabled()) {
            CompressionCustomizer compressionCustomizer = new CompressionCustomizer(getCompression());
            server = compressionCustomizer.apply(server);
        }
        server = server.protocol(listProtocols()).forwarded(this.useForwardHeaders);
        return applyCustomizers(server);
    }

这里采用了层层嵌套的方式

  • 最里层的source是TcpServerBind,包含ServerBootstrap一些配置信息
  • 向外第二层HttpServerTcpConfig,其tcpServerMapper是(tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress)
  • 向外第三层HttpServerTcpConfig,其tcpServerMapper是tcpServer -> tcpServer.bootstrap(b -> HttpServerConfiguration.protocols(b, supportedProtocols))
  • 向外第四层HttpServerTcpConfig,其tcpServerMapper是tcp -> tcp.bootstrap(b -> HttpServerConfiguration.forwardedHeaderHandler(b, null))

1.2 创建NettyWebServer

NettyReactiveWebServerFactory#getWebServer

    public WebServer getWebServer(HttpHandler httpHandler) {
        HttpServer httpServer = createHttpServer();
        ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        NettyWebServer webServer = new NettyWebServer(httpServer, handlerAdapter, this.lifecycleTimeout, getShutdown());
        webServer.setRouteProviders(this.routeProviders);
        return webServer;
    }

这里对httpServer又加了一层配置:

  • this.httpServer = httpServer.channelGroup(new DefaultChannelGroup(new DefaultEventExecutor()));单线程池
  • tcpServer -> tcpServer.channelGroup(channelGroup)

1.3 启动

AbstractApplicationContext#finishRefresh

  • getLifecycleProcessor().onRefresh()
  • WebServerStartStopLifecycle#start
  • WebServerManager#start

WebServerManager#start

    void start() {
        this.handler.initializeHandler();
        this.webServer.start();
        this.applicationContext
                .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
    }

1.3.1 handler

来看看handler:

  • HttpWebHandlerAdapter
  • ExceptionHandlingWebHandler
  • FilteringWebHandler
  • DispatcherHandler

1.3.2 启动

NettyWebServer#startHttpServer又封装了一层

  • HttpServerHandle,其handler是上面的嵌套handler
  • server是多层嵌套的HttpServerTcpConfig
  • HttpServer#bind()
  • bind(tcpConfiguration())

首先来看看tcpConfiguration()

  • 最外层是HttpServerHandle#tcpConfiguration
    source.tcpConfiguration().bootstrap(this),后面的bootstrap会创建TcpServerBootstrap,其bootstrapMapper是HttpServerHandle。
  • 第二层HttpServerTcpConfig#tcpConfiguration
    tcpServer -> tcpServer.channelGroup(channelGroup)
  • 第三层HttpServerTcpConfig#tcpConfiguration
    tcp -> tcp.bootstrap(b -> HttpServerConfiguration.forwardedHeaderHandler(b, null))
  • 第四层HttpServerTcpConfig#tcpConfiguration
    cpServer -> tcpServer.bootstrap(b -> HttpServerConfiguration.protocols(b, supportedProtocols))
  • 第五层HttpServerTcpConfig#tcpConfiguration
    (tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress)
  • 第六层,HttpServerBind#tcpConfiguration,返回的是tcpServer,是TcpServerBind,然后从里层依次向上执行。

这个执行的逻辑是从最里层向外执行

  • TcpServerBind,返回的是ServerBootStrap
  • (tcpServer) -> tcpServer.runOn(resources).bindAddress(this::getListenAddress),runOn返回一个TcpServerRunOn,bindAddress()返回一个TcpServerBootstrap

最后返回的是:


然后来看看bind()

  • HttpServerHandle#bind(TcpServerBootstrap)
  • HttpServerTcpConfig#bind(TcpServerBootstrap),DefaultChannelGroup
  • HttpServerTcpConfig#bind(TcpServerBootstrap),HttpRequestDecoderSpec
  • HttpServerTcpConfig#bind(TcpServerBootstrap),HttpServer
  • HttpServerTcpConfig#bind(TcpServerBootstrap),HttpServer,HttpProtocol
  • HttpServerTcpConfig#bind(TcpServerBootstrap),NettyReactiveWebServerFactory
  • HttpServerBind#bind
  • TcpServerBoostrap#bind -> TcpServer#bind
  • ServerBootstrap b = configure()
  • bootstrapMapper.apply(source.configure())
  • 这里又是层层嵌套调用configure,直至最里层的TcpServerRunOn#configure,调用至DefaultLoopResources#cacheNioServerLoops创建NioEventLoopGroup,线程数为4,这里group和childGroup都是公用这个线程池,channel是NioServerSocketChannel
  • bind(b)
  • TcpServerOperator#bind,这里又是层层嵌套调用
  • BootstrapHandlers#finalizeHandler,给childHandler添加BootstrapInitializerHandlern

BootstrapInitializerHandler包含的属性BootstrapInitializerHandler(pipeline, opsFactory, childListener):


1.4 pipeline

DefaultChannelPipeline

  • HeadContext
  • HttpServerCodec
  • HttpTrafficHandler
  • ChannelOperationsHandler
  • TailContext

2.核心处理流程

网关的核心是 Filter 以及 Filter Chain,客户端向 Spring Cloud Gateway 发出请求,然后在 Gateway Handler Mapping 中找到与请求相匹配的路由,将其发送到 Gateway Web Handler。Handler 再通过指定的过滤器链来将请求发送到我们实际的服务执行业务逻辑,然后返回。过滤器之间用虚线分开是因为过滤器可能会在发送代理请求之前(pre)或之后(post)执行业务逻辑。

2.1 DispatcherHandler#handle

ChannelOperationsHandler#channelRead

  • HttpServerOperations#onInboundNext
  • TcpServerBind.ChildObserver#onStateChange(HttpServerState.REQUEST_RECEIVED)
  • HttpServerHandler#onStateChange
  • DispatcherHandler#handle
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }

2.2 mapping#getHandler

包含四个HandlerMapping

  • RouterFunctionMapping
  • RequestMappingHandlerMapping
  • RoutePredicateHandlerMapping
  • SimpleUrlHandlerMapping

核心是看RoutePredicateHandlerMapping

  • AbstractHandlerMapping#getHandler
  • RoutePredicateHandlerMapping#getHandlerInternal

2.2.1 lookupRoute

  • RoutePredicateHandlerMapping#lookupRoute
    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes().concatMap((route) -> {
            return Mono.just(route).filterWhen((r) -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                return (Publisher)r.getPredicate().apply(exchange);
            }).doOnError((e) -> {
                this.logger.error("Error applying predicate for route: " + route.getId(), e);
            }).onErrorResume((e) -> {
                return Mono.empty();
            });
        }).next().map((route) -> {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Route matched: " + route.getId());
            }

            this.validateRoute(route, exchange);
            return route;
        });
    }

RouteDefinitionRouteLocator#getRoutes

  • routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
  • getRouteDefinitions()首先读取路由配置信息,最终是从GatewayProperties#getRoutes,解析生成的RouteDefinition
  • convertToRoute(),((Route.AsyncBuilder)Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters)).build(),将RouteDefinition转换为Route

Mono.just(route).filterWhen((r)

  • 根据请求url和断言条件匹配路由
  • (Publisher)r.getPredicate().apply(exchange)
  • GatewayPredicate#test

2.2.2绑定路由信息到请求上下文exchange

exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, r)

2.3 invokeHandler(exchange, handler)

DispatcherHandler#invokeHandler

    private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
        if (this.handlerAdapters != null) {
            for (HandlerAdapter handlerAdapter : this.handlerAdapters) {
                if (handlerAdapter.supports(handler)) {
                    return handlerAdapter.handle(exchange, handler);
                }
            }
        }
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }

适配的是SimpleHandlerAdapter

  • SimpleHandlerAdapter#handle
  • FilteringWebHandler#handle,获取请求的局部过滤器和全局过滤器,然后组合在一起排序
  • 过滤器的顺序
    RemoveCachedBodyFilter
    SentinelGatewayFilter
    AdaptCachedBodyGlobalFilter
    NettyWriteResponseFilter
    ForwardPathFilter
    RouteToRequestUrlFilter
    LoadBalancerClientFilter
    WebsocketRoutingFilter
    GateFilterAdapter
    NettyRoutingFilter
    ForwardRoutingFilter
  • new DefaultGatewayFilterChain(combined).filter(exchange)

FilteringWebHandler#handle

    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = (Route)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();
        List<GatewayFilter> combined = new ArrayList(this.globalFilters);
        combined.addAll(gatewayFilters);
        AnnotationAwareOrderComparator.sort(combined);
        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }

        return (new DefaultGatewayFilterChain(combined)).filter(exchange);
    }

排序后的过滤器:


GlobalFilter -> GateFilter,使用适配器模式

  • 通过GatewayFilterAdapter将GlobalFilter适配成GatewayFilter

全局过滤器链

  • RouteToRequestUrlFilter,将ip port转换为uri比如lb://mall-order
  • LoadBalancerClientFilter负载均衡,这里根据lb://mall-order服务名进行负载均衡。nacos client获取对应微服务的实例列表,然后ribbon通过负载均衡算法选出一个服务实例。绑定url到请求的上下文。
  • NettyRoutingFilter,通过HttpClient发送请求到下游微服务

2.4 handleResult

调试的时候,这个没有走到。

2.5 post过滤器

问题:Gateway微服务返回响应后走得过滤器链是啥?

  • 例如NettyWriteResponseFilter#filter,会在获取到下游微服务的响应后,将响应发送给请求方

NettyWriteResponseFilter#filter

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
        // until the NettyRoutingFilter is run
        // @formatter:off
        return chain.filter(exchange)
                .doOnError(throwable -> cleanup(exchange))
                .then(Mono.defer(() -> {
                    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                    if (connection == null) {
                        return Mono.empty();
                    }
                    if (log.isTraceEnabled()) {
                        log.trace("NettyWriteResponseFilter start inbound: "
                                + connection.channel().id().asShortText() + ", outbound: "
                                + exchange.getLogPrefix());
                    }
                    ServerHttpResponse response = exchange.getResponse();

                    // TODO: needed?
                    final Flux<DataBuffer> body = connection
                            .inbound()
                            .receive()
                            .retain()
                            .map(byteBuf -> wrap(byteBuf, response));

                    MediaType contentType = null;
                    try {
                        contentType = response.getHeaders().getContentType();
                    }
                    catch (Exception e) {
                        if (log.isTraceEnabled()) {
                            log.trace("invalid media type", e);
                        }
                    }
                    return (isStreamingMediaType(contentType)
                            ? response.writeAndFlushWith(body.map(Flux::just))
                            : response.writeWith(body));
                })).doOnCancel(() -> cleanup(exchange));
        // @formatter:on
    }

参考

相关文章

网友评论

      本文标题:Gateway源码剖析

      本文链接:https://www.haomeiwen.com/subject/bohkrdtx.html