美文网首页
EchoServer启动(2)

EchoServer启动(2)

作者: 追梦小蜗牛 | 来源:发表于2020-06-14 18:12 被阅读0次
    image.png

    代码分析

    接着上次代码继续梳理,上一次分析到如下这个方法:

        public static void fireChannelOpen(Channel channel) {
            // Notify the parent handler.
            if (channel.getParent() != null) {
                fireChildChannelStateChanged(channel.getParent(), channel);
            }
    
            channel.getPipeline().sendUpstream(@1
                    new UpstreamChannelStateEvent(
                            channel, ChannelState.OPEN, Boolean.TRUE));
        }
    

    @1处的代码其实就是在展现着事件将在UpstreamHandler里面按照顺序开始流动,从head开始一直到tail...

     * <pre>
     *
     *                                            I/O Request
     *                                          via {@link Channel} or
     *                                      {@link ChannelHandlerContext}
     *                                                |
     *  +---------------------------------------------+--------------------+
     *  |                       ChannelPipeline       |                    |
     *  |                                            \|/                   |
     *  |       +----------------------+  +-----------+------------+       |
     *  |  LAST | Upstream Handler  N  |  | Downstream Handler  M  | LAST  |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  |   .             /|\                         |                .   |
     *  |   .              |                         \|/               .   |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  |   .   | Upstream Handler N-1 |  | Downstream Handler M-1 |   .   |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  |   .             /|\                         .                .   |
     *  |   .              .                          .                .   |
     *  |   .      [ Going UPSTREAM ]        [ Going DOWNSTREAM ]      .   |
     *  |   .              .                          .                .   |
     *  |   .              .                         \|/               .   |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  |   .   | Upstream Handler  2  |  | Downstream Handler  2  |   .   |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  |   .             /|\                         |                .   |
     *  |   .              |                         \|/               .   |
     *  |   .   +----------+-----------+  +-----------+------------+   .   |
     *  | FIRST | Upstream Handler  1  |  | Downstream Handler  1  | FIRST |
     *  |       +----------+-----------+  +-----------+------------+       |
     *  |                 /|\                         |                    |
     *  +------------------+--------------------------+--------------------+
     *                     |                         \|/
     *  +------------------+--------------------------+--------------------+
     *  |             I/O Threads (Transport Implementation)               |
     *  +------------------------------------------------------------------+
    

    进入到Pipeline的sendUpstream方法如下:

        void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            try {
                ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);@2
            } catch (Throwable t) {
                notifyHandlerException(e, t);@3
            }
        }
    

    @2就是从DefaultChannelHandlerContext 上面获取与之关联的handler,然后调用对应的handleUpstream方法,就当前这个例子来说,第一个UpstreamHandler是EchoHandler,第二个是Binder,所以首先调用EchoHandler的handleUpstream方法,由于EchoHandler继承的是SimpleChannelUpstreamHandler,所以也就是调用SimpleChannelUpstreamHandler的handleUpstream方法,进入到handleUpstream方法内,如下:

    public void handleUpstream(
                ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    
            if (e instanceof MessageEvent) {
                messageReceived(ctx, (MessageEvent) e);
            } else if (e instanceof WriteCompletionEvent) {
                WriteCompletionEvent evt = (WriteCompletionEvent) e;
                writeComplete(ctx, evt);
            } else if (e instanceof ChildChannelStateEvent) {
                ChildChannelStateEvent evt = (ChildChannelStateEvent) e;
                if (evt.getChildChannel().isOpen()) {
                    childChannelOpen(ctx, evt);
                } else {
                    childChannelClosed(ctx, evt);
                }
            } else if (e instanceof ChannelStateEvent) {
                ChannelStateEvent evt = (ChannelStateEvent) e;
                switch (evt.getState()) {
                case OPEN:
                    if (Boolean.TRUE.equals(evt.getValue())) {@4
                        channelOpen(ctx, evt);
                    } else {
                        channelClosed(ctx, evt);
                    }
                    break;
                case BOUND:
                    if (evt.getValue() != null) {
                        channelBound(ctx, evt);
                    } else {
                        channelUnbound(ctx, evt);
                    }
                    break;
                case CONNECTED:
                    if (evt.getValue() != null) {
                        channelConnected(ctx, evt);
                    } else {
                        channelDisconnected(ctx, evt);
                    }
                    break;
                case INTEREST_OPS:
                    channelInterestChanged(ctx, evt);
                    break;
                default:
                    ctx.sendDownstream(e);
                }
            } else if (e instanceof ExceptionEvent) {
                exceptionCaught(ctx, (ExceptionEvent) e);
            } else {
                ctx.sendUpstream(e);
            }
        }
    

    走的是@4分支,继续进去:

            public void sendUpstream(ChannelEvent e) {
                DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);@5
                if (next != null) {
                    DefaultChannelPipeline.this.sendUpstream(next, e);@6
                }
            }
    

    @5获取当前Pipeline的下一个handler也就是Binder,然后继续执行sendUpstream方法,当前的state仍是open,会调用到Binder的channelOpen方法,代码如下:

    @Override
            public void channelOpen(
                    ChannelHandlerContext ctx,
                    ChannelStateEvent evt) {
    
                try {
                    evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
    
                    // Split options into two categories: parent and child.
                    Map<String, Object> allOptions = getOptions();
                    Map<String, Object> parentOptions = new HashMap<String, Object>();
                    for (Entry<String, Object> e: allOptions.entrySet()) {
                        if (e.getKey().startsWith("child.")) {
                            childOptions.put(
                                    e.getKey().substring(6),
                                    e.getValue());
                        } else if (!e.getKey().equals("pipelineFactory")) {
                            parentOptions.put(e.getKey(), e.getValue());
                        }
                    }
    
                    // Apply parent options.
                    evt.getChannel().getConfig().setOptions(parentOptions);
                } finally {
                    ctx.sendUpstream(evt);@7
                }
    
                boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));@8
                assert finished;
            }
    

    @7处的代码是继续传递这个事件,@8处的代码就是把open的channel绑定到指定的url,然后把结果放到队列里面。进入到bind方法里面如下,这个方法主要是把bind这个事件发送给ChannelDownstreamHandler处理:

        public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            ChannelFuture future = future(channel);
            channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(@9
                    channel, future, ChannelState.BOUND, localAddress));
            return future;
        }
    

    进入到@9sendDownstream方法里面如下:

        public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
            if (tail == null) {
                try {
                    getSink().eventSunk(this, e);@10
                    return;
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                    return;
                }
            }
    
            sendDownstream(tail, e);
        }
    

    当前实例是没有DownstreamHandler的,所以代码会执行@10这里,然后进入到eventSunk和handleServerSocket方法:

        public void eventSunk(
                ChannelPipeline pipeline, ChannelEvent e) throws Exception {
            Channel channel = e.getChannel();
            if (channel instanceof NioServerSocketChannel) {
                handleServerSocket(e);
            } else if (channel instanceof NioSocketChannel) {
                handleAcceptedSocket(e);
            }
        }
    
    private void handleServerSocket(ChannelEvent e) {
            if (!(e instanceof ChannelStateEvent)) {
                return;
            }
    
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioServerSocketChannel channel =
                (NioServerSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();
    
            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    close(channel, future);
                }
                break;
            case BOUND:
                if (value != null) {
                    bind(channel, future, (SocketAddress) value);@11
                } else {
                    close(channel, future);
                }
                break;
            }
        }
    

    分支会走到@11分支,当前代码还在NioServerSocketPipelineSink类里面,然后进入到@11的方法如下:

    private void bind(
                NioServerSocketChannel channel, ChannelFuture future,
                SocketAddress localAddress) {
    
            boolean bound = false;
            boolean bossStarted = false;
            try {
                channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
                bound = true;
    
                future.setSuccess();@12
                fireChannelBound(channel, channel.getLocalAddress());@13
    
                Executor bossExecutor =
                    ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
                bossExecutor.execute(
                        new IoWorkerRunnable(
                                new ThreadRenamingRunnable(
                                        new Boss(channel),@14
                                        "New I/O server boss #" + id +
                                        " (channelId: " + channel.getId() +
                                        ", " + channel.getLocalAddress() + ')')));
                bossStarted = true;
            } catch (Throwable t) {
                future.setFailure(t);
                fireExceptionCaught(channel, t);
            } finally {
                if (!bossStarted && bound) {
                    close(channel, future);
                }
            }
        }
    

    @12处代码:Marks this future as a success and notifies all listeners
    @14处代码最终会运行Boss线程里面的run方法,内容如下:

    public void run() {
                final Thread currentThread = Thread.currentThread();
    
                for (;;) {
                    try {
                        if (selector.select(1000) > 0) {
                            selector.selectedKeys().clear();
                        }
    
                        SocketChannel acceptedSocket = channel.socket.accept();@15
                        if (acceptedSocket != null) {
                            registerAcceptedChannel(acceptedSocket, currentThread);@16
                        }
                    } catch (SocketTimeoutException e) {
                        // Thrown every second to get ClosedChannelException
                        // raised.
                    } catch (CancelledKeyException e) {
                        // Raised by accept() when the server socket was closed.
                    } catch (ClosedSelectorException e) {
                        // Raised by accept() when the server socket was closed.
                    } catch (ClosedChannelException e) {
                        // Closed as requested.
                        break;
                    } catch (IOException e) {
                        logger.warn(
                                "Failed to accept a connection.", e);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            // Ignore
                        }
                    }
                }
    
                closeSelector();
            }
    

    @15用来接收客户端来的请求,转换成代码就是一个SocketChannel 。
    进入@16registerAcceptedChannel的代码,内容如下:

    private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
                try {
                    ChannelPipeline pipeline =
                        channel.getConfig().getPipelineFactory().getPipeline();
                    NioWorker worker = nextWorker();@17
                    worker.register(new NioAcceptedSocketChannel(@18
                            channel.getFactory(), pipeline, channel,
                            NioServerSocketPipelineSink.this, acceptedSocket,
                            worker, currentThread), null);
                } catch (Exception e) {
                    logger.warn(
                            "Failed to initialize an accepted socket.", e);
                    try {
                        acceptedSocket.close();
                    } catch (IOException e2) {
                        logger.warn(
                                "Failed to close a partially accepted socket.",
                                e2);
                    }
                }
            }
    

    @17处的代码是根据一定的规则从NioWorker数组里面获取一个线程,来处理当前请求。
    @18进入register的代码里面:

    void register(NioSocketChannel channel, ChannelFuture future) {
    
            boolean server = !(channel instanceof NioClientSocketChannel);
            Runnable registerTask = new RegisterTask(channel, future, server);
            Selector selector;
    
            synchronized (startStopLock) {
                if (!started) {
                    // Open a selector if this worker didn't start yet.
                    try {
                        this.selector = selector = Selector.open();@19
                    } catch (Throwable t) {
                        throw new ChannelException(
                                "Failed to create a selector.", t);
                    }
    
                    // Start the worker thread with the new Selector.
                    String threadName =
                        (server ? "New I/O server worker #"
                                : "New I/O client worker #") + bossId + '-' + id;
    
                    boolean success = false;
                    try {
                        executor.execute(
                                new IoWorkerRunnable(
                                        new ThreadRenamingRunnable(this, threadName)));@20
                        success = true;
                    } finally {
                        if (!success) {
                            // Release the Selector if the execution fails.
                            try {
                                selector.close();
                            } catch (Throwable t) {
                                logger.warn("Failed to close a selector.", t);
                            }
                            this.selector = selector = null;
                            // The method will return to the caller at this point.
                        }
                    }
                } else {
                    // Use the existing selector if this worker has been started.
                    selector = this.selector;
                }
    
                assert selector != null && selector.isOpen();
    
                started = true;
                boolean offered = registerTaskQueue.offer(registerTask);
                assert offered;
            }
    
            if (wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        }
    

    @19说明:Open a selector if this worker didn't start yet
    @20:会执行this线程的run方法,也就是NioWorker的run方法,内容如下:

    public void run() {
            thread = Thread.currentThread();
    
            boolean shutdown = false;
            Selector selector = this.selector;
            for (;;) {
                wakenUp.set(false);
    
                if (CONSTRAINT_LEVEL != 0) {
                    selectorGuard.writeLock().lock();
                        // This empty synchronization block prevents the selector
                        // from acquiring its lock.
                    selectorGuard.writeLock().unlock();
                }
    
                try {
                    int selectedKeyCount = selector.select(500);
    
                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
    
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
    
                    processRegisterTaskQueue();@21
                    processWriteTaskQueue();@22
    
                    if (selectedKeyCount > 0) {
                        processSelectedKeys(selector.selectedKeys());@23
                    }
    
                    // Exit the loop when there's nothing to handle.
                    // The shutdown flag is used to delay the shutdown of this
                    // loop to avoid excessive Selector creation when
                    // connections are registered in a one-by-one manner instead of
                    // concurrent manner.
                    if (selector.keys().isEmpty()) {
                        if (shutdown ||
                            executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
    
                            synchronized (startStopLock) {
                                if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                                    started = false;
                                    try {
                                        selector.close();
                                    } catch (IOException e) {
                                        logger.warn(
                                                "Failed to close a selector.", e);
                                    } finally {
                                        this.selector = null;
                                    }
                                    break;
                                } else {
                                    shutdown = false;
                                }
                            }
                        } else {
                            // Give one more second.
                            shutdown = true;
                        }
                    } else {
                        shutdown = false;
                    }
                } catch (Throwable t) {
                    logger.warn(
                            "Unexpected exception in the selector loop.", t);
    
                    // Prevent possible consecutive immediate failures that lead to
                    // excessive CPU consumption.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore.
                    }
                }
            }
        }
    

    @21和@22处理Queue里面的任务,@23处理具体的读或者写请求,也就是processSelectedKeys这个方法的内容如下:

    private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
            for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
                SelectionKey k = i.next();
                i.remove();
                try {
                    int readyOps = k.readyOps();
                    if ((readyOps & SelectionKey.OP_READ) != 0) {
                        if (!read(k)) {
                            // Connection already closed - no need to handle write.
                            continue;
                        }
                    }
                    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                        write(k);
                    }
                } catch (CancelledKeyException e) {
                    close(k);
                }
            }
        }
    

    processSelectedKeys这个方法里面主要是遍历SelectionKey,然后处理具体的read或者write请求,东西还不少,放在下一篇内容里面继续分析......

    总结:
    感觉看是一个简单的东西,其实没那么简单,底层做了太多太多的东西,设计者的思想都在底层,任何看起来简单的东西都不会太简单的,慢慢体会吧。

    相关文章

      网友评论

          本文标题:EchoServer启动(2)

          本文链接:https://www.haomeiwen.com/subject/ddzxxktx.html