- Netty 应用实例:群聊系统。
- 要求:①实现多人群聊;②服务器端:可以监测用户上线,离线,并实现消息转发功能;③客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)。
- GroupChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //监听端口
public GroupChatServer(int port) {
this.port = port;
}
//编写run方法,处理客户端的请求
public void run() throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动...");
ChannelFuture channelFuture = b.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
- GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一个hashMap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定义一个channel 组,管理所有的channel
//GlobalEventExecutor.INSTANCE 是全局的事件执行器(单例)
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示连接一旦建立,第一个被执行
//将当前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其它在线的客户端
//该方法会将 channelGroup 中所有的channel 都遍历一次,并发送消息,所以我们不需要自己遍历
channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 加入聊天:" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
//断开连接,将xx客户离开信息推送给当前在线的客户
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size:" + channelGroup.size());
}
//表示channel 处于活动状态,提示 xxx 上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
//表示channel 处于不活动状态,提示 xxx离线了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了~");
}
//读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取到当前channel
Channel channel = ctx.channel();
//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
//若不是当前的channel,则转发消息
if (channel != ch) {
ch.writeAndFlush("[客户端]:" + channel.remoteAddress() + " 发送了消息:" + msg + "\n");
} else {//回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//属性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相关handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定义的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
- GroupChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
- Netty 应用实例:心跳机制检测。
- 要求:当服务器超过 3 秒没有读时,就提示读空闲;当服务器超过 5 秒没有写操作时,就提示写空闲,实现当服务器超过 7 秒没有读或者写操作时,就提示读写空闲。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //增加一个日志处理器
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty 提供 IdleStateHandler
/*
1、IdleStateHandler 是netty 提供的处理空闲状态的处理器
2、long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否连接
3、long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否连接
4、long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
5、文档说明:triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
6、当 IdleStateEvent 触发后,就会传递给管道 的下一个handler去处理
通过调用(触发)下一个handler 的 userEventTriggered,在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
//加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理...");
//若发生空闲,则关闭通道
//ctx.channel().close();
}
}
}
- Netty 应用实例:Netty 通过 WebSocket 编程实现服务器和客户端长连接。
- 要求:实现基于 WebSocket 的长连接的全双工地交互,通过改变 Http 协议多次请求的约束,实现了长连接,服务器可以发送消息给浏览器。客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知。
- hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判断当前浏览器是否支持websocket
if (window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello");
//相当于channelReado,ev收到服务器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
};
//相当于连接开启(感知到连接开启)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了.."
};
//相当于连接关闭(感知到连接关闭)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭了.."
}
} else {
alert("当前浏览器不支持websocket...")
}
//发送消息到服务器
function send(message) {
if (!window.socket) { //先判断socket是否创建好
return;
}
if (socket.readyState == WebSocket.OPEN) {
//通过socket 发送消息
socket.send(message)
} else {
alert("连接没有开启...");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为基于http协议,所以要使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//数据以块方式写,需要添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
1、http数据在传输过程中是分段,HttpObjectAggregator,就是可以将多个段聚合
2、这就是为什么当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
1、对应websocket,它的数据是以帧(frame) 形式传递
2、WebSocketFrame 有六个实现子类
3、浏览器请求 ws://localhost:7000/xxx:表示请求的uri
4、WebSocketServerProtocolHandler 核心功能是将http协议升级为ws协议(通过一个状态码 101来切换),保持长连接状态
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
//自定义的handler,处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- MyTextWebSocketFrameHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息:" + msg.text());
//回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now() + " " + msg.text()));
}
//当web客户端连接后,handlerAdded 方法被触发执行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值,LongText 是唯一的,ShortText 不是唯一
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被调用:" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close(); //关闭连接
}
}
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,所以在发送数据时就需要编码,接收数据时就需要解码。
- codec(编解码器)的组成部分有两个:
decoder
(解码器)和 encoder
(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据。
- Netty 提供的编码器:
StringEncoder
:对字符串数据进行编码;ObjectEncoder
:对Java对象进行编码。
- Netty 提供的解码器:
StringDecoder
:对字符串数据进行解码;ObjectDecoder
:对 Java 对象进行解码。
- Netty 本身自带的
ObjectDecoder
和ObjectEncoder
可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:①无法跨语言;②序列化后的体积太大,是二进制编码的5倍多;③序列化性能太低。
-
Protobuf(Google Protocol Buffers)
是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)。它很适合做数据存储或 RPC 数据交换格式(远程过程调用 remote procedure call)。目前很多公司使用的数据交换技术:http + json tcp + protobuf。
- Protobuf 是以
message
的方式来管理数据的,并且支持跨平台、跨语言。使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto
文件进行描述。
- Protobuf 入门案例:要求:客户端可以随机发送
StudentPoJo / WorkerPoJo
对象到服务器,服务端能接StudentPoJo / WorkerPoJo
对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)。
- 导入protobuf依赖:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.13.0</version>
</dependency>
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.zzw.netty.codec2"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用 message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3中要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个,节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
- 下载好protoc编译器,配置好环境变量,然后编译
Student.proto
文件并在当前路径下存放生成的文件MyDataInfo.java
:protoc --java_out=. Student.proto
。
- NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1、创建两个线程组 bossGroup 和 workerGroup
//2、bossGroup 只是处理连接请求,真正的客户端业务处理是会交给 workerGroup 完成
//3、两个都是无限循环
//4、bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数:默认值为 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
//.handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
//给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtobufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler()); //给 workerGroup 的 EventLoop 对应的管道设置处理器
}
});
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步处理,生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给 cf 注册监听器,监控我们关心的事件
//绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 说明:自定义一个Handler 需要继承 netty 规定好的某个HandlerAdapter(规范)
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/**
* 1、ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道channel,地址
* 2、Object msg:客户端发送的数据,默认是 Object
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id:" + student.getId() + ",学生姓名:" + student.getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的年龄:" + worker.getAge() + ",工人的姓名:" + worker.getName());
} else {
System.out.println("传输的类型不正确!");
}
}
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush:write + flush
//将数据写入到缓存,并刷新
//一般需要对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//发生异常时,需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入编码器 ProtobufEncoder()
pipeline.addLast("encoder", new ProtobufEncoder());
//加入自定义的处理器
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("...客户端 is ok...");
//启动客户端去连接服务器端
//关于ChannelFuture 涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机地发送 Student 或者 Worker 对象
int num = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == num) {
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else {
//发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
//发生异常时,需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- Netty 的主要组件有 Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe 等。
- ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现 ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter),就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从 ChannelInboundHandler 冲刷数据。业务逻辑通常写在一个或者多个 ChannelInboundHandler 中。ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的。
- ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,若事件的运动方向是从客户端到服务端,则称这些事件为出站的,即客户端发送给服务端的数据会通过 pipeline 中的一系列 ChannelOutboundHandler,并被这些 Handler 处理,反之则称为入站的。
- 当 Netty 发送或者接受一个消息时,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如 java 对象);若是出站消息,则会被编码成字节。
- Netty 提供一系列实用的编解码器,它们都实现了 ChannelInboundHadnler 或ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写了。以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的 decode() 方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
解码器-ByteToMessageDecoder
- 由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp 有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理为止。
关于 ByteToMessageDecoder 实例分析
网友评论