美文网首页小白架构师之路
Nety线程模型2(Accept篇)

Nety线程模型2(Accept篇)

作者: linking12 | 来源:发表于2016-08-22 17:49 被阅读239次

概述

Netty的线程模型没什么出彩的地方,旧瓶装新酒,其就是基于Reactor模式

Reactor模式结构

首先用一张许多人都看过的图来开始说明Reactor模式

Netty Reactor.jpeg

这张图估计在许多博客和帖子都会看到,但是许多博客却没有详细说明及解释这张图在netty的架构上反应出来

  • Reactor模式角色定义
    1:MainReactor: 负责响应client的连接请求,并建立连接,可以有1个或者多个,维护一个独立的NIO Selector
    2:SubReactor: 负责对client的读写请求进行处理,可以1个或者多个,并也维护一个独立的NIO Selector
    3:Acceptor: 负责MainReactor和SubReactor的桥梁左右,已经准备好的连接转发到SubReactor中进行处理

  • Netty中Reactor模式角色定义
    1:MainReactorEventLoopGroup,在Netty4以前叫做BossGroup;
    2:SubReactor:EventLoopGroup, 在Netty4以前叫做WorkGroup;
    3:Acceptor:ServerBootstrapAcceptor:一个系统自带的ChannelInboundHandler事件拦截器,真正的将已准备好的Channel注册到SubReactor中;

SubReactor:EventLoopGroup

subReactor的任务比较简单,接收Acceptor的Channel,后将Channel重新进行注册,并触发自定义的Handler来处理逻辑
1)接收Acceptor传递过来的Channel通道
2)注册到相应的selector。

private void register0(ChannelPromise promise) {
            try {
                doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
            }
        }

3)调用eventLoop.execute用以执行注册任务
4)启动子线程。即启动了subReactor

注意:pipeline.fireChannelRegistered()触发的事件,其实现原理就是初始化不同的ChannelInitializer对象,对不同类型的Channel添加不同的拦截处理

  • 启动时,Channel是NioServerSocketChannel,调用的是ServerBootstrapAcceptor
  • 连接时,Channel是NioSocketChannel,调用的是用户自定义的InboundHandler

例如:

 ServerBootstrap.childHandler(new GearmanServerInitializer())//此用户自定义的的ChannelInitializer

public class GearmanServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        pipeline.addLast("decoder", new Decoder());
        pipeline.addLast("encoder", new Encoder());
        pipeline.addLast("handler", new PacketHandler(networkManager));
    }
}
Acceptor:ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            // ...... 省略无关代码
            try {
                   childGroup.register(child);
            } catch (Throwable t) {
              // ......省略无关代码
            }
        }

说明:
当Channel已经Ready后,就会ServerBootstrapAcceptor#channelRead
1:首先把用户自定义的handler注册到pipleline中
2:将已准备好的Channel与childGroup,触发点就是childGroup.register(child);

如果看了上篇文章,会发现两者都是存在注册通道的原理,其实是不同的

  1. 在server启动时,通过回调bind的监听会把Selector注册事件改为electionKey.OP_ACCEPT
  2. 而当有连接进来的时候,通过重新注册又把Selector注册事件改为了0
    在这一点有点劳民伤财的味道。(其实不是,在下面会专门有注意点提到这点)
MainReactor:EventLoopGroup

如果看了上篇文章,应该知道在server启动时,会启动MainReactor,一直循环执行IO任务和非IO任务

 protected void run() {
        for (;;) {
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select();
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }
                cancelledKeys = 0;
                final long ioStartTime = System.nanoTime();
                needsToSelectAgain = false;
                if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    processSelectedKeysPlain(selector.selectedKeys());
                }
                final long ioTime = System.nanoTime() - ioStartTime;
                final int ioRatio = this.ioRatio;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                //..........省略无关代码
            } catch (Throwable t) {
               //...........省略无关代码
            }
        }
    }

在上篇文章主要提的是runAllTasks这个方法,主要执行非IO任务
这里主要是来说明下IO任务,selectedKeys不为空
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
执行processSelectedKeysOptimized(selectedKeys.flip());

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

注意的地方就是 final Object a = k.attachment();这个attachment是从哪里来的,看如下selectionKey =javaChannel().register(eventLoop().selector, 0, this);

  protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
               
            }
        }
    }
  • 对于当server启动时,由于该当前对象是NioServerSocketChannel
  • 对于当连接进来是,由于当前对象是NioSocketChannel

在注册整个Selector选择器的时候,把当前通道(Channel)也注册进去了,上面那个劳民伤财其实是句玩笑,在这里体现出两次注册的用意来了

继续以上processSelectedKeysOptimized,其中processSelectedKey就是处理网络Io事件,把该事件发给Acceptor的主要触发点,而有点要

代码如下:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    
        int readyOps = -1;
        try {
            readyOps = k.readyOps();
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
            //........省略无关代码
        } catch (CancelledKeyException e) {
           //.......省略无关代码
        }
    }

而组装channel,bytebuffer等网络数据是在NioMessageUnsafe#read()中

        public void read() {
            //... 省略无关代码
            try {
                for (;;) {
                    int localRead = doReadMessages(readBuf);
                    // .......省略无关代码
                }
            } catch (Throwable t) {
            }
            for (int i = 0; i < readBuf.size(); i ++) {
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            pipeline.fireChannelReadComplete();
             // ..... 省略无关代码
        }
    }
  • doReadMessages(readBuf);
    把当前请求连接封装成一个Channel(其实就是NioSocketChannel)
  • pipeline.fireChannelRead(readBuf.get(i));
    通知Acceptor来读取,其实就是通知ServerBootstrapAcceptor#channelRead

相关文章

  • Nety线程模型2(Accept篇)

    概述 Netty的线程模型没什么出彩的地方,旧瓶装新酒,其就是基于Reactor模式 Reactor模式结构 首先...

  • Tomcat NIO 线程模型分析

    Tomcat7线程模型 tomcat 的nio 线程模型也是reactor 模型,由accept 线程负责接受连接...

  • 常见并发模型对比

    并发模型【UNP】对应多进程多线程阻塞IOIO复用长连接并发性多核开销互通顺序性线程数特点accept+read/...

  • web中nio的理解

    传统的bio线程模型是用户发起请求,accept得到一个socket,然后从业务线程池取出一个线程去处理该用户的请...

  • Reactor 学习随笔

    三种Reactor模型理解 第一种 单Reactor单线程 selector除了接受OP_ACCEPT事件之外,还...

  • 手写简易版Netty Reactor

    BossGroup 线程组只负责Accept, WorkerGroup线程组只负责R/W。workerGroup...

  • 线程之同步

    t1和t2 异步编程模型:t1线程执行t1的,t2线程执行t2的同步编程模型:t1线程和t2线程执行,当t1线程必...

  • RxJava 并行操作

    上一篇文章RxJava 线程模型分析详细介绍了RxJava的线程模型,被观察者(Observable、Flowab...

  • 深入浅出Netty read

    boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,...

  • Reactor线程模型及其在Netty中的应用

    什么是Reactor线程模型 Java中线程模型大致可以分为: 单线程模型 多线程模型 线程池模型(executo...

网友评论

    本文标题:Nety线程模型2(Accept篇)

    本文链接:https://www.haomeiwen.com/subject/axfgsttx.html