A First Look at Netty

Author: bingoc | Published: 2016-04-18 22:32

    Without further ado, let's start straight from the code.

    Server side

    NioEventLoopGroup group = new NioEventLoopGroup(); //1
    try {
        ServerBootstrap bootstrap = new ServerBootstrap(); //2
        bootstrap.group(group, group) //3
                 .channel(NioServerSocketChannel.class) //4
                 .localAddress(new InetSocketAddress(port)) //5
                 .childHandler(new ChannelInitializer<SocketChannel>() { //6
                     @Override
                     protected void initChannel(SocketChannel socketChannel) throws Exception {
                         socketChannel.pipeline().addLast(new EchoServerHandler());
                     }
                 });
        ChannelFuture channelFuture = bootstrap.bind().sync(); //7
        System.out.println(EchoServer.class.getName() + " started and listening on "
                + channelFuture.channel().localAddress());
        // Block until the server channel is closed.
        channelFuture.channel().closeFuture().sync();
    } finally {
        group.shutdownGracefully().sync(); //8
    }
    

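    The code above is about the simplest server program you can write with Netty. Compared with implementing the same functionality directly on NIO, it could hardly be more concise.
    Let's look at what each of the numbered lines encapsulates:

    1. This declares an EventLoopGroup which, as the name suggests, is a group of EventLoops.

    Its inheritance chain is NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup.
    MultithreadEventExecutorGroup has the following fields: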

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    private final AtomicInteger childIndex = new AtomicInteger();
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    private final EventExecutorChooser chooser;
    

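    The key field is EventExecutor[] children.
    Each child is an event loop that ultimately extends SingleThreadEventExecutor. Tracing the creation of a child leads to this constructor: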

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
        super(parent);
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        this.addTaskWakesUp = addTaskWakesUp;
        this.executor = executor;
        taskQueue = newTaskQueue();
    }
    

    The executor maintains a taskQueue that holds the tasks waiting to run. Its thread keeps taking tasks from this queue and executing them:

    protected Runnable takeTask() {
        assert inEventLoop();
        if (!(taskQueue instanceof BlockingQueue)) {
            throw new UnsupportedOperationException();
        }
        BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
            if (scheduledTask == null) {
                Runnable task = null;
                try {
                    task = taskQueue.take();
                    if (task == WAKEUP_TASK) {
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;
            } else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                }
                if (task == null) {
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll();
                }
                if (task != null) {
                    return task;
                }
            }
        }
    }
    

    In short, an EventLoopGroup in Netty maintains an array of EventLoops. Each EventLoop keeps processing new work on its own thread, including selector polling and user-submitted tasks.
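
The idea summarized above can be sketched with nothing but the JDK. This is a minimal illustration, not Netty's implementation: the names MiniEventLoopGroup, MiniEventLoop, next() and demo() are hypothetical, selector polling and scheduled tasks are left out, and the round-robin selection only mirrors the role that childIndex and chooser appear to play in MultithreadEventExecutorGroup.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniEventLoopGroup {
    /** One loop = one thread draining its own task queue (hypothetical, not a Netty class). */
    static final class MiniEventLoop implements Runnable {
        private static final Runnable STOP = () -> { };   // sentinel that ends the loop
        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        private final Thread thread;

        MiniEventLoop(String name) {
            thread = new Thread(this, name);
            thread.start();
        }

        void execute(Runnable task) { taskQueue.add(task); }

        void shutdown() { taskQueue.add(STOP); }

        @Override
        public void run() {
            try {
                for (;;) {
                    Runnable task = taskQueue.take();      // blocks until a task arrives
                    if (task == STOP) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final MiniEventLoop[] children;
    private final AtomicInteger childIndex = new AtomicInteger();

    public MiniEventLoopGroup(int nThreads) {
        children = new MiniEventLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new MiniEventLoop("loop-" + i);
        }
    }

    /** Picks the next child in round-robin order. */
    MiniEventLoop next() {
        return children[Math.floorMod(childIndex.getAndIncrement(), children.length)];
    }

    public void shutdown() {
        for (MiniEventLoop child : children) {
            child.shutdown();
        }
    }

    /** Submits nTasks tasks round-robin and returns the name of the thread each one ran on. */
    public static String[] demo(int nThreads, int nTasks) {
        MiniEventLoopGroup group = new MiniEventLoopGroup(nThreads);
        String[] ranOn = new String[nTasks];
        CountDownLatch done = new CountDownLatch(nTasks);
        for (int i = 0; i < nTasks; i++) {
            final int id = i;
            group.next().execute(() -> {
                ranOn[id] = Thread.currentThread().getName();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            group.shutdown();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", demo(2, 4)));  // prints loop-0,loop-1,loop-0,loop-1
    }
}
```

Each task always runs on the single thread that owns its queue, which is why code running inside one loop does not need to synchronize with other tasks on that same loop.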

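    2. This declares a ServerBootstrap.

    Looking at its code, the class has only two constructors: a no-argument constructor and a copy constructor.

    《Netty权威指南》 (The Definitive Guide to Netty) explains this as follows:
    The fundamental reason is that it takes too many parameters, and they may change in the future; the Builder pattern is introduced to solve this problem.

    This class is a Netty helper class that provides methods for setting the startup-related parameters.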
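
To make the Builder-style design concrete, here is a small JDK-only sketch. It is not ServerBootstrap itself: MiniBootstrap, option(String, Object), copy() and describe() are hypothetical names chosen for illustration. It shows the two things described above, a no-argument constructor plus a copy constructor, and setters that return this so calls can be chained.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

public class MiniBootstrap {
    private InetSocketAddress localAddress;
    private final Map<String, Object> options = new LinkedHashMap<>();

    /** No-argument constructor: every parameter is supplied later through chained setters. */
    public MiniBootstrap() { }

    /** Copy constructor: duplicates the configuration collected so far. */
    private MiniBootstrap(MiniBootstrap other) {
        this.localAddress = other.localAddress;
        this.options.putAll(other.options);
    }

    public MiniBootstrap localAddress(int port) {
        this.localAddress = new InetSocketAddress(port);
        return this;                       // returning this enables method chaining
    }

    public MiniBootstrap option(String name, Object value) {
        options.put(name, value);
        return this;
    }

    public MiniBootstrap copy() {
        return new MiniBootstrap(this);
    }

    /** Validates the configuration and renders it; a real bootstrap would bind here. */
    public String describe() {
        if (localAddress == null) {
            throw new IllegalStateException("localAddress not set");
        }
        return "port=" + localAddress.getPort() + " options=" + options;
    }

    public static void main(String[] args) {
        MiniBootstrap bootstrap = new MiniBootstrap().localAddress(8080).option("backlog", 128);
        System.out.println(bootstrap.describe());  // prints port=8080 options={backlog=128}
    }
}
```

New parameters can be added as extra setters without touching any constructor signature, which is the benefit the quoted explanation points to.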

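    3. Binding the group

    As the code below shows, a server is normally built from two groups: the parent group handles accepting connections (the acceptor) and the child group handles the accepted client connections. When only one group is passed in, that single group has to do both jobs.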

        /**
         * Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
         */
        @Override
        public ServerBootstrap group(EventLoopGroup group) {
            return group(group, group);
        }
    
        /**
         * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
         * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
         * {@link Channel}'s.
         */
        public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.childGroup = childGroup;
            return this;
        }
    
    

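    4, 5. Specifying the channel type and its local port

    Here a reflection-based factory is set up; it is used later, at bind time, to create the NioServerSocketChannel.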

        /**
         * The {@link Class} which is used to create {@link Channel} instances from.
         * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
         * {@link Channel} implementation has no no-args constructor.
         */
        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
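
A reflection-based factory of this kind can be sketched with the JDK alone. The sketch below is an assumption-laden illustration rather than Netty's ReflectiveChannelFactory: the names ReflectiveFactoryDemo, Factory and ReflectiveFactory are hypothetical, and StringBuilder merely stands in for a channel class that has a public no-argument constructor.

```java
public class ReflectiveFactoryDemo {
    /** Minimal factory abstraction (hypothetical; plays the role of a channel factory). */
    interface Factory<T> {
        T newInstance();
    }

    /** Creates instances by invoking the class's no-argument constructor via reflection. */
    static final class ReflectiveFactory<T> implements Factory<T> {
        private final Class<? extends T> clazz;

        ReflectiveFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }

        @Override
        public T newInstance() {
            try {
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        }
    }

    public static void main(String[] args) {
        // StringBuilder stands in for a channel class with a no-argument constructor.
        Factory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        // Each call yields a fresh instance of the configured class.
        System.out.println(first.getClass().getSimpleName() + " " + (first != second));  // prints StringBuilder true
    }
}
```

Storing only the Class and instantiating later lets the configuration step stay cheap: nothing is created until an instance is actually needed.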
    
    

    6. Setting the handler: adding the corresponding callbacks to the pipeline

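        private class EchoServerHandler extends ChannelHandlerAdapter {
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                // Called when an exception is raised in the pipeline; here it is simply passed along.
                super.exceptionCaught(ctx, cause);
            }
    
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                // Called for each inbound message. This stub only forwards it to the next handler;
                // an actual echo would write msg back, e.g. via ctx.write(msg).
                super.channelRead(ctx, msg);
            }
    
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                // Called once the current read batch is finished; a typical place to call ctx.flush().
                super.channelReadComplete(ctx);
            }
        }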
    

    7. Final step: binding the local port and starting the service

        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister(); // creates the Channel via the factory
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.executor = channel.eventLoop();
                        }
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                });
                return promise;
            }
        }
    

    After newChannel, the channel is initialized. The init method is implemented by ServerBootstrap, and its code is as follows:

        @Override
        void init(Channel channel) throws Exception {
            final Map<ChannelOption<?>, Object> options = options();
            synchronized (options) {
                channel.config().setOptions(options);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs();
            synchronized (attrs) {
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); // set the corresponding attributes on the channel
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline(); // add AbstractBootstrap's handler to the NioServerSocketChannel's ChannelPipeline
            if (handler() != null) {
                p.addLast(handler());
            }
    
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions;
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
            synchronized (childOptions) {
                currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
            }
            synchronized (childAttrs) {
                currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
            }
    
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); // add the ServerBootstrapAcceptor to the pipeline
                }
            });
        }
    
