美文网首页
Netty的HttpObjectAggregator解析

Netty的HttpObjectAggregator解析

作者: 雕兄L | 来源:发表于2018-11-09 20:59 被阅读0次

分析聚合Request对象的过程

类图
从继承关系可以看出,该类继承了几个抽象类,其中MessageAggregator、MessageToMessageDecoderd是Netty为了解码其他协议而抽出来的公共实现类,其代码并不是为了专门解析http协议而写

先从顶层的代码往下分析
首先看抽象类 MessageToMessageDecoderd

  @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          //这个对象作用就是一个List,是Netty自己封装的        
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            //判断是否是正常的msg 子类实现该方法,这里是由MessageAggregator实现
            //判断的代码是 (isContentMessage(in) || isStartMessage(in)) && !isAggregated(in)
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    //子类实现解码过程
                    decode(ctx, cast, out);
                } finally {
                    ReferenceCountUtil.release(cast);
                }
            } else {
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            int size = out.size();
            //注意最后这个for循环,当size大于0的时候,就会触发执行下一个handler的方法,
            // 所以这里需要看子类的decode实现,看看什么条件下list里才会被添加上对象,一旦被添加上对象,即聚合结束 
            // 其实size=1就结束聚合
            for (int i = 0; i < size; i ++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
            out.recycle();
        }
    }

现在来看下MessageAggregator的decode()方法的实现,注意currentMessage是该handler的成员变量,每一个channel对应一个handler实例,这个currentMessage会存储多次decode迭代的结果,这是理解数据流转过程的一个关键

@Override
    protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
       //看是否是起始信息,即头信息 子类HttpObjectAggregator实现该方法,当msg是HttpMessage(消息行消息头)对象时返回true
        if (isStartMessage(msg)) {
            handlingOversizedMessage = false;
            //刚刚开始解析消息头就有数据了,则不正常
            if (currentMessage != null) {
                currentMessage.release();
                currentMessage = null;
                throw new MessageAggregationException();
            }

            @SuppressWarnings("unchecked")
            S m = (S) msg;

            // Send the continue response if necessary (e.g. 'Expect: 100-continue' header)
            // Check before content length. Failing an expectation may result in a different response being sent.
            Object continueResponse = newContinueResponse(m, maxContentLength, ctx.pipeline());
            //这里还不太懂是干嘛,不过这里段逻辑也不是很重要,跳过
            if (continueResponse != null) {
                // Cache the write listener for reuse.
                ChannelFutureListener listener = continueResponseWriteListener;
                if (listener == null) {
                    continueResponseWriteListener = listener = new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                ctx.fireExceptionCaught(future.cause());
                            }
                        }
                    };
                }

                // Make sure to call this before writing, otherwise reference counts may be invalid.
                boolean closeAfterWrite = closeAfterContinueResponse(continueResponse);
                handlingOversizedMessage = ignoreContentAfterContinueResponse(continueResponse);

                final ChannelFuture future = ctx.writeAndFlush(continueResponse).addListener(listener);

                if (closeAfterWrite) {
                    future.addListener(ChannelFutureListener.CLOSE);
                    return;
                }
                if (handlingOversizedMessage) {
                    return;
                }
            } else if (isContentLengthInvalid(m, maxContentLength)) {
                // if content length is set, preemptively close if it's too large
                invokeHandleOversizedMessage(ctx, m);
                return;
            }
            //这里大概是判断解码上一层handler解码是否成功吧,这里还不好推断解码失败的情况是什么样
            if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
                O aggregated;
                if (m instanceof ByteBufHolder) {
                    aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
                } else {
                    aggregated = beginAggregation(m, EMPTY_BUFFER);
                }
                finishAggregation(aggregated);
                out.add(aggregated);
                return;
            }

            //这里是初始化一个组合buffer
            // A streamed message - initialize the cumulative buffer, and wait for incoming chunks.
            CompositeByteBuf content = ctx.alloc().compositeBuffer(maxCumulationBufferComponents);
            //HttpMessage 没有实现该类
            if (m instanceof ByteBufHolder) {
                appendPartialContent(content, ((ByteBufHolder) m).content());
            }
            //获得一个AggregatedFullHttpRequest,并且把把头信息赋值进去
            //第一次头信息解析结束
            //给currentMessage赋值
            currentMessage = beginAggregation(m, content);
        } else if (isContentMessage(msg)) {//第二次ChannelRead()读取的数据走到这里,这里应该就是消息体对象,即DefaultHttpContent
            if (currentMessage == null) {
                // it is possible that a TooLongFrameException was already thrown but we can still discard data
                // until the begging of the next request/response.
                return;
            }

            //把上次解析中暂存的buffer取出,当第一次解析消息体的时候,该content是空的
            // Merge the received chunk into the content of the current message.
            CompositeByteBuf content = (CompositeByteBuf) currentMessage.content();

            @SuppressWarnings("unchecked")
            final C m = (C) msg;
            // Handle oversized message.
            if (content.readableBytes() > maxContentLength - m.content().readableBytes()) {
                // By convention, full message type extends first message type.
                @SuppressWarnings("unchecked")
                S s = (S) currentMessage;
                invokeHandleOversizedMessage(ctx, s);
                return;
            }

            //把新读取的buffer合并到旧的content中
            //content是currentMessage对象的一个属性,这里等于同时更新了currentMessage里buffer内容
            // Append the content of the chunk.
            appendPartialContent(content, m.content());

            //truncked协议会用到该方法,参见HttpObjectAggregator的实现
            // Give the subtypes a chance to merge additional information such as trailing headers.
            aggregate(currentMessage, m);

            //下面的逻辑是判断消息是否发送完毕,如果isLastContentMessage返回true,则表示消息体已经读取完了
            //msg是LastHttpContent对象,则表示到了消息尾
            final boolean last;
            if (m instanceof DecoderResultProvider) {
                DecoderResult decoderResult = ((DecoderResultProvider) m).decoderResult();
                if (!decoderResult.isSuccess()) {
                    if (currentMessage instanceof DecoderResultProvider) {
                        ((DecoderResultProvider) currentMessage).setDecoderResult(
                                DecoderResult.failure(decoderResult.cause()));
                    }
                    last = true;
                } else {
                    last = isLastContentMessage(m);
                }
            } else {
                last = isLastContentMessage(m);
            }

            if (last) {
                //聚合结束,把content-length添加到header中,值就是消息体的长度
                finishAggregation(currentMessage);

                //已经是最后一个消息了,则把AggregatedFullHttpRequest对象添加到List进去
                //看到这里就已经拟清楚上一个抽象类写的逻辑了,可以再回看下上一个抽象类的逻辑
                // All done
                out.add(currentMessage);
                currentMessage = null;
            }
        } else {
            throw new MessageAggregationException();
        }
    }

