美文网首页netty
Netty源码分析-如何解决TCP粘包拆包问题

Netty源码分析-如何解决TCP粘包拆包问题

作者: yunkai_zhang | 来源:发表于2017-09-08 16:52 被阅读0次

    在实际网络应用中,我们接收和发送的数据都是以实际应用数据类型为单位的(比如一个Http数据体,或者一个ThriftObject)。而对于Socket而言,它处理的是TCP传输层的数据,在它接收或发送的一个TCP包中,可能正好对应一个ThriftObject,或者多个ThriftObject、ThriftObject的一部分,甚至可能由多个ThriftObject的多个部分组成。这就是TCP的粘包半包问题。

    Netty提供了一种机制可以帮助我们方便地处理TCP半包和粘包问题,它是通过嵌入ChannelHandler来实现的。

    下面以一段简单的代码实例来看一下,如何在Netty中处理TCP粘包和拆包问题。可以看到代码中添加了类型为FixedLengthFrameDecoder的ChannelHandler,添加这段代码的效果就是每次会截取定量长度为1024的字节数据作为下层ChannelHandler的input处理对象。

        public void bind(int port) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //处理TCP半包粘包问题
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
                        }
                    })
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new SimpleNettyServerHandler());
                        }
                    });
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    

    1. 源码分析

    我们就以FixedLengthFrameDecoder为例,它的实现非常简单,继承自类ByteToMessageDecoder,并对父类的抽象方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)进行了实现。所以,要分析Netty对TCP粘包拆包的处理,核心逻辑在于ByteToMessageDecoder

    public class FixedLengthFrameDecoder extends ByteToMessageDecoder{
     @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        {.....}
    }
    

    1.1 流程概述

    4个核心方法:
    ByteToMessageDecoder channelRead(..)
    ByteToMessageDecoder callDecode(..)
    ByteToMessageDecoder fireChannelRead(..)
    FixedLengthFrameDecoder decode(..)

    method Class 说明
    void channelRead( ChannelHandlerContext ctx, Object msg) ByteToMessageDecoder 入口方法,入参msg会是一个ByteBuf。主要过程就是整合一下组装成一个新的ByteBuf cumulation,然后对这个cumulation 进行callDecode
    void callDecode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) ByteToMessageDecoder 对ByteBuf进行解码,解析成一组JavaObject到out中,会在该方法中调用实际的decode方法,并会调用fireChannelRead方法进行ChannelHandler的传递
    void fireChannelRead( ChannelHandlerContext ctx, List<Object> msgs, int numElements) ByteToMessageDecoder 该方法会对解析生成的JavaObject进行下层ChannelHandler的传递
    decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out) FixedLengthFrameDecoder 实际进行decode的方法,会将ByteBuf按照固定长度进行拆分成一组Object

    1.2 channelRead方法分析

    通过文章 Netty源码分析-ChannelPipeline 的分析,channelRead方法的调用是在ChannelPipeline中,入参msg会是一个ByteBuf。

    第一步:首先会实例化一个空的CodecOutputList,用于存放一会将要解码生成的对象。
    第二步:核心在于赋值cumulation,cumulation的类型也是ByteBuf,它与入参的msg会有什么不同呢?如果cumulation为null,会直接将msg的地址赋值给cumulation,否则会将cumulator.cumulate(ctx.alloc(), cumulation, data)方法返回值赋值给cumulation,看一下方法描述“Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.”,原来是将msg与cumulation进行一个merge。这就相当于将两个TCP包的数据进行了一个数据上的衔接。
    第三步:调用callDecode对cumulation进行解码。

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;
                    if (first) {
                        cumulation = data;
                    } else {
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    decodeWasNull = !out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    

    1.3 callDecode方法分析

    callDecode的核心流程就是对ByteBuf进行遍历,遍历的过程中不断调用decode方法解析出Object对象,并对解析出的对象执行fireChannelRead方法,保证Pipeline的往下传递。
    代码注释中会对过程进行分析。

        protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                while (in.isReadable()) {
                    int outSize = out.size();
                    if (outSize > 0) {
                        //说明有解码好的对象,对这些对象进行Pipeline的往下传递
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
                    int oldInputLength = in.readableBytes();
                    //真正调用实际decode方法进行解码的地方
                    decodeRemovalReentryProtection(ctx, in, out);
                    if (ctx.isRemoved()) {
                        break;
                    }
                    // 说明没有解码到新对象,这时候如果ByteBuf没有移动,说明此次ByteBuf内容不足以解码,会直接break。
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }
                    //解码到了对象,但是却没有移动ByteBuf,说明有问题
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                        ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable cause) {
                throw new DecoderException(cause);
            }
        }
    

    1.4 fireChannelRead方法分析

    fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements)方法的入参是已经解码后产生的List<Object> ,会遍历这些Object,分别调用ctx.fireChannelRead(final Object msg)进行ChanelPipeline的往下传递。

    下边也列出了ctx.fireChannelRead(final Object msg)的代码实现,findContextInbound()会找到ctx的下一个AbstractChannelHandlerContext,将ChannelPipeline进行往后传递。

    static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
            if (msgs instanceof CodecOutputList) {
                fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
            } else {
                for (int i = 0; i < numElements; i++) {
                    ctx.fireChannelRead(msgs.get(i));
                }
            }
        }
    abstract class AbstractChannelHandlerContext{
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
    }
    

    1.5 decode方法分析

    下面是类FixedLengthFrameDecoder的代码实现。可以看到非常简单:判断当前的ByteBuf长度够不够一个Frame的长度,如果不够不处理,否则会解码出一个Frame并添加至 List<Object> out 中,然后将ByteBuf指针前移frameLength长度。

        @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }
        protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (in.readableBytes() < frameLength) {
                return null;
            } else {
                return in.readRetainedSlice(frameLength);
            }
        }
    

    2. 其他实现

    上边的例子中,我们讲述了解决TCP粘包拆包的一个例子-分割成固定长度的Frame。在实际应用中,会根据应用层业务实体类型进行不同的decode解码,比如Http应用中需要解码出HttpRequest,thrift RPC调用中需要解码出ThriftObject等等。

    实际Netty已经帮助使用者做了非常多的工作,像常用的HttpRequestDecoder可以帮助我们解码出Http对象,XmlDecoder可以帮助我们解码出XML对象,类似的还有json对象解码、WebSocket对象解码等等。

    如果Netty已经提供的Decoder无法满足你的要求,你也可以实现自己的Decoder。过程非常简单,只需要继承ByteToMessageDecoder类并实现抽象方法decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out).

    相关文章

      网友评论

        本文标题:Netty源码分析-如何解决TCP粘包拆包问题

        本文链接:https://www.haomeiwen.com/subject/wszgjxtx.html