美文网首页面试netty
Netty理论三:Netty线程模型

Netty理论三:Netty线程模型

作者: 雪飘千里 | 来源:发表于2018-12-11 15:00 被阅读35次

    1、Reactor模式:NIO网络框架的典型模式

    Reactor是网络编程中的一种设计模式,reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了reactor模式,如:netty、node.js、Cindy等,包括java的nio。

    何为Reactor线程模型?

    Reactor模式是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler

    image.png

    Reactor模式的三种形式
    1、单线程Reactor 模式:

    image.png

    这种实现方式,和第一章java NIO中单线程NIO实现是一样的,一个Reactor处理所有的事情。

    image.png

    2、多线程 Reactor 模式:
    编解码及业务处理使用线程池,这样的话,可以避免IO阻塞(IO阻塞的代价是非常大的)。

    image.png image.png

    3、多Reactors 模式:
    把Reactor分为两个,一个负责接收,一个负责读写,业务处理还是用线程池(也可以选择不用线程池,这个看具体业务需求)

    image.png image.png

    2、Netty中如何使用Reactor模式

    • 单线程Reactor 模式
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
    
    • 多线程 Reactor 模式
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup)
    //Handler使用线程池进行处理
    
    • 多Reactors 模式
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
    

    注:new NioEventLoopGroup()默认创建cpu核数*2的线程数

    3、Netty EventLoop源码解析

    1、NioEventLoopGroup整体结构

    image.png

    EventExecutorGroup视图

    image.png

    new NioEventLoopGroup源码

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            //EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    //初始化EventExecutor数组,数组是NioEventLoop,见下面
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            //EventExecutorChooser.next()定义选择EventExecutor的策略;
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
        @Override
        public EventExecutor next() {
            return chooser.next();
        }
    

    NioEventLoopGroup.class

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    

    NioEventLoop.class

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            //创建selector
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    
    1. EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor(NIOEventLoop);
    2. EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
    3. EventExecutorChooserFactory.EventExecutorChooser.next()定义选择EventExecutor的策略(有两种,都是轮询);

    2、NioEventLoopGroup创建分析

    bossGroup

    image.png

    workerGrop

    image.png

    3、ServerBootstrap启动流程分析

    image.png

    4、ServerBootstrap执行流程分析

    image.png
            // 配置服务端的NIO线程组
            // 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,
            //负责接待客户,不具体服务客户
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap(); // (2)
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class) // (3)
                 //欲加到NioServerSocketChannel Pipeline的handler
                 .handler(new LoggingHandler(LogLevel.INFO))
                  //欲加到NioSocketChannel(accept()返回的)Pipeline的handler
                 .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                         ch.pipeline().addLast("decoder", new StringDecoder());
                         ch.pipeline().addLast("encoder", new StringEncoder());
                         ch.pipeline().addLast(new EchoServerHandler());
     
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                 .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
                // 绑定端口,开始接收进来的连接
                ChannelFuture f = b.bind(port).sync(); // (7)
    
    image.png

    相关文章

      网友评论

        本文标题:Netty理论三:Netty线程模型

        本文链接:https://www.haomeiwen.com/subject/ogbxcqtx.html