美文网首页
netty源码分析(八) - 编码

netty源码分析(八) - 编码

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-27 19:55 被阅读0次

概述

前面两节分析了编码器,本节来看下netty的编码;以及netty是如何把JAVA对象转成字节流写出去的

示例代码

先来看一个简单的代码示例

服务端EchoServerHandler.channelRead构造对象返回
public class EchoServerHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        User user = new User("zzzliu", 18);
        ctx.channel().writeAndFlush(user);
    }
}

创建EchoServerEncodeHandler,编码格式为 length(4) + age(4) + age(可变长度)
/**
 * +---+----+------+----+
 * |    4   |  4   |  ?  |
 * +---+----+------+----+
 * | length | age | name |
 * +---+----+------+----+
 */
public class EchoServerEncodeHandler extends MessageToByteEncoder<User> {
    @Override
    protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) {
        out.writeInt(4 + user.getName().getBytes().length);
        out.writeInt(user.getAge());
        out.writeBytes(user.getName().getBytes());
    }
}

服务端启动加入handler
p.addLast(new EchoServerEncodeHandler(),  new EchoServerHandler());
  1. EchoServerHandler.channelRead接收客户端数据不做处理直接构造一个User对象交给pipeline传递writeAndFlush事件;
  2. EchoServerEncodeHandler继承MessageToByteEncoder<User>在重写encode里面实现编码逻辑
  3. EchoServerHandler中ctx.channel().writeAndFlush(user);开始写出逻辑

写出入口writeAndFlush

ctx.channel().writeAndFlush(user);从pipeline尾部节点TailContext开始传播

public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    //通过MASK找到MASK_ONLY_OUTBOUND handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //flush默认true
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        //相当于先调用write,再调用flush
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

总结一下就是先传播write事件(先ChannelOutboundBuffer的Entry链表上)紧接着再传播flush事件(再遍历链表调用jdkchannel写出到内核socket发送缓冲区中),下面分别对pipeline中重要的write和flush节点进行分析

MessageToByteEncoder.write

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        // 通过msg类型判断当前Handelr是否能处理写入的消息
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            //强转成MessageToByteEncoder泛型
            I cast = (I) msg;
            // 分配ButeBuf
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // 调用子类encode(具体的编码逻辑)
                encode(ctx, cast, buf);
            } finally {
                // cast对象已经转换成ByteBuf了,释放掉
                ReferenceCountUtil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                // 否则,释放buf,将空数据传到下一个节点
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}
  1. acceptOutboundMessage(msg)通过msg(MessageToByteEncoder的泛型)类型判断当前Handelr是否能处理写入的消息,不能处理的话直接传递到下一节点
  2. allocateBuffer(ctx, cast, preferDirect);分配ByteBuf(preferDirect默认为true即分配directBuf,否则分配heapBuf)
  3. 调用子类encode(示例代码中EchoServerEncodeHandler.encode)执行具体编码逻辑,把Java对象按照自定义字节流写到ByteBuf中
  4. ByteBuf有数据则传递到下一节点处理,否则释放ByteBuf
  5. buf.release();最后不论是否执行成功都要释放ByteBuf
  6. write事件最终传播到HeadContext节点

HeadContext.write

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

直接委托给unsafe处理AbstractUnsafe.write

public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    int size;
    try {
        //把非直接内存转换成直接内存DirectBuffer
        msg = filterOutboundMessage(msg);
        //估算出需要写入的ByteBuf的size
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    //msg先插入outboundBuffer中队列
    outboundBuffer.addMessage(msg, size, promise);
}
  1. filterOutboundMessage把非直接内存转换成直接内存DirectBuffer
  2. pipeline.estimatorHandle().size(msg);估算出需要写入的ByteBuf的size
  3. addMessagemsg先插入outboundBuffer中队列

ChannelOutboundBuffer

public void addMessage(Object msg, int size, ChannelPromise promise) {
    //msg包装成Entry组成一个单向链表
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    //entry.pendingSize = size + 96
    //统计当前有多少字节需要被写出
    incrementPendingOutboundBytes(entry.pendingSize, false);
}
//第一个被写到操作系统Socket缓冲区中的节点
private Entry flushedEntry;
//第一个未被写入到操作系统Socket缓冲区中的节点
private Entry unflushedEntry;
//ChannelOutboundBuffer缓冲区的最后一个节点
private Entry tailEntry;
  1. msg包装成Entry组成一个单向链表,有多个Entry被添加之后链表flushedEntry = null; unflushedEntry到tailEntry之间为等待flush的msg
  2. incrementPendingOutboundBytes统计当前有多少字节需要被写出,如果已经超过阈值, 告诉调用者请不要再写入了,等flush后,会将阈值递减恢复,并发出可写事件

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    // 采用CAS更新newWriteBufferSize字段,
    // 判断当前是否超过阈值,超过则触发ChannelWritabilityChanged事件。
    //TOTAL_PENDING_SIZE_UPDATER:当前缓冲区有多少待写的字节
    //channel.config().getWriteBufferHighWaterMark():buf字节容量最大值,默认64K
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1;
        //设置不可写状态为不可写
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                //传播ChannelWritabilityChanged事件,表示buf已经写满,需要进行处理
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

累加待flush字节数(newWriteBufferSize),如果大于阈值(默认64K),则设置不可写状态为不可写并触发fireChannelWritabilityChanged事件,表示buf已经写满,需要进行处理

HeadContext.flush

flush流程跟write类似,最终都是通过HeadContext进行处理;HeadContext委托到AbstractUnsafe.flush

public void flush(ChannelHandlerContext ctx) {
    unsafe.flush();
}

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    //调整unFushedEntry、fushedEntry指针,变成unFushedEntry = null,fushedEntry -> tailEntry中间节点表示可以flush
    //说明当前outboundBuffer中所有Entry都可以flush
    outboundBuffer.addFlush();
    //进行flush
    flush0();
}
protected void flush0() {
    .........
    doWrite(outboundBuffer);
    .........
}

ChannelOutboundBuffer

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                //计数并判断是否需要改变可写状态
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
        return;
    }
    //channel.config().getWriteBufferLowWaterMark():默认32K
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        //设置不可写状态为可写,并传播fireChannelWritabilityChanged事件
        setWritable(invokeLater);
    }
}
public boolean remove() {
    //第一个flushedEntry节点
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;
    //把当前Entry移除,同时flush指针后移
    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // 回收Entry
    e.recycle();

    return true;
}

AbstractNioByteChannel

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    //自旋锁次数,默认16次
    int writeSpinCount = config().getWriteSpinCount();
    do {
        //msg:flushedEntry指针
        Object msg = in.current();
        if (msg == null) {
            // 所有数据都flush完成,那么就将其写标志删除
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);

    incompleteWrite(writeSpinCount < 0);
}


private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
        //buf内没有数据,直接remove
        if (!buf.isReadable()) {
            in.remove();
            return 0;
        }

        //将ByteBuf写出到jdk nio的Channel即通过jdk channel写到内核socket缓冲区,返回写入字节数
        final int localFlushedAmount = doWriteBytes(buf);
        if (localFlushedAmount > 0) {
            in.progress(localFlushedAmount);
            //buf数据写完了,从ChannelOutboundBuffer的Entry链表中删除
            if (!buf.isReadable()) {
                in.remove();
            }
            return 1;
        }
    }
}

相关文章

网友评论

      本文标题:netty源码分析(八) - 编码

      本文链接:https://www.haomeiwen.com/subject/azntvktx.html