美文网首页Spring-Boot
Netty之八核心源码剖析

Netty之八核心源码剖析

作者: Java及SpringBoot | 来源:发表于2020-03-31 19:50 被阅读0次

个人专题目录


1. Netty之核心源码剖析

1.1 Netty 启动过程源码剖析

  1. 源码需要剖析到Netty 调用doBind 方法, 追踪到NioServerSocketChannel 的doBind
  2. 并且要Debug 程序到NioEventLoop 类的run 代码,无限循环,在服务器端运行。
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
  1. demo 源码的基本理解
//服务器启动类源码
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    //创建业务线程池
    //这里我们就创建2个子线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoServerHandler());
                     //说明: 如果我们在addLast 添加handler ,前面有指定
                     //EventExecutorGroup, 那么该handler 会优先加入到该线程池中

                     //p.addLast(group, new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

说明:
1) 先看启动类:main 方法中,首先创建了关于SSL 的配置类。
2) 重点分析下创建了两个EventLoopGroup 对象:
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    (1) 这两个对象是整个Netty 的核心对象,可以说,整个Netty 的运作都依赖于他们。bossGroup 用于接受
Tcp 请求,他会将请求交给workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读
写解码编码等操作。
    (2) EventLoopGroup 是事件循环组(线程组) 含有多个EventLoop, 可以注册channel ,用于在事件循
环中去进行选择(和选择器相关).。[debug 看]
    (3) new NioEventLoopGroup(1); 这个1 表示bossGroup 事件组有1 个线程你可以指定, 如果new
NioEventLoopGroup() 会含有默认个线程cpu 核数*2, 即可以充分的利用多核的优势,【可以dubug 一把】
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    会创建EventExecutor 数组children = new EventExecutor[nThreads]; //debug 一下
    每个元素的类型就是NIOEventLoop, NIOEventLoop 实现了EventLoop 接口和Executor 接口
try 块中创建了一个ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化(看下源
码allows easy bootstrap of {@link ServerChannel} )。它和ServerChannel 关联, 而ServerChannel 继承了Channel,有一些方法remoteAddress 等[可以Debug 下]
随后,变量b 调用了group 方法将两个group 放入了自己的字段中,用于后期引导使用【debug 下group 方
法/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/】。
    (4) 然后添加了一个channel , 其中参数一个Class 对象, 引导类将通过这个Class 对象反射创建
ChannelFactory。然后添加了一些TCP 的参数。[说明:Channel 的创建在bind 方法,可以Debug 下bind ,会找
到channel = channelFactory.newChannel(); ]
    (5) 再添加了一个服务器专属的日志处理器handler。
    (6) 再添加一个SocketChannel(不是ServerSocketChannel)的handler。
    (7) 然后绑定端口并阻塞至连接成功。
    (8) 最后main 线程阻塞等待关闭。
    (9) finally 块中的代码将在服务器关闭时优雅关闭所有资源
//服务器端处理器源码
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // group 就是充当业务线程池,可以将任务提交到该线程池
    // 这里我们创建了16个线程
    static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("EchoServer Handler 的线程是=" + Thread.currentThread().getName());

        //按照原来的方法处理耗时任务
        /*
        //解决方案1 用户程序自定义的普通任务

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    //输出线程名
                    System.out.println("EchoServerHandler execute 线程是=" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {

                try {
                    Thread.sleep(5 * 1000);
                    //输出线程名
                    System.out.println("EchoServerHandler execute 线程2是=" + Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });*/

        /*
        //将任务提交到 group线程池
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {

                //接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                //休眠10秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                return null;

            }
        });

        //将任务提交到 group线程池
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {

                //接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                //休眠10秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                return null;

            }
        });


        //将任务提交到 group线程池
        group.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {

                //接收客户端信息
                ByteBuf buf = (ByteBuf) msg;
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                String body = new String(bytes, "UTF-8");
                //休眠10秒
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 的  call 线程是=" + Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                return null;

            }
        });*/


        //普通方式
        //接收客户端信息
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        String body = new String(bytes, "UTF-8");
        //休眠10秒
        Thread.sleep(10 * 1000);
        System.out.println("普通调用方式的 线程是=" + Thread.currentThread().getName());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));

        System.out.println("go on ");

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        //cause.printStackTrace();
        ctx.close();
    }
}

说明:
1) 这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过
来的内容,然后打印,最后发送字符串给客户端。
2) 大致讲解了我们的demo 源码的作用。后面的debug 的时候会详细
  1. 分析EventLoopGroup 的过程
2.1 构造器方法
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
2.2 上面的this(nThreads, (Executor) null); 调用构造器(通过alt+d 看即可)
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
2.3 上面的this(nThreads, executor, SelectorProvider.provider()); 调用下面构造器
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
2.4 上面的this ()...调用构造器(alt+d)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
2.5 上面的super() .. 的方法是父类: MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
2.6 追踪到源码抽象类MultithreadEventExecutorGroup 的构造器方法MultithreadEventExecutorGroup 才是NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式, 所以,我们就需要好好分析MultithreadEventExecutorGroup 方法了
2.7 分析MultithreadEventExecutorGroup
参数说明:
@param nThreads 使用的线程数,默认为core *2 [可以追踪源码]
@param executor 执行器:如果传入null,则采用Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor
@param chooserFactory 单例new DefaultEventExecutorChooserFactory()
@param args args 在创建执行器的时候传入固定参数
    
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    //创建指定线程数的执行器数组
    children = new EventExecutor[nThreads];
    //初始化线程数组
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //创建new NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果创建失败,优雅关闭
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    //为每一个单例线程池添加一个关闭监听器
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    //将所有的单例线程池添加到一个HashSet 中。
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}


