美文网首页
Netty基础(未完待续)

Netty基础(未完待续)

作者: 笔记本一号 | 来源:发表于2020-09-22 00:47 被阅读0次

不多BB,我们开门见山
Netty是一个提供了易于使用的API的NIO框架,具有高并发、高性能的特点

Netty的Reactor线程模型

https://blog.csdn.net/bingxuesiyang/article/details/89888664

单线程模型

所有的IO操作都是由一个NIO线程处理。

Reactor内部通过selector 监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立的事件,则由Acceptor处理,Acceptor通过accept接受连接,并创建一个Handler来处理连接后续的各种事件,如果是读写事件,直接调用连接对应的Handler来处理。
Handler完成read->业务处理(decode->compute->encode)->send的全部流程。
这种模型好处是简单,坏处却很明显,当某个Handler阻塞时,会导致其他客户端的handler和accpetor都得不到执行,无法做到高性能,只适用于业务处理非常快速的场景。

缺点:单线程NIO负载过重,并发高时产生任务堆积,延迟过高,不适合并发高的场景

多线程模型

由一个NIO线程处理客户端连接,由一组NIO线程池处理IO操作

主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立事件,则由Acceptor处理,Acceptor通过accept接收连接,并创建一个Handler来处理后续事件,而Handler只负责响应事件,不进行业务操作,也就是只进行read读取数据和write写出数据,业务处理交给一个线程池进行处理。
线程池分配一个线程完成真正的业务处理,然后将响应结果交给主进程的Handler处理,Handler将结果send给client。
单Reactor承当所有事件的监听和响应,而当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈。

缺点:连接处理能力有限,客户连接并发高时,会产生连接延迟过高

主从线程模型

一组NIO线程池处理连接,一组NIO线程池处理IO操作

存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch。
主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accpetor接收,将新的连接分配给某个子线程。
子线程中的subReactor将mainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后续事件
Handler完成read->业务处理->send的完整业务流程。

缺点:无

Netty的核心概念

EventLoop和EventLoopGroup

EventLoopGroup是一个线程组,包含了一组NIO线程,主要用于处理网络事件,例如客户端连接,Channel的事件。每个EventLoop负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个EventLoop。EventLoopGroup实际上就是Reactor线程组,Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,一组EventLoopGroup可以同时并发处理成百上千个客户端连接,另一组EventLoopGroup可以处理客户端的通道事件

ChannelHandler和ChannelPipeline

ChannelHandler是一个接口,专门处理 I/O 或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelPipeline是ChannelHandler的容器,而每个Channel都会有一个ChannelPipeline,负责ChannelHandler的管理和事件拦截与调度。Channel会将事件扔到ChannelPipeline中,然后事件会被ChannelPipeline安排一系列的ChannelHandler拦截处理,例如编解码事件、TCP的粘包拆包事件、用户自定义Handler等,经过一系列加工后,事件的消息会被添加缓冲区中等待Channel的刷新和发送。

事件出站(Outbound)和入站(Inbound)

  • Inbound事件:通常由I/O线程触发,例如TCP链路连接、链路关闭、读事件、异常通知等,它们对应下图的左半部分,这些方法被封装在ChannelInboundHandler里面
  • Outbound事件:通常是用户主动发起的网络IO操作,例如发起连接操作、绑定操作、消息发送等,它们对应下图的右半部分,这些方法被封装在ChannelOutboundHandler里面

ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API,当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。
与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。

事件出站

ChannelHandlerContext

保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。

一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

ServerBootstrap和Bootstrap

ServerBootstrap是服务端的启动助手,Bootstrap是客户端的启动助手,它们的目的主要是降低开发的复杂度

TCP的粘包和拆包

在TCP协议中一个完整的数据包可能会被TCP拆分为多个包发送,或者将多个小的数据包封装成大的数据包发送,这就会发生TCP的粘包和拆包的问题。
产生原因:

  • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
  • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
  • 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
  • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
TCP的粘包和拆包的解决方法

