美文网首页深入浅出Netty源码剖析Java 杂谈
【第5篇】Netty的NioEventLoopGroup与Ser

【第5篇】Netty的NioEventLoopGroup与Ser

作者: 爱学习的蹭蹭 | 来源:发表于2019-05-16 12:23 被阅读3次

1、NioEventLoopGroup

  • NioEventLoopGroup属于一个死循环的线程组,是一个执行流程的入口进行分析

  • 当NioEventLoopGroup(1)的构造方法1表示指定一个线程来执行

  • NioEventLoopGroup事件循环中的发生,事件的连接建立,而且NioExcecutorGroup会注册到Channel

NioEventLoop、EventLoopGroup、NioEventLoopGroup关系图

  • NioEventLoop
  • EventLoopGroup
  • NioEventLoopGroup

2、 ChannelFuture与ChannelPromise

  • ChannelFuture将Channel注册到EventLoop中
  • ChannelPromise继承ChannelFuture,里面是包含了一个Channel的方法的引用(难点)

ChannelFuture与ChannelPromise关系图

  • ChannelFuture
  • ChannelPromise

3、SelectorProvider

  • SelectorProvider创建Selector选择器

4、 static关键字

  • 用于修饰方法或变量静态使用
  • static静态代码块在Class类被加载JVM之前会初始化,不用等待实例化才被使用

5、EventExecutor

  • EventExecutor继承EventExecutorGroup类,提供一些便捷的方法操作


    EventExecutor

6、Netty中的bossGroup为什么使用线程池的原因大家众所纷纭

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps, but I don’t see the reason for it.
意思就是说:netty作者说:我们在不同的服务器引导之间共享NioEventLoopGroup,多个boss线程是有用的,但我没有看到它的

7、NioEventLoopGroup源码剖析笔录

  • NioEventLoopGroup构造方法
   //创建一个实例,使用默认线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
 public NioEventLoopGroup() {
      this(0);
 }
  • 指定线程数
// 创建一个实例,指定线程数、默认ThreadFactory和SelectorProvider()返回的SelectorProvider.provider()创建一个新实例。
 public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
 }

8、MultithreadEventLoopGroup源码剖析笔录

  • 处理CPU核心数
  //默认事件循环线程
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
        //根据可用进程的CPU进行相乘
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        //启动debug就打印log信息
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
  }
  • MultithreadEventExecutorGroup的构造器:DefaultEventExecutorChooserFactory.INSTANCE 默认事件执行选择器工程实例
  /**
     * 创建一个实例
     * @param nThreads         该实例将使用的线程数
     * @param executor           将要使用的executor, 默认为null
     * @param args  参数将传递给每个newChild(Executor, Object…)调用
     */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
     this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
 }

/**
 * 最终的创建实例构造器
 *
 * @param nThreads          该实例将使用的线程数
 * @param executor          将要使用的executor, 默认为null
 * @param chooserFactory    将要使用的EventExecutorChooserFactory
 * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    /** 1.初始化线程池 */
    //参数校验nThread合法性,
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口
    // 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
    //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;
    //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
    children = new EventExecutor[nThreads];

    //for循环实例化children数组,NioEventLoop对象
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            //newChild(executor, args) 函数在NioEventLoopGroup类中实现了, 
            // 实质就是就是存入了一个 NIOEventLoop类实例
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            //如果构造失败, 就清理资源
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }//end foreach

    /** 2.实例化线程工厂执行器选择器: 根据children获取选择器 */
    chooser = chooserFactory.newChooser(children);

    /** 3.为每个EventLoop线程添加 线程终止监听器*/
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    /** 4. 将children 添加到对应的set集合中去重, 表示只可读。*/
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

9、ServerBootstrap类的bind绑定端口的源码详解

  • bind绑定端口的方法详解
//1、从bind点击进去就跟踪到ChannelFuture 的bind方法
 ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080)).sync();

//2、从bind方法的doBind看到处理SocketAddress 
  public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

//3、 doBind方法里面有initAndRegister初始化和注册方法可以跟踪到
private ChannelFuture doBind(final SocketAddress localAddress) {
       //初始化和注册ChannelFuture 
        final ChannelFuture regFuture = initAndRegister();
       // 从ChannelFuture 获取通道
        final Channel channel = regFuture.channel(); 
        if (regFuture.cause() != null) {
            return regFuture;
        }
 if (regFuture.isDone()) {
            // 此时,我们知道注册已经完成并成功。
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {   
        //但只是以防万一,注册的期望(未来)几乎总是已经完成了。
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            //这里是用到监听器进行检测ChannelFuture (管道期望完成状态结果)
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // EventLoop上的注册失败,因此通道承诺失败,直接导致
                        //一旦我们试图访问通道的EventLoop,就会出现IllegalStateException。
                        promise.setFailure(cause);
                    } else {
                        // 注册成功,因此设置要使用的正确执行程序。
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();//调用promise已经注册
                        //处理通道,ChannelFuture,SocketAddress,ChannelPromise
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
}

//4、从第3步doBind方法里面的initAndRegister跟踪到
final ChannelFuture initAndRegister() {
     ChannelFuture regFuture = config().group().register(channel);
}

// `此config()方法是Bootstrap类的config() ,而config() .group()是返回 AbstractBootstrapConfig#EventLoopGroup的对象`
//从 config().group().register(channel);此时跟踪到EventLoopGroup这个类
public final BootstrapConfig config() {
        return config;
}

  • 从 config().group().register(channel);此时跟踪到EventLoopGroup这个类,从 EventLoopGroup看出ChannelFuture register(Channel channel);
  • EventLoopGroup
EmbeddedEventLoop
MultithreadEventLoopGroup
SingleThreadEventLoop
ThreadPerChannelEventLoopGroup
  • 再跟踪到EmbeddedEventLoop类,的register(Channel channel) 这个方法,然后再跟踪到register
//跟到注册
 @Override
    public ChannelFuture register(Channel channel) {
        //此返回的register是下面的代码的 ChannelFuture register(ChannelPromise promise)
        return register(new DefaultChannelPromise(channel, this));
    }

   @Override
    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

*此行代码 promise.channel().unsafe().register(this, promise);跟踪到如图

  • unsafe
  • AbstractChannel -->AbstractUnsafe#register 截图


    AbstractUnsafe的register 方法
  • AbstractChannel -->AbstractUnsafe#register代码

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 AbstractChannel.this.eventLoop = eventLoop;
 //判断事件循环是否在AbstractEventExecutor事件执行器里面
if (eventLoop.inEventLoop()) {
   //进行注册ChannelPromise(管道期望)
    register0(promise);
} else {
   try {
     //第一次调用事件循环执行eventLoop.execute()
      eventLoop.execute(new Runnable() {
      @Override
        public void run() {
               register0(promise);
             }
        });
   } catch (Throwable t) {
    logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);
      closeForcibly();
      closeFuture.setClosed();
      safeSetFailure(promise, t);
                }
            }
        }

相关文章

网友评论

    本文标题:【第5篇】Netty的NioEventLoopGroup与Ser

    本文链接:https://www.haomeiwen.com/subject/csnmaqtx.html