说明:
1) 如果executor 是null,创建一个默认的ThreadPerTaskExecutor,使用Netty 默认的线程工厂。
2) 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
3) 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
4) 根据线程选择工厂创建一个线程选择器。
5) 为每一个单例线程池添加一个关闭监听器。
6) 将所有的单例线程池添加到一个HashSet 中。
  1. ServerBootstrap 创建和构造过程
3.1 ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
//config 对象,会在后面起很大作用
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
3.2 分析一下ServerBootstrap 基本使用情况
ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(new EchoServerHandler());
             //说明: 如果我们在addLast 添加handler ,前面有指定
             //EventExecutorGroup, 那么该handler 会优先加入到该线程池中

             //p.addLast(group, new EchoServerHandler());
         }
     });

说明:
1) 链式调用:group 方法,将boss 和worker 传入,boss 赋值给parentGroup 属性,worker 赋值给childGroup
属性
2) channel 方法传入NioServerSocketChannel class对象。会根据这个class 创建channel 对象。
3) option 方法传入TCP 参数,放在一个LinkedHashMap 中。
4) handler 方法传入一个handler 中,这个hanlder 只专属于ServerSocketChannel 而不是SocketChannel
5) childHandler 传入一个hanlder ,这个handler 将会在每个客户端连接的时候调用。供SocketChannel使用
    
  1. 绑定端口的分析
4.1 服务器就是在这个bind 方法里启动完成的
4.2 bind 方法代码, 追踪到创建了一个端口对象,并做了一些空判断, 核心代码doBind,我们看看
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
4.3 doBind 源码剖析, 核心是两个方法initAndRegister 和doBind0
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        //说明:执行doBind0 方法,完成对端口的绑定
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

4.4 分析说明initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
    // 说明: channelFactory.newChannel() 方法的作用通过ServerBootstrap 的通道工厂反射创建一个NioServerSocketChannel, 具体追踪源码可以得到下面结论
    (1) 通过NIO 的SelectorProvider 的openServerSocketChannel 方法得到JDK 的channel。目
的是让Netty 包装JDK 的channel。
    (2) 创建了一个唯一的ChannelId,创建了一个NioMessageUnsafe,用于操作消息,创建了一
个DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
    (3) 创建了一个NioServerSocketChannelConfig 对象,用于对外展示一些配置。
    channel = channelFactory.newChannel();//NioServerSocketChannel
    //说明:init 初始化这个NioServerSocketChannel, 具体追踪源码可以得到如下结论
    (1) init 方法,这是个抽象方法(AbstractBootstrap 类的),由ServerBootstrap 实现(可以追一下源码//setChannelOptions(channel, options, logger);)。
    (2) 设置NioServerSocketChannel 的TCP 属性。
    (3) 由于LinkedHashMap 是非线程安全的,使用同步进行处理。
    (4) 对NioServerSocketChannel 的ChannelPipeline 添加ChannelInitializer 处理器。
    (5) 可以看出, init 的方法的核心作用在和ChannelPipeline 相关。
    (6) 从NioServerSocketChannel 的初始化过程中,我们知道,pipeline 是一个双向链表,并且,他本身就初始化了head 和tail,这里调用了他的addLast 方法,也就是将整个handler 插入到tail的前面,因为tail 永远会在后面,需要做一些系统的固定工作。
    init(channel);
} catch (Throwable t) {
    if (channel != null) {
        // channel can be null if newChannel crashed (eg SocketException("too many open files"))
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
}

// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
//    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
//    added to the event loop's task queue for later execution.
//    i.e. It's safe to attempt bind() or connect() now:
//         because bind() or connect() will be executed *after* the scheduled registration task is executed
//         because register(), bind(), and connect() are all bound to the same thread.

return regFuture;
}
说明:
1) 基本说明: initAndRegister() 初始化NioServerSocketChannel 通道并注册各个handler,返回一个future
2) 通过ServerBootstrap 的通道工厂反射创建一个NioServerSocketChannel。
3) init 初始化这个NioServerSocketChannel。
4) config().group().register(channel) 通过ServerBootstrap 的bossGroup 注册NioServerSocketChannel。
5) 最后,返回这个异步执行的占位符即regFuture。
   
4.5 init 方法会调用addLast, 现在进入到addLast 方法内查看
    
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
说明:
1) addLast 方法,在DefaultChannelPipeline 类中
2) addLast 方法这就是pipeline 方法的核心
3) 检查该handler 是否符合标准。
4) 创建一个AbstractChannelHandlerContext 对象, 这里说一下, ChannelHandlerContext 对象是
ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到Pipeline 中时,都会创建
Context。Context 的主要功能是管理他所关联的Handler 和同一个Pipeline 中的其他Handler 之间的交互。
5) 将Context 添加到链表中。也就是追加到tail 节点的前面。
6) 最后,同步或者异步或者晚点异步的调用callHandlerAdded0 方法
    