粘包问题解决

  • FixedLengthFrameDecoder:定长协议解码器,我们可以指定固定的字节数算一个完整的报文
  • LineBasedFrameDecoder:行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文
  • DelimiterBasedFrameDecoder:分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以自己指定
  • LengthFieldBasedFrameDecoder:长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的
  • JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是一个完整的json对象或者json数组。

Netty的默认线程数

Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。

Netty高性能的原因

  • IO 线程模型:同步非阻塞,用最少的资源做更多的事。
  • 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
  • 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
  • 串形化处理读写:避免使用锁带来的性能开销。
  • 高性能序列化协议:支持 protobuf 等高性能序列化协议。

Netty的序列化协议:

  • XML,优点:人机可读性好,可指定元素或特性的名称。缺点:序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息;只能序列化公共属性和字段;不能序列化方法;文件庞大,文件格式复杂,传输占带宽。适用场景:当做配置文件存储数据,实时数据转换。

  • JSON,是一种轻量级的数据交换格式,优点:兼容性高、数据格式比较简单,易于读写、序列化后数据较小,可扩展性好,兼容性好、与XML相比,其协议比较简单,解析速度比较快。缺点:数据的描述性比XML差、不适合性能要求为ms级别的情况、额外空间开销比较大。适用场景(可替代XML):跨防火墙访问、可调式性要求高、基于Web browser的Ajax请求、传输数据量相对小,实时性要求相对低(例如秒级别)的服务。

  • Fastjson,采用一种“假定有序快速匹配”的算法。优点:接口简单易用、目前java语言中最快的json库。缺点:过于注重快,而偏离了“标准”及功能性、代码质量不高,文档不全。适用场景:协议交互、Web输出、Android客户端

  • Thrift,不仅是序列化协议,还是一个RPC框架。优点:序列化后的体积小, 速度快、支持多种语言和丰富的数据类型、对于数据字段的增删具有较强的兼容性、支持二进制压缩编码。缺点:使用者较少、跨防火墙访问时,不安全、不具有可读性,调试代码时相对困难、不能与其他传输层协议共同使用(例如HTTP)、无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议。适用场景:分布式系统的RPC解决方案

  • Avro,Hadoop的一个子项目,解决了JSON的冗长和没有IDL的问题。优点:支持丰富的数据类型、简单的动态语言结合功能、具有自我描述属性、提高了数据解析速度、快速可压缩的二进制数据形式、可以实现远程过程调用RPC、支持跨编程语言实现。缺点:对于习惯于静态类型语言的用户不直观。适用场景:在Hadoop中做Hive、Pig和MapReduce的持久化数据格式。

  • Protobuf,将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的-POJO对象和Protobuf相关的方法和属性。优点:序列化后码流小,性能高、结构化数据存储格式(XML JSON等)、通过标识字段的顺序,可以实现协议的前向兼容、结构化的文档更容易管理和维护。缺点:需要依赖于工具生成代码、支持的语言相对较少,官方只支持Java 、C++ 、python。适用场景:对性能要求高的RPC调用、具有良好的跨防火墙的访问属性、适合应用层对象的持久化

  • protostuff 基于protobuf协议,但不需要配置proto文件,直接导包即可

  • Jboss marshaling 可以直接序列化java类, 无须实java.io.Serializable接口

  • MessagePack 一个高效的二进制序列化格式

  • Hessian 采用二进制协议的轻量级remoting onhttp工具

  • kryo 基于protobuf协议,只支持java语言,需要注册(Registration),然后序列化(Output),反序列化(Input)

Netty实战:1、利用Netty实现的聊天室 2、利用Netty实现Http协议 3、利用Netty实现Websocket协议

1、利用Netty实现的聊天室:

Sever端

public class ChatServer {
    private int port;

