美文网首页
netty源码分析(八) - 编码

netty源码分析(八) - 编码

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-27 19:55 被阅读0次

    概述

    前面两节分析了编码器,本节来看下netty的编码;以及netty是如何把JAVA对象转成字节流写出去的

    示例代码

    先来看一个简单的代码示例

    服务端EchoServerHandler.channelRead构造对象返回
    public class EchoServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            User user = new User("zzzliu", 18);
            ctx.channel().writeAndFlush(user);
        }
    }
    
    创建EchoServerEncodeHandler,编码格式为 length(4) + age(4) + age(可变长度)
    /**
     * +---+----+------+----+
     * |    4   |  4   |  ?  |
     * +---+----+------+----+
     * | length | age | name |
     * +---+----+------+----+
     */
    public class EchoServerEncodeHandler extends MessageToByteEncoder<User> {
        @Override
        protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) {
            out.writeInt(4 + user.getName().getBytes().length);
            out.writeInt(user.getAge());
            out.writeBytes(user.getName().getBytes());
        }
    }
    
    服务端启动加入handler
    p.addLast(new EchoServerEncodeHandler(),  new EchoServerHandler());
    
    1. EchoServerHandler.channelRead接收客户端数据不做处理直接构造一个User对象交给pipeline传递writeAndFlush事件;
    2. EchoServerEncodeHandler继承MessageToByteEncoder<User>在重写encode里面实现编码逻辑
    3. EchoServerHandler中ctx.channel().writeAndFlush(user);开始写出逻辑

    写出入口writeAndFlush

    ctx.channel().writeAndFlush(user);从pipeline尾部节点TailContext开始传播

    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        //通过MASK找到MASK_ONLY_OUTBOUND handler
        final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            //flush默认true
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            //相当于先调用write,再调用flush
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    

    总结一下就是先传播write事件(先ChannelOutboundBuffer的Entry链表上)紧接着再传播flush事件(再遍历链表调用jdkchannel写出到内核socket发送缓冲区中),下面分别对pipeline中重要的write和flush节点进行分析

    MessageToByteEncoder.write

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            // 通过msg类型判断当前Handelr是否能处理写入的消息
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                //强转成MessageToByteEncoder泛型
                I cast = (I) msg;
                // 分配ButeBuf
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 调用子类encode(具体的编码逻辑)
                    encode(ctx, cast, buf);
                } finally {
                    // cast对象已经转换成ByteBuf了,释放掉
                    ReferenceCountUtil.release(cast);
                }
                // 如果buf中写入了数据,就把buf传到下一个节点
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    // 否则,释放buf,将空数据传到下一个节点
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            // 当buf在pipeline中处理完之后,释放
            if (buf != null) {
                buf.release();
            }
        }
    }
    
    1. acceptOutboundMessage(msg)通过msg(MessageToByteEncoder的泛型)类型判断当前Handelr是否能处理写入的消息,不能处理的话直接传递到下一节点
    2. allocateBuffer(ctx, cast, preferDirect);分配ByteBuf(preferDirect默认为true即分配directBuf,否则分配heapBuf)
    3. 调用子类encode(示例代码中EchoServerEncodeHandler.encode)执行具体编码逻辑,把Java对象按照自定义字节流写到ByteBuf中
    4. ByteBuf有数据则传递到下一节点处理,否则释放ByteBuf
    5. buf.release();最后不论是否执行成功都要释放ByteBuf
    6. write事件最终传播到HeadContext节点

    HeadContext.write

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    

    直接委托给unsafe处理AbstractUnsafe.write

    public final void write(Object msg, ChannelPromise promise) {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        int size;
        try {
            //把非直接内存转换成直接内存DirectBuffer
            msg = filterOutboundMessage(msg);
            //估算出需要写入的ByteBuf的size
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        //msg先插入outboundBuffer中队列
        outboundBuffer.addMessage(msg, size, promise);
    }
    
    1. filterOutboundMessage把非直接内存转换成直接内存DirectBuffer
    2. pipeline.estimatorHandle().size(msg);估算出需要写入的ByteBuf的size
    3. addMessagemsg先插入outboundBuffer中队列

    ChannelOutboundBuffer

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        //msg包装成Entry组成一个单向链表
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
        //entry.pendingSize = size + 96
        //统计当前有多少字节需要被写出
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    
    //第一个被写到操作系统Socket缓冲区中的节点
    private Entry flushedEntry;
    //第一个未被写入到操作系统Socket缓冲区中的节点
    private Entry unflushedEntry;
    //ChannelOutboundBuffer缓冲区的最后一个节点
    private Entry tailEntry;
    
    1. msg包装成Entry组成一个单向链表,有多个Entry被添加之后链表flushedEntry = null; unflushedEntry到tailEntry之间为等待flush的msg
    2. incrementPendingOutboundBytes统计当前有多少字节需要被写出,如果已经超过阈值, 告诉调用者请不要再写入了,等flush后,会将阈值递减恢复,并发出可写事件

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }
    
        // 采用CAS更新newWriteBufferSize字段,
        // 判断当前是否超过阈值,超过则触发ChannelWritabilityChanged事件。
        //TOTAL_PENDING_SIZE_UPDATER:当前缓冲区有多少待写的字节
        //channel.config().getWriteBufferHighWaterMark():buf字节容量最大值,默认64K
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    
    private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            //设置不可写状态为不可写
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
                    //传播ChannelWritabilityChanged事件,表示buf已经写满,需要进行处理
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }
    

    累加待flush字节数(newWriteBufferSize),如果大于阈值(默认64K),则设置不可写状态为不可写并触发fireChannelWritabilityChanged事件,表示buf已经写满,需要进行处理

    HeadContext.flush

    flush流程跟write类似,最终都是通过HeadContext进行处理;HeadContext委托到AbstractUnsafe.flush

    public void flush(ChannelHandlerContext ctx) {
        unsafe.flush();
    }
    
    public final void flush() {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
        //调整unFushedEntry、fushedEntry指针,变成unFushedEntry = null,fushedEntry -> tailEntry中间节点表示可以flush
        //说明当前outboundBuffer中所有Entry都可以flush
        outboundBuffer.addFlush();
        //进行flush
        flush0();
    }
    
    protected void flush0() {
        .........
        doWrite(outboundBuffer);
        .........
    }
    

    ChannelOutboundBuffer

    public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    //计数并判断是否需要改变可写状态
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);
    
            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
    
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }
        //channel.config().getWriteBufferLowWaterMark():默认32K
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            //设置不可写状态为可写,并传播fireChannelWritabilityChanged事件
            setWritable(invokeLater);
        }
    }
    
    public boolean remove() {
        //第一个flushedEntry节点
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;
    
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        //把当前Entry移除,同时flush指针后移
        removeEntry(e);
    
        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }
    
        // 回收Entry
        e.recycle();
    
        return true;
    }
    

    AbstractNioByteChannel

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //自旋锁次数,默认16次
        int writeSpinCount = config().getWriteSpinCount();
        do {
            //msg:flushedEntry指针
            Object msg = in.current();
            if (msg == null) {
                // 所有数据都flush完成,那么就将其写标志删除
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);
    
        incompleteWrite(writeSpinCount < 0);
    }
    
    
    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            //buf内没有数据,直接remove
            if (!buf.isReadable()) {
                in.remove();
                return 0;
            }
    
            //将ByteBuf写出到jdk nio的Channel即通过jdk channel写到内核socket缓冲区,返回写入字节数
            final int localFlushedAmount = doWriteBytes(buf);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                //buf数据写完了,从ChannelOutboundBuffer的Entry链表中删除
                if (!buf.isReadable()) {
                    in.remove();
                }
                return 1;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:netty源码分析(八) - 编码

          本文链接:https://www.haomeiwen.com/subject/azntvktx.html