4.6 前面说了dobind 方法有2 个重要的步骤,initAndRegister 说完,接下来看doBind0 方法, 代码如下

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //bind 方法这里下断点,这里下断点!!
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
说明:
1)该方法的参数为initAndRegister的future,NioServerSocketChannel,端口地址,NioServerSocketChannel 的promise
2) 这里就可以根据前面下的断点,一直debug:
将调用LoggingHandler 的invokeBind 方法, 最后会追到
//DefaultChannelPipeline 类的bind
//然后进入到unsafe.bind 方法debug , 注意要追踪到
//unsafe.bind , 要debug 第二圈的时候,才能看到.
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
继续追踪AbstractChannel 的
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        //可以看到,这里最终的方法就是doBind 方法,执行成功后,执行通道的fireChannelActive 方法,告诉所有的handler,已经成功绑定。
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
3) 最终doBind 就会追踪到NioServerSocketChannel 的doBind, 说明Netty 底层使用的是Nio
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
4.7 回到bind方法(alt+v),最后一步:safeSetSuccess(promise),告诉promise 任务成功了。其可以执行监听器的
方法了。到此整个启动过程已经结束了,ok 了

  1. 继续atl+V 服务器就回进入到(NioEventLoop 类)一个循环代码,进行监听
@Override
protected void run() {
for (;;) {
try {
}

Netty启动过程梳理

  1. 创建2个 EventLoopGroup 线程池数组。数组默认大小CPU*2,方便chooser选择线程池时提高性能

  2. BootStrap 将 boss 设置为 group属性,将 worker 设置为 childer 属性

  3. 通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法

  4. initAndRegister 方法会反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。

  5. 在register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方法会 调用 NioServerSocketChannel 的 doBind 方法对 JDK 的 channel 和端口进行绑定,完成 Netty 服务器的所有启动,并开始监听连接事件

1.2 Netty 接受请求过程源码剖析

说明:

  1. 从之前服务器启动的源码中,我们得知,服务器最终注册了一个Accept 事件等待客户端的连接。我们也知道,
    NioServerSocketChannel 将自己注册到了boss 单例线程池(reactor 线程)上,也就是EventLoop 。

  2. EventLoop 的作用是一个死循环,而这个循环中做3 件事情:

    1. 有条件的等待Nio 事件。
    2. 处理Nio 事件。
    3. 处理消息队列中的任务。
    4. 仍用前面的项目来分析:进入到NioEventLoop 源码中后,在private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法开始调试最终我们要分析到AbstractNioChannel 的doBeginRead 方法, 当到这
      个方法时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了
  1. 断点位置NioEventLoop 的如下方法processSelectedKey
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();//断点位置
}
  1. 执行浏览器http://localhost:8007/,客户端发出请求
  2. 从的断点我们可以看到, readyOps 是16 ,也就是Accept 事件。说明浏览器的请求已经进来了。
  3. 这个unsafe 是boss 线程中NioServerSocketChannel 的AbstractNioMessageChannelNioMessageUnsafe 对象。我们进入到AbstractNioMessageChannelNioMessageUnsafe 的read 方法中
  4. read 方法代码并分析:
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

说明:
1) 检查该eventloop 线程是否是当前线程。assert eventLoop().inEventLoop()
2) 执行doReadMessages 方法,并传入一个readBuf 变量,这个变量是一个List,也就是容器。
3) 循环容器,执行pipeline.fireChannelRead(readBuf.get(i));
4) doReadMessages 是读取boss 线程中的NioServerSocketChannel 接受到的请求。并把这些请求放进容器,
一会我们debug 下doReadMessages 方法.
5) 循环遍历容器中的所有请求,调用pipeline 的fireChannelRead 方法,用于处理这些接受的请求或者其
他事件,在read 方法中,循环调用ServerSocket 的pipeline 的fireChannelRead 方法, 开始执行管道中的
handler 的ChannelRead 方法(debug 进入)
  1. 追踪一下doReadMessages 方法, 就可以看得更清晰
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
说明:
1) 通过工具类,调用NioServerSocketChannel 内部封装的serverSocketChannel 的accept 方法,这是Nio 做法。
2) 获取到一个JDK 的SocketChannel,然后,使用NioSocketChannel 进行封装。最后添加到容器中
3) 这样容器buf 中就有了NioSocketChannel [如果有兴趣可以追一下NioSocketChannel 是如何创建的,我就不追
了]
  1. 回到read 方法,继续分析循环执行pipeline.fireChannelRead 方法
1) 前面分析doReadMessages 方法的作用是通过ServerSocket 的accept 方法获取到Tcp 连接,然后封装成
Netty 的NioSocketChannel 对象。最后添加到容器中
2) 在read 方法中,循环调用ServerSocket 的pipeline 的fireChannelRead 方法, 开始执行管道中的handler
的ChannelRead 方法(debug 进入)
3) 经过dubug (多次),可以看到会反复执行多个handler 的ChannelRead ,我们知道,pipeline 里面又4个handler ,分别是Head,LoggingHandler,ServerBootstrapAcceptor,Tail。
4) 我们重点看看ServerBootstrapAcceptor。debug 之后,断点会进入到ServerBootstrapAcceptor 中来。我们来看看ServerBootstrapAcceptor 的channelRead 方法(要多次debug 才可以)
5) channelRead 方法
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        //将客户端连接注册到worker 线程池
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