    public ChatServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup works=new NioEventLoopGroup();
        try {
            ServerBootstrap boot=new ServerBootstrap();
            boot.group(boss,works)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());
                            pipeline.addLast("decode", new StringDecoder());
                            pipeline.addLast(new ServerHander());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG,128)
                    .option(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture future = boot.bind(this.port).sync();
            System.out.println("服务已经启动...............");
            future.channel().closeFuture().sync();
            System.out.println("服务已经关闭...............");
        }catch (Exception e){

        }finally {
            boss.shutdownGracefully();
            works.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        new ChatServer(8888).start();
    }
}

SeverHander

public class ServerHander extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 每当从客户端有消息写入时
     *
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        Channel inComing = channelHandlerContext.channel();
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[用户" + inComing.remoteAddress() + " 说:]" + s + "\n");
            } else {
                channel.writeAndFlush("[我说:]" + s + "\n");
            }
        }
    }

    /**
     * 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列
     *
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();//获得客户端通道
        //通知其他客户端有新人进入
        for (Channel channel : channels) {
            if (channel != inComing) {
                channel.writeAndFlush("[欢迎: " + inComing.remoteAddress() + "] 进入聊天室!\n");
            }
        }
        channels.add(inComing);//加入队列
    }

    /**
     * 断开连接
     *
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel outComing = ctx.channel();//获得客户端通道
        //通知其他客户端有人离开
        for (Channel channel : channels) {
            if (channel != outComing) {
                channel.writeAndFlush("[再见: ]" + outComing.remoteAddress() + " 离开聊天室!\n");
            }
        }

        channels.remove(outComing);
    }


    /**
     * 当服务器监听到客户端活动时
     *
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 进入聊天室");
    }

    /**
     * 离线
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println("[" + inComing.remoteAddress() + "]: 离线");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel inComing = ctx.channel();
        System.out.println(inComing.remoteAddress() + "通讯异常!");
        ctx.close();
    }
}

Client端

public class ChatClient {
    private String host;
    private int port;

    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() {
        EventLoopGroup works = new NioEventLoopGroup();
        try {
            Bootstrap boot = new Bootstrap();
            boot.group(works).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                            pipeline.addLast("encode", new StringEncoder());//编码器
                            pipeline.addLast("decode", new StringDecoder());//解码器
                            pipeline.addLast(new ClientHander());
                        }
                    });
            ChannelFuture future = boot.connect(this.host, this.port).sync();
            System.out.println("客户端已经连接");
            future.channel().closeFuture().sync();
            System.out.println("客户端已经关闭");
        } catch (Exception e) {

        } finally {
            works.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new ChatClient("localhost",8888).start();
    }
}

ClientHander

public class ClientHander extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        new Thread(()-> {
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String s = scanner.nextLine()+"\n";
                    ByteBuf buffer = Unpooled.buffer(s.length());
                    buffer.writeBytes(s.getBytes());
                    ctx.writeAndFlush(buffer);
                }
        }).start();
    }
}

启动Sever

image.png

启动一个Client

image.png

启动第二个Client

image.png image.png image.png image.png

2、利用Netty实现的Http协议:

Server端

public class HttpServer {

    public static void main(String[] args) throws Exception {

        // 定义一对线程组
        // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // netty服务器的创建, ServerBootstrap 是一个启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)           // 设置主从线程组
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new HttpServerInitializer()); 
            
            // 启动server,并且设置8088为启动的端口号,同时启动方式为同步
            ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
            
            // 监听关闭的channel,设置位同步方式
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 通过SocketChannel去获得对应的管道
        ChannelPipeline pipeline = channel.pipeline();
        
        // 通过管道,添加handler
        // HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器
        // 当请求到服务端,我们需要做解码,响应到客户端做编码
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        
        // 添加自定义的助手类,返回 "hello netty~"
        pipeline.addLast("customHandler", new CustomHandler());
    }

}
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) 
            throws Exception {
        // 获取channel
        Channel channel = ctx.channel();
        
        if (msg instanceof HttpRequest) {
            // 显示客户端的远程地址
            System.out.println(channel.remoteAddress());
            
            // 定义发送的数据消息
            ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
            
            // 构建一个http response
            FullHttpResponse response = 
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
                            HttpResponseStatus.OK, 
                            content);
            // 为响应增加数据类型和长度
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
            
            // 把响应刷到客户端
            ctx.writeAndFlush(response);
        }
        
    }
/*
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel注册到NioEventLoop");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel取消和NioEventLoop的绑定");
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel准备就绪");
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel被关闭");
        super.channelInactive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel读数据完成");
        super.channelReadComplete(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("用户事件触发。。。");
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel可写更改");
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕获到异常");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("新事件被添加");
        super.handlerAdded(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("事件被移除");
        super.handlerRemoved(ctx);
    }
*/
}

