美文网首页
Netty基础(未完待续)

Netty基础(未完待续)

作者: 笔记本一号 | 来源:发表于2020-09-22 00:47 被阅读0次

    不多BB,我们开门见山
    Netty是一个提供了易于使用的API的NIO框架,具有高并发、高性能的特点

    Netty的Reactor线程模型

    https://blog.csdn.net/bingxuesiyang/article/details/89888664

    单线程模型

    所有的IO操作都是由一个NIO线程处理。

    Reactor内部通过selector 监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立的事件,则由Acceptor处理,Acceptor通过accept接受连接,并创建一个Handler来处理连接后续的各种事件,如果是读写事件,直接调用连接对应的Handler来处理。
    Handler完成read->业务处理(decode->compute->encode)->send的全部流程。
    这种模型好处是简单,坏处却很明显,当某个Handler阻塞时,会导致其他客户端的handler和accpetor都得不到执行,无法做到高性能,只适用于业务处理非常快速的场景。

    缺点:单线程NIO负载过重,并发高时产生任务堆积,延迟过高,不适合并发高的场景

    多线程模型

    由一个NIO线程处理客户端连接,由一组NIO线程池处理IO操作

    主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立事件,则由Acceptor处理,Acceptor通过accept接收连接,并创建一个Handler来处理后续事件,而Handler只负责响应事件,不进行业务操作,也就是只进行read读取数据和write写出数据,业务处理交给一个线程池进行处理。
    线程池分配一个线程完成真正的业务处理,然后将响应结果交给主进程的Handler处理,Handler将结果send给client。
    单Reactor承当所有事件的监听和响应,而当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈。

    缺点:连接处理能力有限,客户连接并发高时,会产生连接延迟过高

    主从线程模型

    一组NIO线程池处理连接,一组NIO线程池处理IO操作

    存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch。
    主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accpetor接收,将新的连接分配给某个子线程。
    子线程中的subReactor将mainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后续事件
    Handler完成read->业务处理->send的完整业务流程。

    缺点:无

    Netty的核心概念

    EventLoop和EventLoopGroup

    EventLoopGroup是一个线程组,包含了一组NIO线程,主要用于处理网络事件,例如客户端连接,Channel的事件。每个EventLoop负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个EventLoop。EventLoopGroup实际上就是Reactor线程组,Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,一组EventLoopGroup可以同时并发处理成百上千个客户端连接,另一组EventLoopGroup可以处理客户端的通道事件

    ChannelHandler和ChannelPipeline

    ChannelHandler是一个接口,专门处理 I/O 或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。ChannelPipeline是ChannelHandler的容器,而每个Channel都会有一个ChannelPipeline,负责ChannelHandler的管理和事件拦截与调度。Channel会将事件扔到ChannelPipeline中,然后事件会被ChannelPipeline安排一系列的ChannelHandler拦截处理,例如编解码事件、TCP的粘包拆包事件、用户自定义Handler等,经过一系列加工后,事件的消息会被添加缓冲区中等待Channel的刷新和发送。

    事件出站(Outbound)和入站(Inbound)

    • Inbound事件:通常由I/O线程触发,例如TCP链路连接、链路关闭、读事件、异常通知等,它们对应下图的左半部分,这些方法被封装在ChannelInboundHandler里面
    • Outbound事件:通常是用户主动发起的网络IO操作,例如发起连接操作、绑定操作、消息发送等,它们对应下图的右半部分,这些方法被封装在ChannelOutboundHandler里面

    ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API,当一个数据流进入 ChannlePipeline 时,它会从 ChannelPipeline 头部开始传给第一个 ChannelInboundHandler ,当第一个处理完后再传给下一个,一直传递到管道的尾部。
    与之相对应的是,当数据被写出时,它会从管道的尾部开始,先经过管道尾部的 “最后” 一个ChannelOutboundHandler,当它处理完成后会传递给前一个 ChannelOutboundHandler 。

    事件出站

    ChannelHandlerContext

    保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。

    一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

    ServerBootstrap和Bootstrap

    ServerBootstrap是服务端的启动助手,Bootstrap是客户端的启动助手,它们的目的主要是降低开发的复杂度

    TCP的粘包和拆包

    在TCP协议中一个完整的数据包可能会被TCP拆分为多个包发送,或者将多个小的数据包封装成大的数据包发送,这就会发生TCP的粘包和拆包的问题。
    产生原因:

    • 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
    • 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
    • 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
    • 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。
    TCP的粘包和拆包的解决方法

    粘包问题解决

    • FixedLengthFrameDecoder:定长协议解码器,我们可以指定固定的字节数算一个完整的报文
    • LineBasedFrameDecoder:行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文
    • DelimiterBasedFrameDecoder:分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以自己指定
    • LengthFieldBasedFrameDecoder:长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的
    • JsonObjectDecoder:json格式解码器,当检测到匹配数量的"{" 、”}”或”[””]”时,则认为是一个完整的json对象或者json数组。

    Netty的默认线程数

    Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。

    Netty高性能的原因

    • IO 线程模型:同步非阻塞,用最少的资源做更多的事。
    • 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
    • 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
    • 串形化处理读写:避免使用锁带来的性能开销。
    • 高性能序列化协议:支持 protobuf 等高性能序列化协议。

    Netty的序列化协议:

    • XML,优点:人机可读性好,可指定元素或特性的名称。缺点:序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息;只能序列化公共属性和字段;不能序列化方法;文件庞大,文件格式复杂,传输占带宽。适用场景:当做配置文件存储数据,实时数据转换。

    • JSON,是一种轻量级的数据交换格式,优点:兼容性高、数据格式比较简单,易于读写、序列化后数据较小,可扩展性好,兼容性好、与XML相比,其协议比较简单,解析速度比较快。缺点:数据的描述性比XML差、不适合性能要求为ms级别的情况、额外空间开销比较大。适用场景(可替代XML):跨防火墙访问、可调式性要求高、基于Web browser的Ajax请求、传输数据量相对小,实时性要求相对低(例如秒级别)的服务。

    • Fastjson,采用一种“假定有序快速匹配”的算法。优点:接口简单易用、目前java语言中最快的json库。缺点:过于注重快,而偏离了“标准”及功能性、代码质量不高,文档不全。适用场景:协议交互、Web输出、Android客户端

    • Thrift,不仅是序列化协议,还是一个RPC框架。优点:序列化后的体积小, 速度快、支持多种语言和丰富的数据类型、对于数据字段的增删具有较强的兼容性、支持二进制压缩编码。缺点:使用者较少、跨防火墙访问时,不安全、不具有可读性,调试代码时相对困难、不能与其他传输层协议共同使用(例如HTTP)、无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议。适用场景:分布式系统的RPC解决方案

    • Avro,Hadoop的一个子项目,解决了JSON的冗长和没有IDL的问题。优点:支持丰富的数据类型、简单的动态语言结合功能、具有自我描述属性、提高了数据解析速度、快速可压缩的二进制数据形式、可以实现远程过程调用RPC、支持跨编程语言实现。缺点:对于习惯于静态类型语言的用户不直观。适用场景:在Hadoop中做Hive、Pig和MapReduce的持久化数据格式。

    • Protobuf,将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的-POJO对象和Protobuf相关的方法和属性。优点:序列化后码流小,性能高、结构化数据存储格式(XML JSON等)、通过标识字段的顺序,可以实现协议的前向兼容、结构化的文档更容易管理和维护。缺点:需要依赖于工具生成代码、支持的语言相对较少,官方只支持Java 、C++ 、python。适用场景:对性能要求高的RPC调用、具有良好的跨防火墙的访问属性、适合应用层对象的持久化

    • protostuff 基于protobuf协议,但不需要配置proto文件,直接导包即可

    • Jboss marshaling 可以直接序列化java类, 无须实java.io.Serializable接口

    • MessagePack 一个高效的二进制序列化格式

    • Hessian 采用二进制协议的轻量级remoting onhttp工具

    • kryo 基于protobuf协议,只支持java语言,需要注册(Registration),然后序列化(Output),反序列化(Input)

    Netty实战:1、利用Netty实现的聊天室 2、利用Netty实现Http协议 3、利用Netty实现Websocket协议

    1、利用Netty实现的聊天室:

    Sever端

    public class ChatServer {
        private int port;
    
        public ChatServer(int port) {
            this.port = port;
        }
    
        public void start(){
            EventLoopGroup boss=new NioEventLoopGroup();
            EventLoopGroup works=new NioEventLoopGroup();
            try {
                ServerBootstrap boot=new ServerBootstrap();
                boot.group(boss,works)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                ChannelPipeline pipeline = sc.pipeline();
                                pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                                pipeline.addLast("encode", new StringEncoder());
                                pipeline.addLast("decode", new StringDecoder());
                                pipeline.addLast(new ServerHander());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG,128)
                        .option(ChannelOption.SO_KEEPALIVE,true);
                ChannelFuture future = boot.bind(this.port).sync();
                System.out.println("服务已经启动...............");
                future.channel().closeFuture().sync();
                System.out.println("服务已经关闭...............");
            }catch (Exception e){
    
            }finally {
                boss.shutdownGracefully();
                works.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) {
            new ChatServer(8888).start();
        }
    }
    
    

    SeverHander

    public class ServerHander extends SimpleChannelInboundHandler<String> {
        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    
        /**
         * 每当从客户端有消息写入时
         *
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            Channel inComing = channelHandlerContext.channel();
            for (Channel channel : channels) {
                if (channel != inComing) {
                    channel.writeAndFlush("[用户" + inComing.remoteAddress() + " 说:]" + s + "\n");
                } else {
                    channel.writeAndFlush("[我说:]" + s + "\n");
                }
            }
        }
    
        /**
         * 当有客户端连接时,handlerAdded会执行,就把该客户端的通道记录下来,加入队列
         *
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel inComing = ctx.channel();//获得客户端通道
            //通知其他客户端有新人进入
            for (Channel channel : channels) {
                if (channel != inComing) {
                    channel.writeAndFlush("[欢迎: " + inComing.remoteAddress() + "] 进入聊天室!\n");
                }
            }
            channels.add(inComing);//加入队列
        }
    
        /**
         * 断开连接
         *
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel outComing = ctx.channel();//获得客户端通道
            //通知其他客户端有人离开
            for (Channel channel : channels) {
                if (channel != outComing) {
                    channel.writeAndFlush("[再见: ]" + outComing.remoteAddress() + " 离开聊天室!\n");
                }
            }
    
            channels.remove(outComing);
        }
    
    
        /**
         * 当服务器监听到客户端活动时
         *
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel inComing = ctx.channel();
            System.out.println("[" + inComing.remoteAddress() + "]: 进入聊天室");
        }
    
        /**
         * 离线
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel inComing = ctx.channel();
            System.out.println("[" + inComing.remoteAddress() + "]: 离线");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel inComing = ctx.channel();
            System.out.println(inComing.remoteAddress() + "通讯异常!");
            ctx.close();
        }
    }
    
    

    Client端

    public class ChatClient {
        private String host;
        private int port;
    
        public ChatClient(String host, int port) {
            this.host = host;
            this.port = port;
        }
    
        public void start() {
            EventLoopGroup works = new NioEventLoopGroup();
            try {
                Bootstrap boot = new Bootstrap();
                boot.group(works).channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                ChannelPipeline pipeline = sc.pipeline();
                                pipeline.addLast("line",new LineBasedFrameDecoder(1024));
                                pipeline.addLast("encode", new StringEncoder());//编码器
                                pipeline.addLast("decode", new StringDecoder());//解码器
                                pipeline.addLast(new ClientHander());
                            }
                        });
                ChannelFuture future = boot.connect(this.host, this.port).sync();
                System.out.println("客户端已经连接");
                future.channel().closeFuture().sync();
                System.out.println("客户端已经关闭");
            } catch (Exception e) {
    
            } finally {
                works.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            new ChatClient("localhost",8888).start();
        }
    }
    
    

    ClientHander

    public class ClientHander extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println(s);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            new Thread(()-> {
                    Scanner scanner = new Scanner(System.in);
                    while (scanner.hasNextLine()) {
                        String s = scanner.nextLine()+"\n";
                        ByteBuf buffer = Unpooled.buffer(s.length());
                        buffer.writeBytes(s.getBytes());
                        ctx.writeAndFlush(buffer);
                    }
            }).start();
        }
    }
    

    启动Sever

    image.png

    启动一个Client

    image.png

    启动第二个Client

    image.png image.png image.png image.png

    2、利用Netty实现的Http协议:

    Server端

    public class HttpServer {
    
        public static void main(String[] args) throws Exception {
    
            // 定义一对线程组
            // 主线程组, 用于接受客户端的连接,但是不做任何处理,跟老板一样,不做事
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            
            try {
                // netty服务器的创建, ServerBootstrap 是一个启动类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup)           // 设置主从线程组
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new HttpServerInitializer()); 
                
                // 启动server,并且设置8088为启动的端口号,同时启动方式为同步
                ChannelFuture channelFuture = serverBootstrap.bind(8088).sync();
                
                // 监听关闭的channel,设置位同步方式
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
    }
    
    
    public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            // 通过SocketChannel去获得对应的管道
            ChannelPipeline pipeline = channel.pipeline();
            
            // 通过管道,添加handler
            // HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器
            // 当请求到服务端,我们需要做解码,响应到客户端做编码
            pipeline.addLast("HttpServerCodec", new HttpServerCodec());
            
            // 添加自定义的助手类,返回 "hello netty~"
            pipeline.addLast("customHandler", new CustomHandler());
        }
    
    }
    
    public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) 
                throws Exception {
            // 获取channel
            Channel channel = ctx.channel();
            
            if (msg instanceof HttpRequest) {
                // 显示客户端的远程地址
                System.out.println(channel.remoteAddress());
                
                // 定义发送的数据消息
                ByteBuf content = Unpooled.copiedBuffer("Hello netty~", CharsetUtil.UTF_8);
                
                // 构建一个http response
                FullHttpResponse response = 
                        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, 
                                HttpResponseStatus.OK, 
                                content);
                // 为响应增加数据类型和长度
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
                
                // 把响应刷到客户端
                ctx.writeAndFlush(response);
            }
            
        }
    /*
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel注册到NioEventLoop");
            super.channelRegistered(ctx);
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel取消和NioEventLoop的绑定");
            super.channelUnregistered(ctx);
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel准备就绪");
            super.channelActive(ctx);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel被关闭");
            super.channelInactive(ctx);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel读数据完成");
            super.channelReadComplete(ctx);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("用户事件触发。。。");
            super.userEventTriggered(ctx, evt);
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel可写更改");
            super.channelWritabilityChanged(ctx);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("捕获到异常");
            super.exceptionCaught(ctx, cause);
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("新事件被添加");
            super.handlerAdded(ctx);
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("事件被移除");
            super.handlerRemoved(ctx);
        }
    */
    }
    

    3、利用Netty实现的Websocket协议:

    由于Websocket的握手需要使用http,所以在pipline中需要注册支持http的事件HttpServerCodec
    Server端

    public class WSServer {
    
        public static void main(String[] args) throws Exception {
            
            EventLoopGroup mainGroup = new NioEventLoopGroup();
            EventLoopGroup subGroup = new NioEventLoopGroup();
            
            try {
                ServerBootstrap server = new ServerBootstrap();
                server.group(mainGroup, subGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WSServerInitialzer());
                
                ChannelFuture future = server.bind(8088).sync();
                
                future.channel().closeFuture().sync();
            } finally {
                mainGroup.shutdownGracefully();
                subGroup.shutdownGracefully();
            }
        }
        
    }
    
    
    public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
    
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            
            // websocket 基于http协议,所以要有http编解码器
            pipeline.addLast(new HttpServerCodec());
            // 对写大数据流的支持 
            pipeline.addLast(new ChunkedWriteHandler());
            // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
            // 几乎在netty中的网络编程,都会使用到此hanler
            pipeline.addLast(new HttpObjectAggregator(1024*64));
    
        // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
            // 如果是读空闲或者写空闲,不处理
            pipeline.addLast(new IdleStateHandler(8, 10, 12));
            // 自定义的空闲状态检测
            pipeline.addLast(new HeartBeatHandler());
    
    
    
            /**
             * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
             * 本handler会帮你处理一些繁重的复杂的事
             * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
             * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
             */
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
            
            // 自定义的handler
            pipeline.addLast(new ChatHandler());
        }
    
    }
    
    

    TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体

    public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
        // 用于记录和管理所有客户端的channle
        private static ChannelGroup clients = 
                new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) 
                throws Exception {
            // 获取客户端传输过来的消息
            String content = msg.text();
            System.out.println("接受到的数据:" + content);
            
    //      for (Channel channel: clients) {
    //          channel.writeAndFlush(
    //              new TextWebSocketFrame(
    //                      "[服务器在]" + LocalDateTime.now() 
    //                      + "接受到消息, 消息为:" + content));
    //      }
            // 下面这个方法,和上面的for循环效果是一样的
            clients.writeAndFlush(
                    new TextWebSocketFrame(
                            "[服务器在]" + LocalDateTime.now() 
                            + "接受到消息, 消息为:" + content));
            
        }
    
        /**
         * 当客户端连接服务端之后(打开连接)
         * 获取客户端的channle,并且放到ChannelGroup中去进行管理
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            clients.add(ctx.channel());
        }
    
          @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            
            String channelId = ctx.channel().id().asShortText();
            System.out.println("客户端被移除,channelId为:" + channelId);
             //System.out.println("客户端断开,channle对应的长id为:" + ctx.channel().id().asLongText());
            //System.out.println("客户端断开,channle对应的短id为:" + ctx.channel().id().asShortText());
            // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
            clients.remove(ctx.channel());
        }
    /**
    发生异常时进行捕获
    */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除
            ctx.channel().close();
            clients.remove(ctx.channel());
        }   
        
    }
    
    

    心跳检测

    /**
     * @Description: 用于检测channel的心跳handler 
     *               继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0方法
     */
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            
            // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 )
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent)evt;     // 强制类型转换
                
                if (event.state() == IdleState.READER_IDLE) {
                    System.out.println("进入读空闲...");
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    System.out.println("进入写空闲...");
                } else if (event.state() == IdleState.ALL_IDLE) {
                    
                    System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size());
                    
                    Channel channel = ctx.channel();
                    // 关闭无用的channel,以防资源浪费
                    channel.close();
                    
                    System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size());
                }
            }
            
        }
        
    }
    

    相关文章

      网友评论

          本文标题:Netty基础(未完待续)

          本文链接:https://www.haomeiwen.com/subject/wjybyktx.html