说明:
1) msg 强转成Channel ,实际上就是NioSocketChannel 。
2) 添加NioSocketChannel 的pipeline 的handler,就是我们main 方法里面设置的childHandler方法里的。
3) 设置NioSocketChannel 的各种属性。
4) 将该NioSocketChannel 注册到childGroup 中的一个EventLoop 上,并添加一个监听器。
5) 这个childGroup 就是我们main 方法创建的数组workerGroup。
  1. 进入register 方法查看(步步追踪会到)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    //进入到这里
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
继续进入到下面方法, 执行管道中可能存在的任务, 这里我们就不追了
  1. 最终会调用doBeginRead 方法,也就是AbstractNioChannel 类的方法
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;//断点
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
  1. 这个地方调试时,请把前面的断点都去掉,然后启动服务器就会停止在doBeginRead(需要先放过该断点,然
    后浏览器请求,才能看到效果)
  2. 执行到这里时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了

Netty 接受请求过程梳理

总体流程:接受连接----->创建一个新的NioSocketChannel----------->注册到一个worker EventLoop 上-------->
注册selecot Read 事件。

  1. 服务器轮询Accept 事件,获取事件后调用unsafe 的read 方法,这个unsafe 是ServerSocket 的内部类,该
    方法内部由2 部分组成
  2. doReadMessages 用于创建NioSocketChannel 对象,该对象包装JDK 的Nio Channel 客户端。该方法会像创
    建ServerSocketChanel 类似创建相关的pipeline , unsafe,config
  3. 随后执行执行pipeline.fireChannelRead 方法,并将自己绑定到一个chooser 选择器选择的workerGroup 中的
    一个EventLoop。并且注册一个0,表示注册成功,但并没有注册读(1)事件

1.3 Pipeline Handler HandlerContext 创建源码剖析

Netty 中的ChannelPipeline 、ChannelHandler 和ChannelHandlerContext 是非常核心的组件, 我们从源码来
分析Netty 是如何设计这三个核心组件的,并分析是如何创建和协调工作的.

  1. ChannelPipeline | ChannelHandler | ChannelHandlerContext 介绍
1.1 三者关系
1) 每当ServerSocket 创建一个新的连接,就会创建一个Socket,对应的就是目标客户端。
2) 每一个新创建的Socket 都将会分配一个全新的ChannelPipeline(以下简称pipeline)
3) 每一个ChannelPipeline 内部都含有多个ChannelHandlerContext(以下简称Context)
4) 他们一起组成了双向链表,这些Context 用于包装我们调用addLast 方法时添加的ChannelHandler(以下简称
handler)
    
1) ChannelSocket 和ChannelPipeline 是一对一的关联关系,而pipeline 内部的多个Context 形成了链
表,Context 只是对Handler 的封装。
2) 当一个请求进来的时候,会进入Socket 对应的pipeline,并经过pipeline 所有的handler,对,就是设计模式
中的过滤器模式。
    
1.2 ChannelPipeline 作用及设计
1) pipeline 的接口设计
    idea中可以看到该接口继承了inBound,outBound,Iterable 接口,表示他可以调用数据出站的方法和入站的方法,同时也能遍历内部的链表, 看看他的几个代表性的方法,基本上都是针对handler 链表的插入,追加,删除,替换操作,类似是一个LinkedList。同时,也能返回channel(也就是socket)
1) 在pipeline 的接口文档上,提供了一幅图
 * <pre>
 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
 * </pre>
对上图的解释说明:
* 这是一个handler 的list,handler 用于处理或拦截入站事件和出站事件,pipeline 实现了过滤器的高级形
式,以便用户控制事件如何处理以及handler 在pipeline 中如何交互。
* 上图描述了一个典型的handler 在pipeline 中处理I/O 事件的方式,IO 事件由inboundHandler 或者
outBoundHandler 处理,并通过调用ChannelHandlerContext.fireChannelRead 方法转发给其最近的处理程序。
* 入站事件由入站处理程序以自下而上的方向处理,如图所示。入站处理程序通常处理由图底部的I / O 线程生成
入站数据。入站数据通常从如SocketChannel.read(ByteBuffer) 获取。
* 通常一个pipeline 有多个handler,例如,一个典型的服务器在每个通道的管道中都会有以下处理程序
协议解码器- 将二进制数据转换为Java 对象。
协议编码器- 将Java 对象转换为二进制数据。
业务逻辑处理程序- 执行实际业务逻辑(例如数据库访问)
* 你的业务程序不能将线程阻塞,会影响IO 的速度,进而影响整个Netty 程序的性能。如果你的业务程序很快,
就可以放在IO 线程中,反之,你需要异步执行。或者在添加handler 的时候添加一个线程池,例如:
// 下面这个任务执行的时候,将不会阻塞IO 线程,执行的线程来自group 线程池
pipeline.addLast(group,“handler”,new MyBusinessLogicHandler());