3、利用Netty实现的Websocket协议:

由于Websocket的握手需要使用http,所以在pipline中需要注册支持http的事件HttpServerCodec
Server端

public class WSServer {

    public static void main(String[] args) throws Exception {
        
        EventLoopGroup mainGroup = new NioEventLoopGroup();
        EventLoopGroup subGroup = new NioEventLoopGroup();
        
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WSServerInitialzer());
            
            ChannelFuture future = server.bind(8088).sync();
            
            future.channel().closeFuture().sync();
        } finally {
            mainGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }
    
}

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        
        // websocket 基于http协议,所以要有http编解码器
        pipeline.addLast(new HttpServerCodec());
        // 对写大数据流的支持 
        pipeline.addLast(new ChunkedWriteHandler());
        // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
        // 几乎在netty中的网络编程,都会使用到此hanler
        pipeline.addLast(new HttpObjectAggregator(1024*64));

    // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
        // 如果是读空闲或者写空闲,不处理
        pipeline.addLast(new IdleStateHandler(8, 10, 12));
        // 自定义的空闲状态检测
        pipeline.addLast(new HeartBeatHandler());



        /**
         * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
         * 本handler会帮你处理一些繁重的复杂的事
         * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
         * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
         */
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        
        // 自定义的handler
        pipeline.addLast(new ChatHandler());
    }

}

TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 用于记录和管理所有客户端的channle
    private static ChannelGroup clients = 
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) 
            throws Exception {
        // 获取客户端传输过来的消息
        String content = msg.text();
        System.out.println("接受到的数据:" + content);
        
//      for (Channel channel: clients) {
//          channel.writeAndFlush(
//              new TextWebSocketFrame(
//                      "[服务器在]" + LocalDateTime.now() 
//                      + "接受到消息, 消息为:" + content));
//      }
        // 下面这个方法,和上面的for循环效果是一样的
        clients.writeAndFlush(
                new TextWebSocketFrame(
                        "[服务器在]" + LocalDateTime.now() 
                        + "接受到消息, 消息为:" + content));
        
    }

    /**
     * 当客户端连接服务端之后(打开连接)
     * 获取客户端的channle,并且放到ChannelGroup中去进行管理
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

      @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        
        String channelId = ctx.channel().id().asShortText();
        System.out.println("客户端被移除,channelId为:" + channelId);
         //System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
        //System.out.println("客户端断开,channle对应的短id为:" + ctx.channel().id().asShortText());
        // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
        clients.remove(ctx.channel());
    }
/**
发生异常时进行捕获
*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
        ctx.channel().close();
        clients.remove(ctx.channel());
    }   
    
}

心跳检测

/**
 * @Description: 用于检测channel的心跳handler 
 *               继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0方法
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        
        // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent)evt;     // 强制类型转换
            
            if (event.state() == IdleState.READER_IDLE) {
                System.out.println("进入读空闲...");
            } else if (event.state() == IdleState.WRITER_IDLE) {
                System.out.println("进入写空闲...");
            } else if (event.state() == IdleState.ALL_IDLE) {
                
                System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
                
                Channel channel = ctx.channel();
                // 关闭无用的channel,以防资源浪费
                channel.close();
                
                System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
            }
        }
        
    }
    
}

相关文章

网友评论

      本文标题:Netty基础(未完待续)

      本文链接:https://www.haomeiwen.com/subject/wjybyktx.html