美文网首页netty程序员
netty源码分析-注册及连接

netty源码分析-注册及连接

作者: 数齐 | 来源:发表于2017-09-09 22:05 被阅读49次

    线程池都准备好了,我们需要利用起来了。我们一客户端的connect为例讲述这个过程。下面是我们触发了链接这个动作

    ChannelFuture f = b.connect(host, port).sync();
    

    他里面是怎样的逻辑呢?

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        //利用反射创建channel类,并且初始化它
        final ChannelFuture regFuture = initAndRegister(); 
        ... 
        //真正的链接服务端
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); 
        }
    }
    

    中间省略了好多代码,只留下关键的代码。首先我们回忆一下NIO的经典操作。首先创建一个channel,然后在selector上注册,并指明感兴趣的事件,随后selector就select了,等待感兴趣的事件到来,事件到达,处理请求。这是原生的NIO的处理过程,既然netty是基于nio的,顶多是帮助我们封装了这些操作而已,让我们可以更加舒服的利用netty的api处理网络的请求。看看上面的注释,基本上和我们的了解一致,至于是不是真的一致,那么久得继续往下看了。

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        channel = channelFactory.newChannel();  //利用反射创建对象
        init(channel);  //初始化,添加逻辑处理器,设置channel的Option与属性Attribute
        ...
        ChannelFuture regFuture = config().group().register(channel);  
        ...
        return regFuture;
    }
    

    利用反射创建了代码中我们指定的channel,init初始化,添加逻辑处理器,设置channel的Option与属性Attribute。我们更为关键的是看一下如果进行注册上篇文章也介绍了groupMultithreadEventLoopGroup的实例。

    ### io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    

    这个next方法就是我们的选择器发挥作用了,选择一个孩子来进行处理(负载均衡的考虑)。具体的是NioEventLoop的事例进行的register操作,他没有复写父类的方法,所以由父类SingleThreadEventLoop来具体处理

    ### io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    

    将channel包装成了DefaultChannelPromise的对象进行操作。

    ### io.netty.channel.AbstractChannel.AbstractUnsafe#register
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...  
        AbstractChannel.this.eventLoop = eventLoop;
        ...
        eventLoop.execute(new Runnable() {  //有具体的线程池进行处理,参数传递过来的
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
          ...
        }
    }
    

    老样子,省略好多代码,只留下重点。eventLoop是NioEnevtLoop的实例,所以看一下他的execute,同样的他没有复写这个方法,所以还是由父类提供

    ### io.netty.util.concurrent.SingleThreadEventExecutor#execute
    @Override
    public void execute(Runnable task) {
            ...
            startThread();   //开启线程
            addTask(task);   //处理请求
            ...  
    }
    
    ### io.netty.util.concurrent.SingleThreadEventExecutor#startThread
    private void startThread() {
       ...
                doStartThread();
        ...
    }
    
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {  //重点关注这个executor
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();  //SingleThreadEventExecutor.this是NioEventLoop的事例
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
    
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }
    
                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }
    
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
    

    这里有一个细节点不能忽略就是executor.execute,我们要知道这个executor是啥,再创建NioEventLoopGroup时,有这样的逻辑

    ### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
     
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }
    
    ### io.netty.util.concurrent.DefaultThreadFactory
    @Override
    public Thread newThread(Runnable r) {
        Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }
    
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
     
    private static final class DefaultRunnableDecorator implements Runnable {
    
        private final Runnable r;
    
        DefaultRunnableDecorator(Runnable r) {
            this.r = r;
        }
    
        @Override
        public void run() {
            try {
                r.run();
            } finally {
                FastThreadLocal.removeAll();
            }
        }
    }
    

    线程工厂创建线程的逻辑,线程池里面设置了线程工厂,那么线程池运行多线程任务的时候,其实是利用线程工厂创建线程来运行

    ### io.netty.util.concurrent.ThreadPerTaskExecutor
    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    当线程池有任务过来时,会调用线程工厂创建线程,并且启动该线程来处理,我们看一下NioEventLoop的run方法

    
    @Override
    protected void run() {
        for (;;) {
             ...
             processSelectedKeys();   //处理Nio中的SelectedKeys   
             ...          
        }
    }
    ### 
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {  //如何是链接的请求,调用unsafe的finishConnect
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {  
                unsafe.read();  //读取数据
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    
    

    好像终于和我们的NIO有点联系了。无非也就是等感兴趣的事件来了就处理,调用unsafe来处理,首先我们说一下unsafe,他是NioSocketChannelUnsafe的事例,而这个类继承了NioByteUnsafe,并且大部分的方法都是在NioByteUnSafe,我们比较关心她的读取数据的过程

    @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);
    
            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }
    
                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);     //触发pipeline的生命周期方法,接收消息,处理消息
                    byteBuf = null;
                } while (allocHandle.continueReading());
    
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
    
                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    

    调用pipeline的生命周期方,同时将数据传递过去,handler开始处理了。以上皆是处理了SelectionKey的过程。注册搞好了,我们就可以开始连接。在我们追踪下来,connect核心的代码

    doConnect(remoteAddress, localAddress)
    
    ### io.netty.channel.socket.nio.NioSocketChannel#doConnect
    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }
    
        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
    

    socket 链接远程服务器,因为是异步链接,所以connected为false,那么就注册了OP_CONNECT事件,这样,当连接事件做好之后,在线程组中会有无限循环,查询准备好的事件,连接事件好了,就会进行处理,同时触发声明周期的方法,进行流程的流转。
    以上。

    相关文章

      网友评论

        本文标题:netty源码分析-注册及连接

        本文链接:https://www.haomeiwen.com/subject/sppajxtx.html