1.3 ChannelHandler 作用及设计
1) 源码
public interface ChannelHandler {
    //当把ChannelHandler 添加到pipeline 时被调用
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    //当从pipeline 中移除时调用
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    // 当处理过程中在pipeline 发生异常时调用
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
2) ChannelHandler 的作用就是处理IO 事件或拦截IO 事件,并将其转发给下一个处理程序ChannelHandler。
Handler 处理事件时分入站和出站的,两个方向的操作都是不同的,因此,Netty 定义了两个子接口继承
ChannelHandler
2) ChannelInboundHandler 入站事件接口.idea中可查看
* channelActive 用于当Channel 处于活动状态时被调用;
* channelRead 当从Channel 读取数据时被调用等等方法。
* 程序员需要重写一些方法,当发生关注的事件,需要在方法中实现我们的业务逻辑,因为当事件发生时,Netty 会
回调对应的方法。
3) ChannelOutboundHandler 出站事件接口,idea中可查看关系图
* bind 方法,当请求将Channel 绑定到本地地址时调用
* close 方法,当请求关闭Channel 时调用等等
* 出站操作都是一些连接和写出数据类似的方法。
4) ChannelDuplexHandler 处理出站和入站事件,idea中可查看
* ChannelDuplexHandler 间接实现了入站接口并直接实现了出站接口。
* 是一个通用的能够同时处理入站事件和出站事件的类。

1.4 ChannelHandlerContext 作用及设计
1) ChannelHandlerContext UML图,idea中查看
ChannelHandlerContext 继承了出站方法调用接口和入站方法调用接口
1) ChannelOutboundInvoker 和ChannelInboundInvoker 部分源码
* 这两个invoker 就是针对入站或出站方法来的,就是在入站或出站handler 的外层再包装一层,达到在方法前
后拦截并做一些特定操作的目的
2) ChannelHandlerContext 部分源码
* ChannelHandlerContext 不仅仅时继承了他们两个的方法,同时也定义了一些自己的方法
* 这些方法能够获取Context 上下文环境中对应的比如channel,executor,handler ,pipeline,内存分配器,关
联的handler 是否被删除。
* Context 就是包装了handler 相关的一切,以方便Context 可以在pipeline 方便的操作handler
  1. ChannelPipeline | ChannelHandler | ChannelHandlerContext 创建过程
分为3 个步骤来看创建的过程:
* 任何一个ChannelSocket 创建的同时都会创建一个pipeline。
* 当用户或系统内部调用pipeline 的add*** 方法添加handler 时,都会创建一个包装这handler 的Context。
* 这些Context 在pipeline 中组成了双向链表。

2.1 Socket 创建的时候创建pipeline
在SocketChannel 的抽象父类AbstractChannel 的构造方法中
protected AbstractChannel(Channel parent) {
    this.parent = parent; //断点测试
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
Debug 一下, 可以看到代码会执行到这里, 然后继续追踪到
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
说明:
1)将channel 赋值给channel 字段,用于pipeline 操作channel。
2)创建一个future 和promise,用于异步回调使用。
3)创建一个inbound 的tailContext,创建一个既是inbound 类型又是outbound 类型的headContext.
4)最后,将两个Context 互相连接,形成双向链表。
5)tailContext 和HeadContext 非常的重要,所有pipeline 中的事件都会流经他们

2.2 在add添加处理器的时候创建Context**
看下DefaultChannelPipeline 的addLast 方法如何创建的Context,代码如下
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {//断点
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
    }
继续Debug
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
说明
1) pipeline 添加handler,参数是线程池,name 是null, handler 是我们或者系统传入的handler。Netty 为了防止
多个线程导致安全问题,同步了这段代码,步骤如下:
2) 检查这个handler 实例是否是共享的,如果不是,并且已经被别的pipeline 使用了,则抛出异常。
3) 调用newContext(group, filterName(name, handler), handler) 方法,创建一个Context。从这里可以看出来了,
每次添加一个handler 都会创建一个关联Context。
4) 调用addLast 方法,将Context 追加到链表中。
5) 如果这个通道还没有注册到selecor 上,就将这个Context 添加到这个pipeline 的待办任务中。当注册好了以
后,就会调用callHandlerAdded0 方法(默认是什么都不做,用户可以实现这个方法)。
6) 到这里,针对三对象创建过程,了解的差不多了,和最初说的一样,每当创建ChannelSocket 的时候都会创建
一个绑定的pipeline,一对一的关系,创建pipeline 的时候也会创建tail 节点和head 节点,形成最初的链表。tail
是入站inbound 类型的handler, head 既是inbound 也是outbound 类型的handler。在调用pipeline 的addLast
方法的时候,会根据给定的handler 创建一个Context,然后,将这个Context 插入到链表的尾端(tail 前面)。
到此就OK 了

Pipeline Handler HandlerContext 创建过程梳理

  1. 每当创建ChannelSocket 的时候都会创建一个绑定的pipeline,一对一的关系,创建pipeline 的时候也会创建
    tail 节点和head 节点,形成最初的链表。
  2. 在调用pipeline 的addLast 方法的时候,会根据给定的handler 创建一个Context,然后,将这个Context 插
    入到链表的尾端(tail 前面)。
  3. Context 包装handler,多个Context 在pipeline 中形成了双向链表
  4. 入站方向叫inbound,由head 节点开始,出站方法叫outbound ,由tail 节点开始

