Notes on streaming in Spring Cloud Gateway

Author: go4it | Published: 2018-06-07 19:06

    This article looks at the streaming-media-types property of Spring Cloud Gateway.

    Configuration

    Configuration metadata

        {
          "sourceType": "org.springframework.cloud.gateway.config.GatewayProperties",
          "name": "spring.cloud.gateway.streaming-media-types",
          "type": "java.util.List<org.springframework.http.MediaType>"
        }
    

    GatewayProperties

    spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayProperties.java

    @ConfigurationProperties("spring.cloud.gateway")
    @Validated
    public class GatewayProperties {
    
        /**
         * List of Routes
         */
        @NotNull
        @Valid
        private List<RouteDefinition> routes = new ArrayList<>();
    
        /**
         * List of filter definitions that are applied to every route.
         */
        private List<FilterDefinition> defaultFilters = new ArrayList<>();
    
        private List<MediaType> streamingMediaTypes = Arrays.asList(MediaType.TEXT_EVENT_STREAM,
                MediaType.APPLICATION_STREAM_JSON);
    
        public List<RouteDefinition> getRoutes() {
            return routes;
        }
    
        public void setRoutes(List<RouteDefinition> routes) {
            this.routes = routes;
        }
    
        public List<FilterDefinition> getDefaultFilters() {
            return defaultFilters;
        }
    
        public void setDefaultFilters(List<FilterDefinition> defaultFilters) {
            this.defaultFilters = defaultFilters;
        }
    
        public List<MediaType> getStreamingMediaTypes() {
            return streamingMediaTypes;
        }
    
        public void setStreamingMediaTypes(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }
    
        @Override
        public String toString() {
            return "GatewayProperties{" +
                    "routes=" + routes +
                    ", defaultFilters=" + defaultFilters +
                    ", streamingMediaTypes=" + streamingMediaTypes +
                    '}';
        }
    }
    

    As the source shows, the defaults are MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).

    Usage

    GatewayAutoConfiguration

    spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/GatewayAutoConfiguration.java

        @Configuration
        @ConditionalOnClass(HttpClient.class)
        protected static class NettyConfiguration {
            @Bean
            @ConditionalOnMissingBean
            public HttpClient httpClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
                return HttpClient.create(options);
            }
    
            //......
    
            @Bean
            public HttpClientProperties httpClientProperties() {
                return new HttpClientProperties();
            }
    
            @Bean
            public NettyRoutingFilter routingFilter(HttpClient httpClient,
                                                    ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
                return new NettyRoutingFilter(httpClient, headersFilters);
            }
    
            @Bean
            public NettyWriteResponseFilter nettyWriteResponseFilter(GatewayProperties properties) {
                return new NettyWriteResponseFilter(properties.getStreamingMediaTypes());
            }
    
            @Bean
            public ReactorNettyWebSocketClient reactorNettyWebSocketClient(@Qualifier("nettyClientOptions") Consumer<? super HttpClientOptions.Builder> options) {
                return new ReactorNettyWebSocketClient(options);
            }
        }
    

    Here the NettyWriteResponseFilter bean is constructed with properties.getStreamingMediaTypes(), which is where the property is consumed.

    NettyWriteResponseFilter

    spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/NettyWriteResponseFilter.java

    public class NettyWriteResponseFilter implements GlobalFilter, Ordered {
    
        private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);
    
        public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
    
        private final List<MediaType> streamingMediaTypes;
    
        public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {
            this.streamingMediaTypes = streamingMediaTypes;
        }
    
        @Override
        public int getOrder() {
            return WRITE_RESPONSE_FILTER_ORDER;
        }
    
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
            // until the WebHandler is run
            return chain.filter(exchange).then(Mono.defer(() -> {
                HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
    
                if (clientResponse == null) {
                    return Mono.empty();
                }
                log.trace("NettyWriteResponseFilter start");
                ServerHttpResponse response = exchange.getResponse();
    
                NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
                //TODO: what if it's not netty
    
                final Flux<NettyDataBuffer> body = clientResponse.receive()
                        .retain() //TODO: needed?
                        .map(factory::wrap);
    
                MediaType contentType = response.getHeaders().getContentType();
                return (isStreamingMediaType(contentType) ?
                        response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
            }));
        }
    
        //TODO: use framework if possible
        //TODO: port to WebClientWriteResponseFilter
        private boolean isStreamingMediaType(@Nullable MediaType contentType) {
            return (contentType != null && this.streamingMediaTypes.stream()
                            .anyMatch(contentType::isCompatibleWith));
        }
    
    }
    

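    The filter calls isStreamingMediaType on the response Content-Type: if it is compatible with one of the configured streaming types, the body is written with writeAndFlushWith; otherwise it is written with writeWith.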
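
    The matching step can be illustrated with a small JDK-only sketch. SimpleMediaType below is a hypothetical, simplified stand-in for Spring's MediaType, not the real class: it drops parameters such as charset and only approximates wildcard handling (Spring's isCompatibleWith also has rules for "+suffix" subtypes that are not modelled here). The list mirrors the two defaults from GatewayProperties.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class StreamingMediaTypeSketch {

    /** Hypothetical, simplified stand-in for Spring's MediaType (type and subtype only). */
    static final class SimpleMediaType {
        final String type;
        final String subtype;

        SimpleMediaType(String type, String subtype) {
            this.type = type;
            this.subtype = subtype;
        }

        static SimpleMediaType parse(String value) {
            // Drop parameters such as ";charset=UTF-8" before splitting type/subtype.
            String essence = value.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
            String[] parts = essence.split("/", 2);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Not a media type: " + value);
            }
            return new SimpleMediaType(parts[0], parts[1]);
        }

        /** Approximation only: equal type/subtype, or a plain "*" wildcard on either side. */
        boolean isCompatibleWith(SimpleMediaType other) {
            if (type.equals("*") || other.type.equals("*")) {
                return true;
            }
            if (!type.equals(other.type)) {
                return false;
            }
            return subtype.equals(other.subtype)
                    || subtype.equals("*") || other.subtype.equals("*");
        }
    }

    /** Mirrors the two default streaming media types shown in GatewayProperties. */
    static final List<SimpleMediaType> STREAMING_TYPES = Arrays.asList(
            SimpleMediaType.parse("text/event-stream"),
            SimpleMediaType.parse("application/stream+json"));

    /** Same shape as the filter's check: null-safe, then anyMatch over the configured list. */
    public static boolean isStreamingMediaType(String contentType) {
        if (contentType == null) {
            return false;
        }
        SimpleMediaType parsed = SimpleMediaType.parse(contentType);
        return STREAMING_TYPES.stream().anyMatch(parsed::isCompatibleWith);
    }

    public static void main(String[] args) {
        System.out.println(isStreamingMediaType("text/event-stream;charset=UTF-8")); // true
        System.out.println(isStreamingMediaType("application/json"));                // false
        System.out.println(isStreamingMediaType(null));                              // false
    }
}
```

    Because the check is a compatibility test rather than string equality, a Content-Type that carries parameters (for example a charset) still takes the streaming path.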

    ReactiveHttpOutputMessage

    spring-web-5.0.6.RELEASE-sources.jar!/org/springframework/http/ReactiveHttpOutputMessage.java

    /**
     * A "reactive" HTTP output message that accepts output as a {@link Publisher}.
     *
     * <p>Typically implemented by an HTTP request on the client-side or an
     * HTTP response on the server-side.
     *
     * @author Arjen Poutsma
     * @author Sebastien Deleuze
     * @since 5.0
     */
    public interface ReactiveHttpOutputMessage extends HttpMessage {
    
        /**
         * Return a {@link DataBufferFactory} that can be used to create the body.
         * @return a buffer factory
         * @see #writeWith(Publisher)
         */
        DataBufferFactory bufferFactory();
    
        /**
         * Register an action to apply just before the HttpOutputMessage is committed.
         * <p><strong>Note:</strong> the supplied action must be properly deferred,
         * e.g. via {@link Mono#defer} or {@link Mono#fromRunnable}, to ensure it's
         * executed in the right order, relative to other actions.
         * @param action the action to apply
         */
        void beforeCommit(Supplier<? extends Mono<Void>> action);
    
        /**
         * Whether the HttpOutputMessage is committed.
         */
        boolean isCommitted();
    
        /**
         * Use the given {@link Publisher} to write the body of the message to the
         * underlying HTTP layer.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
    
        Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
    
        /**
         * Use the given {@link Publisher} of {@code Publishers} to write the body
         * of the HttpOutputMessage to the underlying HTTP layer, flushing after
         * each {@code Publisher<DataBuffer>}.
         * @param body the body content publisher
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);
    
        /**
         * Indicate that message handling is complete, allowing for any cleanup or
         * end-of-processing tasks to be performed such as applying header changes
         * made via {@link #getHeaders()} to the underlying HTTP message (if not
         * applied already).
         * <p>This method should be automatically invoked at the end of message
         * processing so typically applications should not have to invoke it.
         * If invoked multiple times it should have no side effects.
         * @return a {@link Mono} that indicates completion or error
         */
        Mono<Void> setComplete();
    
    }
    

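    As the interface Javadoc shows, writeWith and writeAndFlushWith take different parameter types: writeWith takes a Publisher<? extends DataBuffer>, while writeAndFlushWith takes a Publisher<? extends Publisher<? extends DataBuffer>> and flushes after each inner Publisher<DataBuffer> has been written. This is why the filter wraps every buffer with body.map(Flux::just): each buffer becomes its own inner publisher, so each one is flushed as soon as it is written.

    Summary

    NettyWriteResponseFilter uses the media types configured in spring.cloud.gateway.streaming-media-types to choose between writeAndFlushWith and writeWith: if the response Content-Type is compatible with one of the configured types, the response is written with writeAndFlushWith. By default the property contains two types, MediaType.TEXT_EVENT_STREAM (text/event-stream) and MediaType.APPLICATION_STREAM_JSON (application/stream+json).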
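
    The difference in flush behaviour can be modelled without any reactive types. The sketch below is a toy under stated assumptions, not Spring's implementation: RecordingResponse and perChunk are hypothetical names, lists stand in for publishers, and the model assumes the non-streaming path flushes once when the body completes (a real server may flush earlier, for instance when its buffers fill).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class FlushSemanticsSketch {

    /** Toy response that records writes and flushes instead of touching a socket. */
    static class RecordingResponse {
        final List<String> events = new ArrayList<>();

        /** Models writeWith: write every chunk, flush once at the end (assumed). */
        void writeWith(List<String> body) {
            for (String chunk : body) {
                events.add("write:" + chunk);
            }
            events.add("flush");
        }

        /** Models writeAndFlushWith: flush after each inner group of chunks. */
        void writeAndFlushWith(List<List<String>> body) {
            for (List<String> group : body) {
                for (String chunk : group) {
                    events.add("write:" + chunk);
                }
                events.add("flush");
            }
        }
    }

    /** List analogue of body.map(Flux::just): every chunk becomes its own group. */
    static List<List<String>> perChunk(List<String> chunks) {
        return chunks.stream()
                .map(Collections::singletonList)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> chunks = Arrays.asList("a", "b");

        RecordingResponse buffered = new RecordingResponse();
        buffered.writeWith(chunks);
        System.out.println(buffered.events);  // [write:a, write:b, flush]

        RecordingResponse streaming = new RecordingResponse();
        streaming.writeAndFlushWith(perChunk(chunks));
        System.out.println(streaming.events); // [write:a, flush, write:b, flush]
    }
}
```

    For a server-sent event stream the second pattern matters: a client sees each event when it is produced instead of waiting for a buffer to fill or for the response to end.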
