美文网首页
Netty 模型

Netty 模型

作者: 笨比乔治 | 来源:发表于2020-12-18 15:59 被阅读0次

5.8.1工作原理示意图 1-简单版

Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor


image.png

5.8.2对上图说明

  1. BossGroup 线程维护 Selector , 只关注 Accecpt
  2. 当接收到 Accept 事件,获取到对应的 SocketChannel, 封装成 NIOScoketChannel 并注册到 Worker 线程(事件循 环), 并进行维护
  3. 当 Worker 线程监听到 selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler), 注意 handler 已 经加入到通道

5.8.3工作原理示意图 2-进阶版

image.png

5.8.4工作原理示意图-详细版

image.png

5.8.5对上图的说明小结

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
  4. NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 selector , 用于监听绑 定在其上的 socket 的网络通讯
  5. NioEventLoopGroup 可以有多个线程, 即可以含有多个 NioEventLoop
  6. 每个 Boss NioEventLoop 循环执行的步骤有 3 步
     轮询 accept 事件
     处理 accept 事件 , 与 client 建立连接 , 生成 NioScocketChannel , 并将其注册到某个 worker NIOEventLoop 上 的 selector
     处理任务队列的任务 , 即 runAllTasks
  7. 每个 Worker NIOEventLoop 循环执行的步骤
     轮询 read, write 事件
     处理 i/o 事件, 即 read , write 事件,在对应 NioScocketChannel 处理
     处理任务队列的任务 , 即 runAllTasks
  8. 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

5.8.6Netty 快速入门实例-TCP 服务

实例要求:使用 IDEA 创建 Netty 项目

  1. Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 "hello, 服务器~"
  2. 服务器可以回复消息给客户端 "hello, 客户端~"
  3. 目的:对 Netty 线程模型 有一个初步认识, 便于理解 Netty 模型理论
  4. 看老师代码演示
    5.1 编写服务端 5.2 编写客户端 5.3 对 netty 程序进行分析,看看 netty 模型特点 说明: 创建 Maven 项目,并引入 Netty 包
  <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

NettyServer

package com.qiz.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author qiz
 */
public class NettyServer {
    public static void main(String[] args)  {

        //创建BossGroup 和 WorkGroup
        //创建两个线程组
        //boosGroup只是处理连接请求,真正和客户端业务处理的是workGroup
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        //创建服务器端的启动对象,配置参数
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象(匿名对象)
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });//给我们的workGroup 的 EventLoop 对应的管道记录处理器
            System.out.println("..........服务器 is ready");
            //绑定一个端口并且同步,生成了一个ChannelFuture对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

NettyServerHandler

package com.qiz.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author qiz
 * 自定义一个handler需要继承netty 规定好的某个HandlerAdapter
 * 这时我们自定义一个Handler,才能称为一个handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //读取数据完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将数据写入到缓冲,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
    }

    //处理异常,一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //读取数据事件(这里我们可以读取客户端发送的消息)
    //上下文对象,含有管道pipeline,通道 地址
    //msg 客户端发送的数据 默认object
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


        System.out.println("server ctx =" + ctx);
        //将 msg 转成一个ByteBuf
        //ByteBuf 是 Netty 提供的,不是NIO 的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(""+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());

    }
}

NettyClient

package com.qiz.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author qiz
 */
public class NettyClient {
    public static void main(String[] args) {

        //客户端需要一个事件循环组
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //客户端使用BootStrap  服务端使用ServerBootStrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(eventExecutors) //设置线程组
                    .channel(NioSocketChannel.class) //设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
          //
        }finally {
            eventExecutors.shutdownGracefully();
        }

    }
}

NettyClientHandler

package com.qiz.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author qiz
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server:<>", CharsetUtil.UTF_8));
    }

    //当通道有读取事件时会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址" +ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

5.8.7任务队列中的 Task 有 3 种典型使用场景

  1. 用户程序自定义的普通任务 [举例说明]
  2. 用户自定义定时任务
  3. 非当前 Reactor 线程调用 Channel 的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费

解决方案 1 用户程序自定义的普通任务 阻塞的线程会后执行,先往下执行

ctx.channel().eventLoop().execute(new Runnable() { 
 @Override 
 public void run() { 
 try {
  Thread.sleep(5 * 1000); 
 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵 2", 
 CharsetUtil.UTF_8)); 
 System.out.println("channel code=" + ctx.channel().hashCode()); 
}
 catch (Exception ex) { 
System.out.println("发生异常" + ex.getMessage()); 
} } }); 

ctx.channel().eventLoop().execute(new Runnable() { 
@Override 
public void run() {
try {Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵 3", CharsetUtil.UTF_8)); 
System.out.println("channel code=" + ctx.channel().hashCode()); 
} catch (Exception ex) { 
System.out.println("发生异常" + ex.getMessage());
 } } });
IMG_2238.PNG

解决方案 2 : 用户自定义定时任务 -》 该任务是提交到 scheduledTaskQueue 中

ctx.channel().eventLoop().schedule(new Runnable() { 
@Override public void run() { 
try {Thread.sleep(5 * 1000); 
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵 4", CharsetUtil.UTF_8)); 
System.out.println("channel code=" + ctx.channel().hashCode()); 
} catch (Exception ex) { 
System.out.println("发生异常" + ex.getMessage()); 
} } }, 5, TimeUnit.SECONDS);

5.8.8方案再说明

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定 在其上的 socket 网络通道。
  3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
     NioEventLoopGroup 下包含多个 NioEventLoop
     每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
     每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
     每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
     每个 NioChannel 都绑定有一个自己的 ChannelPipeline

相关文章

网友评论

      本文标题:Netty 模型

      本文链接:https://www.haomeiwen.com/subject/botziktx.html