1.4 ChannelPipeline 调度handler 的源码剖析

  1. 当一个请求进来的时候,ChannelPipeline 是如何调用内部的这些handler 的呢?我们一起来分析下。
  2. 首先,当一个请求进来的时候,会第一个调用pipeline 的相关方法,如果是入站事件,这些方法由fire 开头,
    表示开始管道的流动。让后面的handler 继续处理

说明
当浏览器输入http://localhost:8007。可以看到会执行handler
在Debug 时,可以将断点下在DefaultChannelPipeline 类的

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head); //断点
    return this;
}
  1. DefaultChannelPipeline 是如何实现这些fire 方法的
3.1 DefaultChannelPipeline 源码
说明:
可以看出来,这些方法都是inbound 的方法,也就是入站事件,调用静态方法传入的也是inbound 的类型head
handler。这些静态方法则会调用head 的ChannelInboundInvoker 接口的方法,再然后调用handler 的真正方
法

3.2 再看下piepline 的outbound 的fire方法实现源码
说明:
1) 这些都是出站的实现,但是调用的是outbound 类型的tail handler 来进行处理,因为这些都是outbound事件。
2) 出站是tail 开始,入站从head 开始。因为出站是从内部向外面写,从tail 开始,能够让前面的handler 进
行处理,防止handler 被遗漏,比如编码。反之,入站当然是从head 往内部输入,让后面的handler 能够处理这
些输入的数据。比如解码。因此虽然head 也实现了outbound 接口,但不是从head 开始执行出站任务
  1. 调度过程说明:
1) pipeline 首先会调用Context 的静态方法fireXXX,并传入Context
2) 然后,静态方法调用Context 的invoker 方法,而invoker 方法内部会调用该Context 所包含的
Handler 的真正的XXX 方法,调用结束后,如果还需要继续向后传递,就调用Context 的fireXXX2 方法,循环
往复。

ChannelPipeline 调度handler 梳理

  1. Context 包装handler,多个Context 在pipeline 中形成了双向链表,入站方向叫inbound,由head 节点开始,出站方法叫outbound ,由tail 节点开始。
  2. 而节点中间的传递通过AbstractChannelHandlerContext 类内部的fire 系列方法,找到当前节点的下一个节点
    不断的循环传播。是一个过滤器形式完成对handler 的调度

1.4 Netty 心跳(heartbeat)服务源码剖析

Netty 作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一个服务-----心跳
机制heartbeat。通过心跳检查对方是否有效,这是RPC 框架中是必不可少的功能。下面我们分析一下Netty 内部心
跳服务源码实现。

说明
Netty 提供了IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三个Handler 检测连接的有效性,
重点分析IdleStateHandler .

ReadTimeout 事件和WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以
下,我们重点看IdleStateHandler。

  1. IdleStateHandler分析
1.1 4 个属性
private final boolean observeOutput; //是否考虑出站时较慢的情况。默认值是false
private final long readerIdleTimeNanos;//读事件空闲时间,0 则禁用事件
private final long writerIdleTimeNanos;//写事件空闲时间,0 则禁用事件
private final long allIdleTimeNanos;//读或写空闲时间,0 则禁用事件

1.2 handlerAdded 方法
当该handler 被添加到pipeline 中时,则调用initialize 方法
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        //这里的schedule 方法会调用eventLoop 的schedule 方法,将定时任务添加进队列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state 状态设置为1,防止重复初始化。
调用initOutputChanged 方法,初始化“监控出站数据属性”。
  
1.3 该类内部的3 个定时任务类
1) 这3 个定时任务分别对应读,写,读或者写事件。共有一个父类(AbstractIdleTask)。这个父类提供了一
个模板方法
private abstract static class AbstractIdleTask implements Runnable {

    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {
            return;
        }

        run(ctx);
    }

    protected abstract void run(ChannelHandlerContext ctx);
}
说明: 当通道关闭了,就不执行任务了。反之,执行子类的run 方法
  1. 读事件的run 方法(即ReaderIdleTimeoutTask 的run 方法)分析
1) 代码及其说明
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        // 用于取消任务promise
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            //再次提交任务
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            //触发用户handler use
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
}
说明:
1) 得到用户设置的超时时间。
2) 如果读取操作结束了(执行了channelReadComplete 方法设置) ,就用当前时间减去给定时间和最后一次读(执操作的时间行了channelReadComplete 方法设置),如果小于0,就触发事件。反之,继续放入队
列。间隔时间是新的计算时间。
3) 触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个promise 对象,用于做
取消操作。然后,设置first 属性为false ,表示,下一次读取不再是第一次了,这个属性在channelRead 方
法会被改成true。
4) 创建一个IdleStateEvent 类型的写事件对象,将此对象传递给用户的UserEventTriggered 方法。完成触
发事件的操作。
5) 总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间
的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered 方法。//前面介绍IdleStateHandler 说过,
可以看一下
  1. 写事件的run 方法(即WriterIdleTimeoutTask 的run 方法)分析
