The netty unsafe component


Author: leiwingqueen | Published: 2019-07-12 00:36

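    1. What is unsafe?

    unsafe is one of the core components of netty. It wraps Java's low-level socket operations and serves as the bridge between netty and the underlying Java NIO.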

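    2. Initializing unsafe

    We again start from NioServerSocketChannel (the NIO implementation of the server-side channel). The channel initializes an unsafe object and uses it to talk to the socket.

    [Figure: NioServerSocketChannel class diagram]

    Tracing the NioServerSocketChannel constructor up through its parent classes, we find that it calls a newUnsafe method to construct the unsafe object.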

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    @Override
        public Unsafe unsafe() {
            return unsafe;
        }
    
        /**
         * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
         */
        protected abstract AbstractUnsafe newUnsafe();
    ...
    

    The call is dispatched to the subclass AbstractNioMessageChannel:

    @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioMessageUnsafe();
        }
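
    The construction pattern above (an abstract base class that asks its subclass for an unsafe, which the subclass supplies as a private inner class) can be sketched in plain Java. This is only an illustration under simplifying assumptions: BaseChannel, MessageChannel, MessageUnsafe and the one-method Unsafe interface are hypothetical stand-ins, not netty's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the construction pattern; these are NOT netty's real classes.
public class UnsafeInitSketch {

    /** Hypothetical stand-in for Channel.Unsafe. */
    interface Unsafe {
        void read(List<String> out);
    }

    /** Hypothetical stand-in for AbstractChannel. */
    abstract static class BaseChannel {
        private final Unsafe unsafe;

        protected BaseChannel() {
            // The base constructor asks the concrete subclass for its unsafe, once.
            this.unsafe = newUnsafe();
        }

        public Unsafe unsafe() {
            return unsafe;
        }

        protected abstract Unsafe newUnsafe();
    }

    /** Hypothetical stand-in for AbstractNioMessageChannel. */
    static final class MessageChannel extends BaseChannel {
        @Override
        protected Unsafe newUnsafe() {
            return new MessageUnsafe();
        }

        // Private inner class: callers only ever see the Unsafe interface.
        private final class MessageUnsafe implements Unsafe {
            @Override
            public void read(List<String> out) {
                out.add("read handled by " + MessageChannel.this.getClass().getSimpleName());
            }
        }
    }

    /** Builds a channel and performs one read through its unsafe. */
    public static List<String> demo() {
        BaseChannel channel = new MessageChannel();
        List<String> out = new ArrayList<>();
        channel.unsafe().read(out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [read handled by MessageChannel]
    }
}
```

    Because the inner class holds a reference to its enclosing channel, it can reach the channel's state directly, which is presumably why the real implementation is written the same way.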
    

    Finally, we arrive at the inner class that implements unsafe.

    private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
            private final List<Object> readBuf = new ArrayList<Object>();
    
            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    
        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            final SelectionKey key = selectionKey();
            final int interestOps = key.interestOps();
    
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                    }
                    break;
                }
                try {
                    boolean done = false;
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                        if (doWriteMessage(msg, in)) {
                            done = true;
                            break;
                        }
                    }
    
                    if (done) {
                        in.remove();
                    } else {
                        // Did not write all messages.
                        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                            key.interestOps(interestOps | SelectionKey.OP_WRITE);
                        }
                        break;
                    }
                } catch (Exception e) {
                    if (continueOnWriteError()) {
                        in.remove(e);
                    } else {
                        throw e;
                    }
                }
            }
        }
    

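    The code is fairly long, so we will not dig into it here. Note that doWrite in the excerpt belongs to the enclosing AbstractNioMessageChannel, not to NioMessageUnsafe.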
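
    For orientation only, the overall shape of read() (collect messages into a buffer until nothing is left or a limit is hit, then dispatch them one by one) can be sketched without netty. Everything here is an assumption made for illustration: the Queue stands in for the socket, the cap of 16 stands in for whatever allocHandle.continueReading() decides, and the returned strings stand in for pipeline events.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Sketch of the batch-read shape of NioMessageUnsafe.read(); all names are hypothetical.
public class ReadLoopSketch {

    // Assumed cap on messages per read() call (illustrative, not taken from netty's config).
    static final int MAX_MESSAGES_PER_READ = 16;

    /** Stand-in for doReadMessages: moves at most one pending message into buf. */
    static int doReadMessages(Queue<String> source, List<String> buf) {
        String msg = source.poll();
        if (msg == null) {
            return 0; // nothing more to read right now
        }
        buf.add(msg);
        return 1;
    }

    /** Reads up to the cap into a buffer, then dispatches each message in order. */
    public static List<String> read(Queue<String> source) {
        List<String> readBuf = new ArrayList<>();
        int messagesRead = 0;
        do {
            int localRead = doReadMessages(source, readBuf);
            if (localRead == 0) {
                break;
            }
            messagesRead += localRead;
        } while (messagesRead < MAX_MESSAGES_PER_READ);

        List<String> events = new ArrayList<>();
        for (String msg : readBuf) {
            events.add("channelRead(" + msg + ")"); // stands in for pipeline.fireChannelRead
        }
        events.add("channelReadComplete"); // stands in for pipeline.fireChannelReadComplete
        return events;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>(Arrays.asList("conn-1", "conn-2"));
        // prints [channelRead(conn-1), channelRead(conn-2), channelReadComplete]
        System.out.println(read(pending));
    }
}
```

    The sketch leaves out error handling, closing and the readPending bookkeeping that the real method performs.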

    3. The unsafe implementation

    [Figure: NioMessageUnsafe class diagram]

    Let's first look at the Unsafe interface:

    /**
         * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
         * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
         * following methods:
         * <ul>
         *   <li>{@link #localAddress()}</li>
         *   <li>{@link #remoteAddress()}</li>
         *   <li>{@link #closeForcibly()}</li>
         *   <li>{@link #register(EventLoop, ChannelPromise)}</li>
         *   <li>{@link #deregister(ChannelPromise)}</li>
         *   <li>{@link #voidPromise()}</li>
         * </ul>
         */
        interface Unsafe {
    
            /**
             * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
             * receiving data.
             */
            RecvByteBufAllocator.Handle recvBufAllocHandle();
    
            /**
             * Return the {@link SocketAddress} to which is bound local or
             * {@code null} if none.
             */
            SocketAddress localAddress();
    
            /**
             * Return the {@link SocketAddress} to which is bound remote or
             * {@code null} if none is bound yet.
             */
            SocketAddress remoteAddress();
    
            /**
             * Register the {@link Channel} of the {@link ChannelPromise} and notify
             * the {@link ChannelFuture} once the registration was complete.
             */
            void register(EventLoop eventLoop, ChannelPromise promise);
    
            /**
             * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
             * it once its done.
             */
            void bind(SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
             * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
             * pass {@code null} to it.
             *
             * The {@link ChannelPromise} will get notified once the connect operation was complete.
             */
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void disconnect(ChannelPromise promise);
    
            /**
             * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void close(ChannelPromise promise);
    
            /**
             * Closes the {@link Channel} immediately without firing any events.  Probably only useful
             * when registration attempt failed.
             */
            void closeForcibly();
    
            /**
             * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
             * {@link ChannelPromise} once the operation was complete.
             */
            void deregister(ChannelPromise promise);
    
            /**
             * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
             * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
             */
            void beginRead();
    
            /**
             * Schedules a write operation.
             */
            void write(Object msg, ChannelPromise promise);
    
            /**
             * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
             */
            void flush();
    
            /**
             * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
             * It will never be notified of a success or error and so is only a placeholder for operations
             * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
             */
            ChannelPromise voidPromise();
    
            /**
             * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
             */
            ChannelOutboundBuffer outboundBuffer();
        }
    

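    This is the core interface for interacting with the socket. Business code is discouraged from calling it directly (otherwise, what would be the point of netty?).
    Let's pick the register method and take a closer look.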

    void register(EventLoop eventLoop, ChannelPromise promise);

    The implementation lives in AbstractChannel.AbstractUnsafe:

    @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ...
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                      ...
                    }
                }
            }
    
            private void register0(ChannelPromise promise) {
                try {
                   ...
                    boolean firstRegistration = neverRegistered;
                    // this is the method whose implementation we care about
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    ...
            }
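
    The branch in register (run register0 directly when already on the event loop thread, otherwise hand it to the loop as a task) is a general thread-confinement idiom. Below is a minimal sketch using only java.util.concurrent; MiniLoop and the thread name "mini-loop" are hypothetical, and a single-thread executor is assumed as a rough substitute for an EventLoop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the "inEventLoop ? run now : submit" idiom; not netty code.
public class EventLoopHandOffSketch {

    /** Hypothetical single-threaded loop that remembers which thread it owns. */
    static final class MiniLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            loopThread = t;
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdownAndWait() {
            executor.shutdown(); // already submitted tasks still run
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("loop did not terminate");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Mirrors the shape of register(): always ends up running register0 on the loop thread. */
    static void register(MiniLoop loop, AtomicReference<String> ranOn) {
        if (loop.inEventLoop()) {
            register0(ranOn);
        } else {
            loop.execute(() -> register0(ranOn));
        }
    }

    static void register0(AtomicReference<String> ranOn) {
        ranOn.set(Thread.currentThread().getName());
    }

    /** Calls register from a non-loop thread and reports where register0 actually ran. */
    public static String registerFromOutside() {
        MiniLoop loop = new MiniLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        register(loop, ranOn);
        loop.shutdownAndWait();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(registerFromOutside()); // prints mini-loop
    }
}
```

    Funnelling the work onto one thread means register0 and everything it touches needs no further locking, which fits the interface Javadoc's requirement that most Unsafe methods be invoked from an I/O thread.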
    

    Next is AbstractNioChannel.doRegister. This method ultimately calls the register method of Java NIO.

    @Override
        protected void doRegister() throws Exception {
            ...
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
             ...
        }
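
    The same call can be tried with plain Java NIO. The sketch below registers a non-blocking ServerSocketChannel with an interest set of 0 and an attachment, as in the excerpt, and then turns on OP_ACCEPT separately. The second step is an assumption about how a caller might later enable accepting; it is not taken from the code shown here, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Sketch: register with interest ops 0 first, enable the real interest later.
public class NioRegisterSketch {

    /** Returns {interestOps right after register, interestOps after enabling OP_ACCEPT}. */
    public static int[] registerThenEnableAccept(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            // A channel must be in non-blocking mode before it can be registered.
            channel.configureBlocking(false);

            // Interest ops 0: the key exists, but the selector reports nothing for it yet.
            SelectionKey key = channel.register(selector, 0, attachment);
            int before = key.interestOps();
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }

            // Assumed later step: switch on the interest once the channel is ready.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept("my-channel");
        System.out.println(ops[0] + " -> " + ops[1]); // prints 0 -> 16
    }
}
```

    Passing an attachment (netty passes the channel itself as `this`) lets whoever processes a selected key get back from the key to the object that owns it.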
    

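    4. Summary

    unsafe is a core component of netty, responsible for dealing with the underlying Java NIO layer. It is implemented as an inner class, which also helps keep upper-layer business code from calling it directly.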
