美文网首页
Netty学习笔记(三)

Netty学习笔记(三)

作者: dev_winner | 来源:发表于2020-12-08 09:37 被阅读0次
  • Netty 应用实例:群聊系统。
  • 要求:①实现多人群聊;②服务器端:可以监测用户上线,离线,并实现消息转发功能;③客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)。
  • GroupChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
    private int port; //监听端口
    public GroupChatServer(int port) {
        this.port = port;
    }
    //编写run方法,处理客户端的请求
    public void run() throws Exception {
        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //获取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解码器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入编码器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的业务处理handler
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("netty 服务器启动...");
            ChannelFuture channelFuture = b.bind(port).sync();
            //监听关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new GroupChatServer(7000).run();
    }
}
  • GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    //public static List<Channel> channels = new ArrayList<Channel>();
    //使用一个hashMap 管理
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();
    //定义一个channel 组,管理所有的channel
    //GlobalEventExecutor.INSTANCE 是全局的事件执行器(单例)
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //handlerAdded 表示连接一旦建立,第一个被执行
    //将当前channel 加入到 channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户加入聊天的信息推送给其它在线的客户端
        //该方法会将 channelGroup 中所有的channel 都遍历一次,并发送消息,所以我们不需要自己遍历
        channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 加入聊天:" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);
    }
    //断开连接,将xx客户离开信息推送给当前在线的客户
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 离开了\n");
        System.out.println("channelGroup size:" + channelGroup.size());
    }
    //表示channel 处于活动状态,提示 xxx 上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线了~");
    }
    //表示channel 处于不活动状态,提示 xxx离线了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 离线了~");
    }
    //读取数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //获取到当前channel
        Channel channel = ctx.channel();
        //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
        channelGroup.forEach(ch -> {
            //若不是当前的channel,则转发消息
            if (channel != ch) {
                ch.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 发送了消息:" + msg + "\n");
            } else {//回显自己发送的消息给自己
                ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
            }
        });
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭通道
        ctx.close();
    }
}
  • GroupChatClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
    //属性
    private final String host;
    private final int port;
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相关handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定义的handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress() + "--------");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通过channel 发送到服务器端
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
  • GroupChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}
  • Netty 应用实例:心跳机制检测。
  • 要求:当服务器超过 3 秒没有读时,就提示读空闲;当服务器超过 5 秒没有写操作时,就提示写空闲,实现当服务器超过 7 秒没有读或者写操作时,就提示读写空闲。
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //增加一个日志处理器
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一个netty 提供 IdleStateHandler
                    /*
                        1、IdleStateHandler 是netty 提供的处理空闲状态的处理器
                        2、long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否连接
                        3、long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否连接
                        4、long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
                        5、文档说明:triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
                        6、当 IdleStateEvent 触发后,就会传递给管道 的下一个handler去处理
                            通过调用(触发)下一个handler 的 userEventTriggered,在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
                     */
                    pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
                    //加入一个对空闲检测进一步处理的handler(自定义)
                    pipeline.addLast(new MyServerHandler());
                }
            });
            //启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //将 evt 向下转型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "读空闲";
                    break;
                case WRITER_IDLE:
                    eventType = "写空闲";
                    break;
                case ALL_IDLE:
                    eventType = "读写空闲";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
            System.out.println("服务器做相应处理...");
            //若发生空闲,则关闭通道
            //ctx.channel().close();
        }
    }
}
  • Netty 应用实例:Netty 通过 WebSocket 编程实现服务器和客户端长连接。
  • 要求:实现基于 WebSocket 的长连接的全双工地交互,通过改变 Http 协议多次请求的约束,实现了长连接,服务器可以发送消息给浏览器。客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知。
  • hello.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <title>Title</title>
  </head>
  <body>
    <script>
      var socket;
      //判断当前浏览器是否支持websocket
      if (window.WebSocket) {
        //go on
        socket = new WebSocket("ws://localhost:7000/hello");
        //相当于channelReado,ev收到服务器端回送的消息
        socket.onmessage = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + ev.data;
        };
        //相当于连接开启(感知到连接开启)
        socket.onopen = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = "连接开启了.."
        };
        //相当于连接关闭(感知到连接关闭)
        socket.onclose = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + "连接关闭了.."
        }
      } else {
        alert("当前浏览器不支持websocket...")
      }
      //发送消息到服务器
      function send(message) {
        if (!window.socket) { //先判断socket是否创建好
          return;
        }
        if (socket.readyState == WebSocket.OPEN) {
          //通过socket 发送消息
          socket.send(message)
        } else {
          alert("连接没有开启...");
        }
      }
    </script>
    <form onsubmit="return false">
      <textarea name="message" style="height: 300px; width: 300px"></textarea>
      <input type="button" value="发生消息" onclick="send(this.form.message.value)">
      <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
      <input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
    </form>
  </body>