1) run 代码和分析
@Override
protected void run(ChannelHandlerContext ctx) {

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

说明:
   写任务的run 代码逻辑基本和读任务的逻辑一样, 唯一不同的就是有一个针对出站较慢数据的判断hasOutputChanged
  1. 所有事件的run 方法(即AllIdleTimeoutTask 的run 方法)分析
代码分析
@Override
protected void run(ChannelHandlerContext ctx) {

    long nextDelay = allIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
    }
    if (nextDelay <= 0) {
        // Both reader and writer are idle - set a new timeout and
        // notify the callback.
        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstAllIdleEvent;
        firstAllIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Either read or write occurred before the timeout - set a new
        // timeout with shorter delay.
        allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
说明:
1) 表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致:
2) 需要大家注意的地方是
long nextDelay = allIdleTimeNanos;
if (!reading) {
    // 当前时间减去最后一次写或读的时间,若大于0,说明超时了
    nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
3) 这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。

小结Netty 的心跳机制

1) IdleStateHandler 可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会
触发用户handler 的userEventTriggered 方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关
闭连接。
2) IdleStateHandler 的实现基于EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,
通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。
3) 内部有3 个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。
4) 同时,IdleStateHandler 内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的
空闲时间。Netty 通过构造方法中的observeOutput 属性来决定是否对出站缓冲区的情况进行判断。
5) 如果出站缓慢,Netty 不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一
次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成OOM , OOM 比空闲的问题更大。
6) 所以,当你的应用出现了内存溢出,OOM 之类,并且写空闲极少发生(使用了observeOutput 为true),
那么就需要注意是不是数据出站速度过慢。
7) 还有一个注意的地方:就是ReadTimeoutHandler ,它继承自IdleStateHandler,当触发读空闲事件的时候,
就触发ctx.fireExceptionCaught 方法,并传入一个ReadTimeoutException,然后关闭Socket。
8) 而WriteTimeoutHandler 的实现不是基于IdleStateHandler 的,他的原理是,当调用write 方法的时候,会
创建一个定时任务,任务内容是根据传入的promise 的完成情况来判断是否超出了写的时间。当定时任务根据指
定时间开始运行,发现promise 的isDone 方法返回false,表明还没有写完,说明超时了,则抛出异常。当write
方法完成后,会打断定时任务。

1.6 Netty 核心组件EventLoop 源码剖析

Echo 第一行代码就是: EventLoopGroup bossGroup = new NioEventLoopGroup(1); 下面分析其最核心的组件
EventLoop。

  1. EventLoop 介绍
1.1 首先看看NioEventLoop 的继承图,idea查看
说明重点:
1) ScheduledExecutorService 接口表示是一个定时任务接口,EventLoop 可以接受定时任务。
2) EventLoop 接口:Netty 接口文档说明该接口作用:一旦Channel 注册了,就处理该Channel 对应的所有
I/O 操作。
3) SingleThreadEventExecutor 表示这是一个单个线程的线程池
4) EventLoop 是一个单例的线程池,里面含有一个死循环的线程不断的做着3 件事情:监听端口,处理端口
事件,处理队列事件。每个EventLoop 都可以绑定多个Channel,而每个Channel 始终只能由一个EventLoop 来
处理
  1. NioEventLoop 的使用- execute 方法
2.1 execute 源码剖析
在EventLoop 的使用, 一般就是eventloop.execute(task); 看下execute方法的实现( 在SingleThreadEventExecutor 类中)
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

说明:
1) 首先判断该EventLoop 的线程是否是当前线程,如果是,直接添加到任务队列中去,如果不是,则尝试
启动线程(但由于线程是单个的,因此只能启动一次),随后再将任务添加到队列中去。
2) 如果线程已经停止,并且删除任务失败,则执行拒绝策略,默认是抛出异常。
3) 如果addTaskWakesUp 是false,并且任务不是NonWakeupRunnable 类型的,就尝试唤醒selector。这
个时候,阻塞在selecor 的线程就会立即返回
4) 可以下断点来追踪

2.2 我们debug addTask 和offerTask 方法源码
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
  1. NioEventLoop 的父类SingleThreadEventExecutor 的startThread 方法
3.1 当执行execute 方法的时候,如果当前线程不是EventLoop 所属线程,则尝试启动线程,也就是startThread 方
法,dubug 代码如下:
private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
说明:
该方法首先判断是否启动过了,保证EventLoop 只有一个线程,如果没有启动过,则尝试使用Cas 将state 状
态改为ST_STARTED,也就是已启动。然后调用doStartThread 方法。如果失败,则进行回滚
看下doStartThread 方法
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}
说明:
1) 首先调用executor 的execute 方法,这个executor 就是在创建Event LoopGroup 的时候创建的
ThreadPerTaskExecutor 类。该execute 方法会将Runnable 包装成Netty 的FastThreadLocalThread。
2) 任务中,首先判断线程中断状态,然后设置最后一次的执行时间。
3) 执行当前NioEventLoop 的run 方法,注意:这个方法是个死循环,是整个EventLoop 的核心
4) 在finally 块中,使用CAS 不断修改state 状态,改成ST_SHUTTING_DOWN。也就是当线程Loop 结
束的时候。关闭线程。最后还要死循环确认是否关闭,否则不会break。然后,执行cleanup 操作,更新状
态为
5) ST_TERMINATED,并释放当前线程锁。如果任务队列不是空,则打印队列中还有多少个未完成的任务。
并回调terminationFuture 方法。
6) 其实最核心的就是Event Loop 自身的run 方法。再继续深入run 方法
  1. EventLoop 中的Loop 是靠run 实现的, 我们分析下run 方法(该方法在NioEventLoop)
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

