美文网首页Spring Coud
Zuul2 FilterChain 原理分析

Zuul2 FilterChain 原理分析

作者: 绝尘驹 | 来源:发表于2018-06-02 22:18 被阅读0次

    背景

    Netflix 公司是业界微服务的典范,网关项目zuul 更是明星项目,在天朝广受好评,业界用的现在应该都是zuul1是不完全支持异步,大名鼎鼎的Hystrix的组件就是在同步模型下,用来帮助后端服务限流和熔断的,大家期盼已久的zuul2 开源了,作为目前正在负责网关项目的俺,立即把代码拉下来,就此窃开了zuul2的神秘面纱。

    Netflix 公司号称升级到zuul2以来吞吐量提升20%,cpu利用率还降20%,牛气吗,牛气就得学习。

    Zuul2 的架构

    zuul2 和1版本的核心区别是用netty 实现了接入端和后端服务的调用,全部是事件驱动,异步化了

    架构图

    zuul-http2.png

    从这个架构图可以看,很简单,有的同学会说,你没有搞错吧,是的,就是这么简单。核心就在处理接收请求的Netty Request Handler 和InboundFilter,outboundFilter 比较简单。我们这里主要分析Zuul2 的fiter模型,由于请求是先经过Netty http2 的编解码后再进入filter链处理的,所以下面先介绍下zuul2的Netty http2 的用法

    Netty Http2 编解码

    Netty的http2 协议栈我们这里就不详细分析,后面再写专门的文章来分析。需要注意的是zuul2 用了一个降级的handler 来把http2 协议的贞frame 转化为http1,和后端的通信还是http1,后端服务都不需升级,降级的handler 为:

    Http2StreamFrameToHttpObjectCodec
    

    Http2StreamFrameToHttpObjectCodec 把http2 转换为http1的请求,但是并没有聚合的功能,即等一个完整请求都到达后才开始进入filter的处理,zuul2 也没有用聚合,通过下面的分析就知道zuul2为啥不要聚合。

    Zuul2的Filter

    Zuul filter 机制也是借鉴netty 的 handler 链思想并基于handler 实现了filter 链。

    Filter 在handler的位置

    Zuul2 初始化完netty http2 编解码的handler后,就通过addZuulHandlers 方法添加zuul 自己的handler。

    protected void addZuulHandlers(final ChannelPipeline pipeline)
        {
            pipeline.addLast("logger", nettyLogger);
            pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));
            pipeline.addLast(passportLoggingHandler);
            addZuulFilterChainHandler(pipeline);
            pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));
        }
    

    ClientRequestReceiver 是接收经过对http2降级后的http request 请求, 并创建zuul 自己封装的zuulRequest,传给后面的ZuulFilterChainHandler 处理,addZuulFilterChainHandler 就是初始化Filter handler 的入口,代码如下:

    protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {
            final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters(
                    new OutboundPassportStampingFilter(FILTERS_OUTBOUND_START),
                    new OutboundPassportStampingFilter(FILTERS_OUTBOUND_END));
    
            // response filter chain
            final ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain = getFilterChainRunner(responseFilters,
                    filterUsageNotifier);
    
            // endpoint | response filter chain
            final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint = getEndpointRunner(responseFilterChain,
                    filterUsageNotifier, filterLoader);
    
            final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters(
                    new InboundPassportStampingFilter(FILTERS_INBOUND_START),
                    new InboundPassportStampingFilter(FILTERS_INBOUND_END));
    
            // request filter chain | end point | response filter chain
            final ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain = getFilterChainRunner(requestFilters,
                    filterUsageNotifier, endPoint);
    
            pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));
        }
    
    

    上面的代码有三个核心的概念:

    responseFilterChain

    包含所有的处理响应的filter,可以通过配置文件指定

    requestFilterChain

    所以的请求filter 管道。一个一个执行,requestFilterChain里面引用了endPoint

    endPoint

    就是一个中转站,由requestFilter 处理完后,交给endPoint,endPoint 负责把请求发到后端服务,接收到响应后,再把响应传递给responseFilterChain,(其实endPoint 也是一个filter)。

    通过ZuulFilterChainHandler 把requestFilterChain 和responseFilterChain 给包装起来了。ZuulFilterChainHandler 是ClientRequestReceiver 后面的handler,

    ZuulFilterChainHandler 代码如下:

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //http header 部分
            if (msg instanceof HttpRequestMessage) {
                zuulRequest = (HttpRequestMessage)msg;
    
                //Replace NETTY_SERVER_CHANNEL_HANDLER_CONTEXT in SessionContext
                final SessionContext zuulCtx = zuulRequest.getContext();
                zuulCtx.put(NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);
                zuulCtx.put(ZUUL_FILTER_CHAIN, requestFilterChain);
    
                requestFilterChain.filter(zuulRequest);
            }
            //http body 部分,如果有多个,那最后一个就是LastHttpContent
            else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {
                requestFilterChain.filter(zuulRequest, (HttpContent) msg);
            }
            else {
                LOG.debug("Received unrecognized message type. " + msg.getClass().getName());
                ReferenceCountUtil.release(msg);
            }
        }
    

    通过上面可以看出,http request 是 执行 filter方法,和http content 是执行带http content的filter 方法,我们先看fiter方法,即处理http requet。

    @Override
        public void filter(final T inMesg) {
            runFilters(inMesg, initRunningFilterIndex(inMesg));
        }
    

    initRunningFilterIndex 是在当前zuul request 请求上下文记录一个filter的索引runningFilterIdx,初始化的时候是0,代表从第一个filter开始执行,后面需要知道当前请求执行到那个filter了. runFilters 的代码如下:

    private final void runFilters(final T mesg, final AtomicInteger runningFilterIdx) {
            T inMesg = mesg;
            String filterName = "-";
            try {
                Preconditions.checkNotNull(mesg, "Input message")
                int i = runningFilterIdx.get();
                while (i < filters.length) {
                    final ZuulFilter<T, T> filter = filters[i];
                    filterName = filter.filterName();
                    final T outMesg = filter(filter, inMesg);
                    //如过filter是异步处理的,因为zuul不会等,那就变成阻塞了,会返回null,后面通过异步通知继续执行
                    if (outMesg == null) {
                        return; //either async filter or waiting for the message body to be buffered
                    }
                    inMesg = outMesg;
                    i = runningFilterIdx.incrementAndGet();
                }
    
                //Filter chain has reached its end, pass result to the next stage
                // Filter 执行完后,进入到上面提到的endpoint处理。这里的
                // endPoint 是ZuulEndPointRunner。
                invokeNextStage(inMesg);
            }
            catch (Exception ex) {
                handleException(inMesg, filterName, ex);
            }
        }
    

    filter的核心代码如下:

                ExecutionStatus filterRunStatus = null;
                if (filter.filterType() == INBOUND && inMesg.getContext().shouldSendErrorResponse()) {
                    // Pass request down the pipeline, all the way to error endpoint if error response needs to be generated
                    filterRunStatus = SKIPPED;
                }
    
                if (shouldSkipFilter(inMesg, filter)) {
                    filterRunStatus = SKIPPED;
                }
    
                if (filter.isDisabled()) {
                    filterRunStatus = DISABLED;
                }
    
                if (filterRunStatus != null) {
                    recordFilterCompletion(filterRunStatus, filter, startTime, inMesg, snapshot);
                    return filter.getDefaultOutput(inMesg);
                }
    
                //这里是关键方法。如果filter需要对body进行处理,那就需要检查body是否全部读完,
                // 如果因为半包等原因没有读完,那需要等,这里等不是说线程会block,
                // 而且等底层io读事件触发后继续处理。这里返回null很重要,
                // filter chain 就不会执行后面的filter了,并做个标记在请求上下文。
                if (!isMessageBodyReadyForFilter(filter, inMesg)) {
                    setFilterAwaitingBody(inMesg, true);
                    LOG.debug("Filter {} waiting for body, UUID {}", filter.filterName(), inMesg.getContext().getUUID());
                    return null;  //wait for whole body to be buffered
                }
                //如果不需要body,则把标志清楚,因为有可能是上次等待设置的,现在来了,就可以清楚了。
                setFilterAwaitingBody(inMesg, false);
    
                if (snapshot != null) {
                    Debug.addRoutingDebug(inMesg.getContext(), "Filter " + filter.filterType().toString() + " " + filter.filterOrder() + " " + filter.filterName());
                }
    
                //run body contents accumulated so far through this filter
                //如果是http request header 部分请求request filter都执行完了,到endpoint 执行时回啥都不做。而是通过下面的apply方法来建立连接和后端,并发送请求相关的信息。
                inMesg.runBufferedBodyContentThroughFilter(filter);
    
                //同步filter 直接执行filter的apply方法,
                if (filter.getSyncType() == FilterSyncType.SYNC) {
                    final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;
                    final O outMesg = syncFilter.apply(inMesg);
                    recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);
                    return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);
                }
    
                // async filter
                filter.incrementConcurrency();
                resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);
                filter.applyAsync(inMesg)
                    //注意这里,observeOn 方法是指定由那个线程来通知观察者,这里是和请求同一个线程
                    .observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor()))
                    .doOnUnsubscribe(resumer::decrementConcurrency)
                    .subscribe(resumer);
                //异步时,返回null,
                return null;  //wait for the async filter to finish
            }
    

    通过上面的代码,我们知道,假如我们的fitler都不需要对body进行处理,那对于http request 的header 收到的时候,所有的filter会执行完,每个filter的逻辑在apply方法中实现,执行完所有的request filter时,endpoint 的apply 方法如下:

        @Override
        public HttpResponseMessage apply(final HttpRequestMessage input) {
            // If no Origin has been selected, then just return a 404 static response.
            // handle any exception here
            try {
    
                if (origin == null) {
                    handleNoOriginSelected();
                    return null;
                }
    
                origin.getProxyTiming(zuulRequest).start();
                origin.onRequestExecutionStart(zuulRequest, 1);
                //代理请求到后端服务。这里就不详细分析了,不然写不完了
                proxyRequestToOrigin();
                //Doesn't return origin response to caller, calls invokeNext() internally in response filter chain
                return null;
            } catch (Exception ex) {
                handleError(ex);
                return null;
            }
        }
    
    

    endpoint filter 会为每个请求创建一个ProxyEndpoint,是非线程安全的,在http request header 的时候,就和后端建立了链接。等到http content来的时候就直接发送了,下面我们看endpoint是怎么处理http content的。

                 //因为前面处理http request header时就创建了endpoint,所以当http content 来时,直接从上下文取。
                ZuulFilter<HttpRequestMessage, HttpResponseMessage> endpoint = Preconditions.checkNotNull(
                        getEndpoint(zuulReq), "endpoint");
                endpointName = endpoint.filterName();
                //这里就直接通过endpoint也就是ProxyEndpoint 把http content直接发送给后端服务,并返回null
                final HttpContent newChunk = endpoint.processContentChunk(zuulReq, chunk);
                if (newChunk != null) {
                    //Endpoints do not directly forward content chunks to next stage in the filter chain.
                    zuulReq.bufferBodyContents(newChunk);
                    //deallocate original chunk if necessary
                    if (newChunk != chunk) {
                        chunk.release();
                    }
                    if (isFilterAwaitingBody(zuulReq) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
                        //whole body has arrived, resume filter chain
                        invokeNextStage(filter(endpoint, zuulReq));
                    }
               }
    

    需要注意的是,http content 到达时,还是回把request filter chain中的所有filter 都执行一遍。然后在进入endpoint fitler,直接发送到后端服务。

    我们网关实现一般都会聚合,即等http request header 和request content 都到达后再经过处理统一发给后端服务。zuul2 的实现还是比较激进,即来一块content 就发一块,特别适合trunk 模式的请求,做到流试处理。

    下面是一个图filter chain 处理请求

    zuulFilter.png

    相关文章

      网友评论

        本文标题:Zuul2 FilterChain 原理分析

        本文链接:https://www.haomeiwen.com/subject/lltdsftx.html