美文网首页Java互联网科技Java 杂谈
提升能力从学习Netty开始

提升能力从学习Netty开始

作者: java欧阳丰 | 来源:发表于2019-08-05 11:02 被阅读3次

    netty 介绍

    一、 Netty 是什么

    Netty 是一个广泛使用的 Java 网络编程框架
    而Netty就是基于Java NIO技术封装的一套框架
    一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持

    二、Netty 组成部分

    1 、Channel

    NIO 基本结构,它代表了一个用于连接到实体如硬件设备、文件、网络套接字活程序组件,能够执行一个活多个不同的I/O 操作(例如读和写)的开放连接
    可以把 Channe 想象成一个可以 、打开 、关闭 、连接、断开 、传入、传出 数据的运输工具

    2、 Callback

    callback(回调)是一个简单的方法,提供给另一种方法作为引用,这个后者就可以在某个合适的时间调用前者
    Netty 内部使用回调处理事件时,一旦这样的回调被触发,事件可以有接口ChannelHandler的实现来处理

    3、Futre

    Futre 提供了另一种在操作完成时通知应用程序的方式 。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

    4、事件和 ChannelHandler

    Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经
    发生的事件来触发适当的动作。这些动作可能是:
    记录日志;
    数据转换;
    流控制;
    应用程序逻辑
    传统的I/o是阻塞的
    Java 用 Selector 实现非阻塞IO
    使用了事件通知API以确定在一组非阻塞套接字中有那些已经能够进行IO相关的操作

    netty 解决TCP粘包和拆包问题

    三 、 什么时粘包和拆包

    首先TCP是一个"流"协议,犹如河中水一样连成一片,没有严格的分界线。当我们在发送数据的时候就会出现多发送与少发送问题,也就是TCP粘包与拆包。得不到我们想要的效果。
    所谓粘包:当你把A,B两个数据从甲发送到乙,本想A与B单独发送,但是你却把AB一起发送了,此时AB粘在一起,就是粘包了
    所谓拆包: 如果发送数据的时候,你把A、B拆成了几份发,就是拆包了。当然数据不是你主动拆的,是TCP流自动拆的

    解决办法

    1、消息定长,比如把报文消息固定为500字节,不够用空格补位
    2、在包尾增加回车换行符进行分割,例如FTP协议
    3、将消息分为消息头和消息体,消息头中包含表示消息总长度的字段
    4、更复杂的应用层协议

    新建maven项目 pom.xml 引入netty相关依赖

    <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>5.0.0.Alpha1</version>
            </dependency>
    

    编写服务端

    package com.example.netty;
     
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
     
    /**
     * netty服务端
     * NioEventLoopGroup是一个处理I/O操作的多线程事件循环。
     * Netty为不同类型的传输提供了各种EventLoopGroup实现。
     * 我们在这个例子中实现了一个服务器端应用程序,
     * 因此将使用两个NioEventLoopGroup。第一个通常被称为“boss”,接受传入的连接。第二个通常称为“worker”,
     * 在boss接受连接并将接受的连接注册到worker之后,处理接受连接的流量。使用多少线程以及如何将它们映射到创建的通道取决于EventLoopGroup实现,
     * 甚至可以通过构造函数进行配置。
     *
     * ServerBootstrap是一个设置服务器的助手类。您可以直接使用通道设置服务器。但是,请注意这是一个冗长的过程,在大多数情况下您不需要这样做。
     * 在这里,我们指定使用NioServerSocketChannel类,该类用于实例化一个新通道以接受传入连接。
     *
     * 这里指定的处理程序总是由新接受的通道计算。ChannelInitializer是用于帮助用户配置新通道的特殊处理程序。
     * 您很可能希望通过添加一些处理程序(如DiscardServerHandler)来实现您的网络应用程序,来配置新通道的ChannelPipeline。
     * 随着应用程序变得复杂,您可能会向管道中添加更多的处理程序,并最终将这个匿名类提取到顶级类中。
     *
     * 您还可以设置特定于通道实现的参数。我们正在编写TCP/IP服务器,因此我们可以设置套接字选项,如tcpNoDelay和keepAlive。
     * 请参阅ChannelOption的apidocs和特定的ChannelConfig实现,以获得受支持的ChannelOptions的概述。
     *
     * 你注意到option()和childOption()了吗?option()用于接收传入连接的NioServerSocketChannel。
     * childOption()用于父服务器通道接受的通道,在本例中是NioServerSocketChannel。
     * 我们现在准备好出发了。剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器中所有NICs(网络接口卡)的端口8080。
     * 现在,您可以任意次数地调用bind()方法(使用不同的绑定地址)。
     *
     * telnet 可以测试服务器是否工作
     * telnet localhost 8080
     */
    public class NettyServer {
        public static void main(String[] args) throws Exception {
            new NettyServer().run(8080);
        }
        //新建一个netty服务器
        public void run(int port) throws Exception{
            //NioEventLoopGroup是一个处理I/O操作的多线程循环
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            System.out.println("准备运行端口:" + port);
            try{
                //服务器的设置助手类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap = serverBootstrap.group(bossGroup,workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        //这里指定的处理程序总是由新接受的通道计算。
                        .childHandler (new ChildChannelHandler());
                //绑定端口,同步等待成功
                ChannelFuture future = serverBootstrap.bind(port).sync();
                //等待服务监听端口关闭
                future.channel().closeFuture().sync();
        } finally {
                    //退出,释放线程资源
                workGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    }
    //ChannelInitializer是用于帮助用户配置新通道的特殊处理程序
    class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
     
        //请求到达后调用
        protected void initChannel(SocketChannel socketChannel) throws Exception {
    //        ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
    //        socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
            socketChannel.pipeline().addLast(new StringDecoder());//进行字符串的编解码设置
            socketChannel.pipeline().addLast(new StringEncoder());
            socketChannel.pipeline().addLast(new ReadTimeoutHandler(60));//设置超时时间
            socketChannel.pipeline().addLast(new DiscardServerHandler());
        }
    }
     
    /**服务器类型设置
     *
     * **/
    class DiscardServerHandler extends ChannelHandlerAdapter {
        @Override
        //只要接收到数据,就会调用channelRead()方法
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
     
            try {
                //ByteBuf是一个引用计数的对象,必须通过release()方法显式地释放它
                ByteBuf in = (ByteBuf) msg;
                System.out.println("传输内容是");
                System.out.println(in.toString(CharsetUtil.UTF_8));
                //返回信息
                ByteBuf resp= Unpooled.copiedBuffer("收到信息$".getBytes());
                ctx.writeAndFlush(resp);
            }  finally {
                System.out.println(msg);
                ReferenceCountUtil.release(msg);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出现异常就关闭
            cause.printStackTrace();
            ctx.close();
        }
     
    }
    

    编写客户端

    package com.example.netty;
     
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
     
    /**
     * netty 编写的客户端
     */
    public class TimeClient {
        public static void main(String[] args) throws Exception {
            new TimeClient().connect(8080,"localhost");
        }
        public  void connect(int port, String host)throws Exception{
            //配置客户端
            System.out.println(port+"--"+host);
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
                                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
                                socketChannel.pipeline().addLast(new TimeClientHandler());
                            }
                        });
                //绑定端口,同步等待成功
                ChannelFuture future = bootstrap.connect(host,port).sync();
                //等待服务监听端口关闭
                future.channel().closeFuture().sync();
            }finally {
                //优雅退出,释放线程资源
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    class TimeClientHandler extends ChannelHandlerAdapter {
        private byte[] req;
        public TimeClientHandler(){
            req="$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$".getBytes();
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ByteBuf message=null;
            for(int i=0;i<100;i++){
                message= Unpooled.buffer(req.length);
                message.writeBytes(req);
                ctx.writeAndFlush(message);
            }
        }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            try {
                ByteBuf in = (ByteBuf) msg;
                System.out.println(in.toString(CharsetUtil.UTF_8));
            }  finally {
                ReferenceCountUtil.release(msg);
            }
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出现异常就关闭
            cause.printStackTrace();
            ctx.close();
        }
     
    }
    

    感谢你的阅读如果感觉本文对你有所帮助可以点一下喜欢和关注一下,让更多的人可以看到这篇文章谢谢。

    相关文章

      网友评论

        本文标题:提升能力从学习Netty开始

        本文链接:https://www.haomeiwen.com/subject/cdupdctx.html