</html>
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //因为基于http协议,所以要使用http的编码和解码器
                    pipeline.addLast(new HttpServerCodec());
                    //数据以块方式写,需要添加ChunkedWriteHandler处理器
                    pipeline.addLast(new ChunkedWriteHandler());
                    /*
                        1、http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合
                        2、这就是为什么当浏览器发送大量数据时,就会发出多次http请求
                     */
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    /*
                        1、对应websocket,它的数据是以帧(frame) 形式传递
                        2、WebSocketFrame 有六个实现子类
                        3、浏览器请求 ws://localhost:7000/xxx:表示请求的uri
                        4、WebSocketServerProtocolHandler 核心功能是将http协议升级为ws协议(通过一个状态码 101来切换),保持长连接状态
                     */
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                    //自定义的handler,处理业务逻辑
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });
            //启动服务器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyTextWebSocketFrameHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息:" + msg.text());
        //回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + msg.text()));
    }
    //当web客户端连接后,handlerAdded 方法被触发执行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //id 表示唯一的值,LongText 是唯一的,ShortText 不是唯一
        System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded 被调用:" + ctx.channel().id().asShortText());
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常发生 " + cause.getMessage());
        ctx.close(); //关闭连接
    }
}
  • 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,所以在发送数据时就需要编码,接收数据时就需要解码。
  • codec(编解码器)的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
  • Netty 提供的编码器:StringEncoder:对字符串数据进行编码;ObjectEncoder:对Java对象进行编码。
  • Netty 提供的解码器:StringDecoder:对字符串数据进行解码;ObjectDecoder:对 Java 对象进行解码。
  • Netty 本身自带的ObjectDecoderObjectEncoder可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:①无法跨语言;②序列化后的体积太大,是二进制编码的5倍多;③序列化性能太低。
  • Protobuf(Google Protocol Buffers)是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)。它很适合做数据存储或 RPC 数据交换格式(远程过程调用 remote procedure call)。目前很多公司使用的数据交换技术:http + json tcp + protobuf。
  • Protobuf 是以message的方式来管理数据的,并且支持跨平台、跨语言。使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto文件进行描述。
  • Protobuf 入门案例:要求:客户端可以随机发送StudentPoJo / WorkerPoJo对象到服务器,服务端能接StudentPoJo / WorkerPoJo对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)。
  • 导入protobuf依赖:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.13.0</version>
</dependency>
  • Student.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.zzw.netty.codec2"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用 message 管理其他的message
message MyMessage {
  //定义一个枚举类型
  enum DataType {
    StudentType = 0; //在proto3中要求enum的编号从0开始
    WorkerType = 1;
  }
  //用data_type 来标识传的是哪一个枚举类型
  DataType data_type = 1;
  //表示每次枚举类型最多只能出现其中的一个,节省空间
  oneof dataBody {
    Student student = 2;
    Worker worker = 3;
  }
}
message Student {
  int32 id = 1;//Student类的属性
  string name = 2;
}
message Worker {
  string name = 1;
  int32 age = 2;
}
  • 下载好protoc编译器,配置好环境变量,然后编译Student.proto文件并在当前路径下存放生成的文件MyDataInfo.javaprotoc --java_out=. Student.proto
  • NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
    public static void main(String[] args) throws Exception {
        //创建BossGroup 和 WorkerGroup
        //说明
        //1、创建两个线程组 bossGroup 和 workerGroup
        //2、bossGroup 只是处理连接请求,真正的客户端业务处理是会交给 workerGroup 完成
        //3、两个都是无限循环
        //4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数:默认值为 cpu核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个
        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器的通道实现
                    .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                    //.handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
                        //给 pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入 ProtobufDecoder
                            //指定对哪种对象进行解码
                            pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler()); //给 workerGroup 的 EventLoop 对应的管道设置处理器
                        }
                    });
            System.out.println(".....服务器 is ready...");
            //绑定一个端口并且同步处理,生成了一个 ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();
            //给 cf 注册监听器,监控我们关心的事件
            //绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功");
                    } else {
                        System.out.println("监听端口 6668 失败");
                    }
                }
            });
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • NettyServerHandler.java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * 说明:自定义一个Handler 需要继承 netty 规定好的某个HandlerAdapter(规范)
 */
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
    //读取数据实际(这里我们可以读取客户端发送的消息)
    /**
     * 1、ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道channel,地址
     * 2、Object msg:客户端发送的数据,默认是 Object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
        //根据dataType来显示不同的信息
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
        if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
            MyDataInfo.Student student = msg.getStudent();
            System.out.println("学生id:" + student.getId() + ",学生姓名:" + student.getName());
        } else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
            MyDataInfo.Worker worker = msg.getWorker();
            System.out.println("工人的年龄:" + worker.getAge() + ",工人的姓名:" + worker.getName());
        } else {
            System.out.println("传输的类型不正确!");
        }
    }
    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush:write + flush
        //将数据写入到缓存,并刷新
        //一般需要对发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }
    //发生异常时,需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  • NettyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入编码器 ProtobufEncoder()
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            //加入自定义的处理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("...客户端 is ok...");
            //启动客户端去连接服务器端
            //关于ChannelFuture 涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • NettyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //随机地发送 Student 或者 Worker 对象
        int num = new Random().nextInt(3);
        MyDataInfo.MyMessage myMessage = null;
        if (0 == num) {
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
                    .setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
        } else {
            //发送一个Worker 对象
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
                    .setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
        }
        ctx.writeAndFlush(myMessage);
    }
    //当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
    }
    //发生异常时,需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • Netty 的主要组件有 Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe 等。
  • ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在一个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的。
  • ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,若事件的运动方向是从客户端到服务端,则称这些事件为出站的,即客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为入站的。
  • 当 Netty 发送或者接受一个消息时,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如 java 对象);若是出站消息,则会被编码成字节。
  • Netty 提供一系列实用的编解码器,它们都实现了 ChannelInboundHadnler 或ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写了。以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的 decode() 方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
解码器-ByteToMessageDecoder
  • 由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp 有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理为止。
关于 ByteToMessageDecoder 实例分析

相关文章

网友评论

      本文标题:Netty学习笔记(三)

      本文链接:https://www.haomeiwen.com/subject/aouxgktx.html