美文网首页
Netty学习笔记(一)组件

Netty学习笔记(一)组件

作者: 云师兄 | 来源:发表于2018-10-28 17:29 被阅读8次

    Channel,EventLoop和ChannelFuture类构成了Netty网络抽象的代表:

    • Channel:对应Socket
    • EventLoop:对应控制流,多线程处理,并发
    • ChannelFuture:对应异步通知

    Channel接口

    Channel是对Socket的封装,大大降低了直接使用Socket的复杂性。

    EventLoop接口

    EventLoop用于处理连接的生命周期中所发生的事件。在服务端编程中,EventLoop起到了监听端口连接和数据读取的工作。

    ChannelFuture接口

    Netty中的所有IO操作都是异步的,一个操作不会立即返回,我们需要在执行操作之后的某个时间点确定其结果的方法,即ChannelFuture接口,其addListener方法注册了一个ChannelFutureListener,一遍在某个操作完成时得到通知。

    ChannelHandler和ChannelPipeline

    ChannelHandler充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler常常实现了传入数据包拆分以及业务逻辑控制的功能。

    ChannelPipeline接口

    ChannelPipeline为ChannelHandler链提供容器,并定义了用于在该链上传播入站和出站事件流的API。即ChannelPipeline其实就是一个保存很多ChannelHandler的链表。

    BootStrap接口

    Netty的引导类为应用程序的网络层配置提供了容器。有两种类型的引导:一种用户客户端,称为Bootstrap,另一种称为用于服务器,称为ServerBootstrap。

    示例

    上面提到的这些接口在实际使用过程中并不是挨个使用的,而是使用BootStrap接口来将需要的接口组织在一起,下面是个基于netty的服务端的例子:

    public class Server {
        public static void main(String[] args) throws Exception {
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.TCP_NODELAY, true)
                        .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                        .handler(new ServerHandler())
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) {
                            }
                        });
                ChannelFuture f = b.bind(8888).sync();
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("channelActive");
        }
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("channelRegistered");
        }
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            System.out.println("handlerAdded");
        }
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 耗时的操作
                    String result = loadFromDB();
                    ctx.channel().writeAndFlush(result);
                    ctx.executor().schedule(new Runnable() {
                        @Override
                        public void run() {
                            // ...
                        }
                    }, 1, TimeUnit.SECONDS);
                }
            }).start();
        }
        private String loadFromDB() {
            return "hello world!";
        }
    }
    

    下面就针对上面使用的ServerBootstrap类来分析Netty是如何来封装java nio相关的操作的。

    Netty 如何获取NIO中需要的Channel

    首先从ChannelFuture f = b.bind(8888).sync();中的bind方法开始分析。
    这个bind方法经过层层调用,最终会调用initAndRegister方法:

    private ChannelFuture doBind(final SocketAddress localAddress) {
      final ChannelFuture regFuture = initAndRegister();
      ...
    }
    final ChannelFuture initAndRegister() {
      Channel channel = null;
      try {
        channel = channelFactory.newChannel();
        ...
    

    从上述源码中可以看到channel来自channelFactory的newChannel方法。ChannelFactory是一个接口,我们只能从它的实现类ReflectiveChannelFactory来看下具体实现了:

        private final Class<? extends T> clazz;
      
        @Override
        public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    

    可以看出这个工厂方法的newChannel其实就是根据clazz属性来反射出具体的对象。那当前代码中的clazz属性值是多少呢?回到一开始示例执行的channel(NioServerSocketChannel.class)这句代码,这个channel方法实现如下:

    public B channel(Class<? extends C> channelClass) {
       return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    

    所以ServerBootstrapy引导类中使用的通道就是channel方法中设置的NioServerSocketChannel通道。

    NioServerSocketChannel实现

    接下来就要来看下Netty的NioServerSocketChannel内部是怎么调用java NIO中的ServerSocketChannel的。
    NioServerSocketChannel内部实现步骤如下:

    • newSocket() 通过jdk来创建底层jdk channel
    • AbstractNioChannel() 方法中调用configureBlocking(false) 设置通道为非阻塞模式
    • AbstractNioChannel() 方法创建id,unsafe,pipeline
      接下面就上面几个点来看下NioServerSocketChannel类的源码。
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    public NioServerSocketChannel() {
       this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
         return provider.openServerSocketChannel();
        } catch (IOException e) {
         throw new ChannelException("Failed to open a server socket.", e);
        }
    

    从NioServerSocketChannel的构造函数可以看出java NIO的SelectorProvider.provider().openServerSocketChannel()生成了ServerSocketChannel通道。
    接下来看第二个点,即在哪设置通道的非阻塞模式,NioServerSocketChannel构造函数如下:

    public NioServerSocketChannel(ServerSocketChannel channel) {
      super(null, channel, SelectionKey.OP_ACCEPT);
      ...
    }
    

    super方法调用了父类AbstractNioChannel的构造函数如下:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
       super(parent);
       ...
       ch.configureBlocking(false);
       ...
    

    所以NioServerSocketChannel的构造函数中将通道设置了非阻塞模式。除了设置为非阻塞模式外,还调用了父类的构造函数super(parent),具体如下:

        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    从上面这段代码中可以看出在构造函数中创建了一个新的pipeline,并且这个链表一开始是空的。

    在介绍了ServerBootstrap中如何调用java NIO中的ServerSocketChannel的实现后,我们接下来看ServerBootstrap是如何初始化服务器Channel的。还是从示例中ChannelFuture f = b.bind(8888).sync();中bind方法中的initAndRegister方法中的代码开始分析:

    final ChannelFuture initAndRegister() {
      Channel channel = null;
      try {
        channel = channelFactory.newChannel();
        init(channel);
      }
      ...
    }
    abstract void init(Channel channel) throws Exception;
    

    之前我们分析了从channelFactory中获取ServerChannel通道的逻辑,这里再分析下ServerBootstrap中这个init方法:

    void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    init方法首先将传入的childOption,childAttr属性保存到ServerBootstrap对象中;另外又把用户通过handler方法设置的handler对象添加到pipeline尾部,然后再在尾部添加一个ServerBootstrapAcceptor对象,里面包装了用户通过childHandler方法传入的对象。

    注册Selector

    上面主要学习了netty内部是如何封装了ServerSocketChannel等NIO相关的操作的,接下来再继续看下netty内部是如何封装NIO中注册选择器selector的。
    回到我们之前提到的initAndRegister方法内部:

    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                ......
            }
    
            ChannelFuture regFuture = config().group().register(channel);
            ......
        }
    

    我们已经讲过获取ServerSocketChannel和初始化通道这两个部分,接下来再看下面config().group().register(channel);内部是如何实现注册Selector的。
    由于EventLoopGroup是接口,在本例中实现类为NioEventLoop。

    EventLoopGroup接口继承关系
    NioEventLoop类实现的register方法:
    public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
            // 参数校验......
            try {
                ch.register(selector, interestOps, task);
            } catch (Exception e) {
                throw new EventLoopException("failed to register a channel", e);
            }
        }
    

    可以看出最终执行的SeverSocketChannel.register(selector,interestOps,task);方法,即还是执行了java NIO中通道注册到选择器selector的方法。

    端口绑定

    上面我们已经分析了AbstractBootstrap类里面doBind方法中initAndRegister方法的内部实现:创建ServerSocketChannel通道,初始化,注册通道到选择器selector上这三步。下面再继续分析doBind方法中后面关于端口绑定的实现。
    从下面代码中的doBind0方法开始解析:

    private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
                .......
    }
    
    private static void doBind0(
          final ChannelFuture regFuture, final Channel channel,
          final SocketAddress localAddress, final ChannelPromise promise) {
                channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    从上面源码中可以看出,在doBind0方法内部执行了ServerSocketChannel的bind方法,即进行端口的绑定操作。

    相关文章

      网友评论

          本文标题:Netty学习笔记(一)组件

          本文链接:https://www.haomeiwen.com/subject/zpzptqtx.html