说明:
1) 从上面的步骤可以看出,整个run 方法做了3 件事情:
select 获取感兴趣的事件。
processSelectedKeys 处理事件。
runAllTasks 执行队列中的任务。
2) 上面的三个方法,我们就追一下select 方法(体现非阻塞)
核心select 方法解析
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
        if (nextWakeupTime != normalizedDeadlineNanos) {
            nextWakeupTime = normalizedDeadlineNanos;
        }

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
            // Selector#wakeup. So we need to check task queue again before executing select operation.
            // If we don't, the task might be pended until select operation was timed out.
            // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            //否则阻塞给定时间,默认一秒
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
            // 如果1 秒后返回,有返回值|| select 被用户唤醒|| 任务队列有任务|| 有定时任务即将被执行; 则跳出循环
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}
说明:
调用selector 的select 方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上0.5
秒进行阻塞。当执行execute 方法的时候,也就是添加任务的时候,唤醒selecor,防止selecotr 阻塞时间过
长

EventLoop 作为Netty 的核心的运行机制小结

每次执行ececute 方法都是向队列中添加任务。当第一次添加时就启动线程,执行run 方法,而run 方法
是整个EventLoop 的核心,就像EventLoop 的名字一样,Loop Loop ,不停的Loop ,Loop 做什么呢?做3 件
事情。

  • 调用selector 的select 方法,默认阻塞一秒钟,如果有定时任务,则在定时任务剩余时间的基础上在加上0.5
    秒进行阻塞。当执行execute 方法的时候,也就是添加任务的时候,唤醒selecor,防止selecotr 阻塞时间过
    长。
  • 当selector 返回的时候,回调用processSelectedKeys 方法对selectKey 进行处理。
  • 当processSelectedKeys 方法执行结束后,则按照ioRatio 的比例执行runAllTasks 方法,默认是IO 任务时间和非IO 任务时间是相同的,你也可以根据你的应用特点进行调优。比如非IO 任务比较多,那么你就将ioRatio 调小一点,这样非IO 任务就能执行的长一点。防止队列积攒过多的任务。

1.7 handler 中加入线程池和Context 中添加线程池的源码剖析

  1. 在Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响Netty 对Socket 的处理速度。

  2. 而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2 种方式,而且这2
    种方式实现的区别也蛮大的。

  3. 处理耗时业务的第一种方式---handler 中加入线程池

  4. 处理耗时业务的第二种方式---Context 中添加线程池

  5. 处理耗时业务的第一种方式--handler 种加入线程池

1.1 对前面的Netty demo 源码进行修改,在EchoServerHandler 的channelRead 方法进行异步

说明:
1) 在channelRead 方法,模拟了一个耗时10 秒的操作,这里,我们将这个任务提交到了一个自定义的业
务线程池中,这样,就不会阻塞Netty的IO 线程。

1.2 说明:
1) 解释一下,当IO 线程轮询到一个socket 事件,然后,IO 线程开始处理,当走到耗时handler 的时
候,将耗时任务交给业务线程池。
2) 当耗时任务执行完毕再执行pipeline write 方法的时候,(代码中使用的是context 的write 方法,执行pipeline 方法, 是一个意思)会将任务这个任务交给IO 线程

1.3 write 方法的源码(在AbstractChannelHandlerContext 类)
private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}
说明:
1) 当判定下个outbound 的executor 线程不是当前线程的时候,会将当前的工作封装成task ,然后放入
mpsc 队列中,等待IO 任务执行完毕后执行队列中的任务。
2) 这里可以Debug 来验证(提醒:Debug 时,服务器端Debug ,客户端Run 的方式),当我们使用了
group.submit(new Callable<Object>(){} 在handler 中加入线程池, 就会进入到safeExecute(executor, task,promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到safeExecute(executor,task, promise, m); (说明:普通方式执行耗时代码,看我准备好的案例即可)
  1. 处理耗时业务的第二种方式-Context 中添加线程池
1.1 在添加pipeline 中的handler 时候,添加一个线程池

说明:
1) handler 中的代码就使用普通的方式来处理耗时业务。
2) 当我们在调用addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用IO 线
程
3) 当走到AbstractChannelHandlerContext 的invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程 Contex(t 也就是 Handler)的 executor 是业务线程,所以会异步执行, debug
下源码

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {//执行run
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
4) 验证时, 我们如果去掉p.addLast(group,new EchoServerHandler() ); 改成p.addLast new
EchoServerHandler() ); 你会发现代码不会进行异步执行
5) 后面的整个流程就变成和第一个方式一样了

两种方式的比较

  • 第一种方式在handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那我就异步,如果不需
    要,就不异步,异步会拖长接口响应时间。因为需要将任务放进mpscTask 中。如果IO 时间很短,task 很多,可
    能一个循环下来,都没时间执行整个task,导致响应时间达不到指标。
  • 第二种方式是Netty 标准方式(即加入到队列),但是,这么做会将整个handler 都交给业务线程池。不论
    耗时不耗时,都加入到队列里,不够灵活。
  • 各有优劣,从灵活性考虑,第一种较好

相关文章

网友评论

    本文标题:Netty之八核心源码剖析

    本文链接:https://www.haomeiwen.com/subject/vxpguhtx.html