贴一下HttpObjectAggregator实现的几个方法

  @Override
    protected FullHttpMessage beginAggregation(HttpMessage start, ByteBuf content) throws Exception {
        assert !(start instanceof FullHttpMessage);

        HttpUtil.setTransferEncodingChunked(start, false);

        AggregatedFullHttpMessage ret;
        if (start instanceof HttpRequest) {
            ret = new AggregatedFullHttpRequest((HttpRequest) start, content, null);
        } else if (start instanceof HttpResponse) {
            ret = new AggregatedFullHttpResponse((HttpResponse) start, content, null);
        } else {
            throw new Error();
        }
        return ret;
    }

    @Override
    protected void aggregate(FullHttpMessage aggregated, HttpContent content) throws Exception {
        if (content instanceof LastHttpContent) {
            // Merge trailing headers into the message.
            ((AggregatedFullHttpMessage) aggregated).setTrailingHeaders(((LastHttpContent) content).trailingHeaders());
        }
    }

    @Override
    protected void finishAggregation(FullHttpMessage aggregated) throws Exception {
        // Set the 'Content-Length' header. If one isn't already set.
        // This is important as HEAD responses will use a 'Content-Length' header which
        // does not match the actual body, but the number of bytes that would be
        // transmitted if a GET would have been used.
        //
        // See rfc2616 14.13 Content-Length
        if (!HttpUtil.isContentLengthSet(aggregated)) {
            aggregated.headers().set(
                    CONTENT_LENGTH,
                    String.valueOf(aggregated.content().readableBytes()));
        }
    }

相关文章

网友评论

      本文标题:Netty的HttpObjectAggregator解析

      本文链接:https://www.haomeiwen.com/subject/hbowxqtx.html