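1. Definition
Official definition
Netty is an asynchronous, event-driven network application framework for rapid development of maintainable, high-performance protocol servers and clients.
My take
A high-performance Java networking framework: asynchronous I/O, event-driven, and an excellent open-source communication framework.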
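2. Features
- Multiplexed I/O model (bossGroup, workerGroup)
- Does not follow the Servlet specification
- No built-in route mapping
- Supports long-lived connections (as with WebSocket, the server can push data to the browser)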
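To make the first bullet concrete, here is a minimal plain-JDK sketch of I/O multiplexing, the mechanism Netty's NIO event loops are built on: one thread waits on a single Selector and services whichever channel becomes readable. This is not Netty code. The class name SelectorSketch, the method readAll, and the use of in-process Pipe objects as stand-ins for client connections are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// One thread, one Selector, many channels (illustrative; not Netty's implementation).
class SelectorSketch {

    // Opens `connections` pipes, writes one message into each, and reads them all
    // back on the calling thread through a single Selector.
    static List<String> readAll(int connections) {
        List<String> received = new ArrayList<>();
        List<Pipe> pipes = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            for (int i = 0; i < connections; i++) {
                Pipe pipe = Pipe.open();
                pipes.add(pipe);
                pipe.source().configureBlocking(false);
                // The attachment labels the "connection" so the loop knows who is ready.
                pipe.source().register(selector, SelectionKey.OP_READ, "conn-" + i);
                byte[] hello = ("hello from conn-" + i).getBytes(StandardCharsets.UTF_8);
                pipe.sink().write(ByteBuffer.wrap(hello));
            }
            while (received.size() < connections) {
                selector.select(); // blocks until at least one channel is readable
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove(); // selected keys must be cleared by hand
                    ByteBuffer buf = ByteBuffer.allocate(256);
                    int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                    if (n > 0) {
                        String text = new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                        received.add(key.attachment() + " -> " + text);
                    }
                }
            }
            for (Pipe pipe : pipes) {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Readiness order is not guaranteed, so sort for a stable result.
        Collections.sort(received);
        return received;
    }

    public static void main(String[] args) {
        readAll(3).forEach(System.out::println);
    }
}
```

In Netty terms, the bossGroup runs this kind of loop for accept events and hands each accepted connection to a loop in the workerGroup, which then handles that connection's reads and writes.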

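3. Versions
In short: 3 is deprecated, 4 is mainstream, and 5 is a dead end. The 4.x line is the current recommendation because it offers:
- Executor in place of ThreadFactory
- Simpler and more accurate buffer leak tracking
- Globally unique Channel IDs
- A more flexible threading model
For details, see:
- "Netty version upgrade war stories: threading" https://www.cnblogs.com/zoucaitou/p/4280618.html
- "A first look at Netty and a comparison of its versions" https://blog.csdn.net/u010154380/article/details/46988269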
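4. Example
Build a chat room with Netty to learn the basic Netty pattern (client, server, and the core logic in ServerHandler).
The code follows; it is meant to be both fun and instructive.
Client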
package io.netty.example.chat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Chat client. If the server cannot be reached, the client exits on its own
 * (Connection refused); once connected, the System.in read loop keeps it alive.
 *
 * @author wuhe on 4/2/21
 */
public class ChatClient {
    public static void main(String[] args) {
        // A client needs only one event loop group
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // The server side uses ServerBootstrap and NioServerSocketChannel instead
            Bootstrap bootstrap = new Bootstrap();
            // A client uses handler(), not childHandler()
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The pipeline; everything added to it is a handler
                            ChannelPipeline pipeline = ch.pipeline();
                            // Frame decoder: splits the stream on a 4-byte length header and strips it
                            pipeline.addLast("name", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            // Frame encoder: prepends a 4-byte length header
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // String decoder and encoder
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            // Custom handler
                            pipeline.addLast("testClientHandler", new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                    System.out.println(msg);
                                }

                                // After an exception, the usual response is to close the connection
                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    cause.printStackTrace();
                                    ctx.close();
                                }

                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // Nothing to do when the connection becomes active
                                }
                            });
                        }
                    });
            // The client calls connect(); the server calls bind()
            Channel clientChannel = bootstrap.connect("localhost", 8888).sync().channel();
            // Create the reader once, and stop at end of input instead of sending "null"
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = reader.readLine()) != null) {
                clientChannel.writeAndFlush(line + "\r\n");
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
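The client pipeline above (and the server pipeline, which mirrors it) frames every message with a 4-byte length header. The sketch below, in plain JDK code rather than Netty, shows roughly what that means on the wire: the sender prepends the payload length, and the receiver waits until a whole frame has arrived before stripping the header. The class and method names are hypothetical, and details of the real handlers, such as enforcing a maximum frame length, are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK illustration of 4-byte length-prefixed framing (not Netty code).
class LengthFramingSketch {

    // Prepend a 4-byte big-endian length to the UTF-8 payload,
    // roughly the job LengthFieldPrepender(4) does on the outbound path.
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Extract every complete frame from the buffer and strip its header.
    // An incomplete trailing frame is left unread until more bytes arrive.
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // rewind to the header and wait for the rest
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello");
        byte[] second = encode("netty");
        // Simulate TCP delivering one full frame plus part of the next one.
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(first).put(second, 0, 6);
        buf.flip();
        System.out.println(decode(buf)); // [hello]
        // The remaining bytes arrive later.
        buf.compact();
        buf.put(second, 6, second.length - 6);
        buf.flip();
        System.out.println(decode(buf)); // [netty]
    }
}
```

Framing matters because TCP is a byte stream: without the length header, the receiver has no reliable way to tell where one chat message ends and the next begins.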
Server
package io.netty.example.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * Server => Initializer => ServerHandler => Client
 *
 * @see io.netty.example.http.MyHttpServer
 */
public class ChatServer {
    public static void main(String[] args) throws InterruptedException {
        // bossGroup accepts connections; workerGroup handles I/O on accepted connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // The pipeline; everything added to it is a handler
                            ChannelPipeline pipeline = ch.pipeline();
                            // Frame decoder: splits the stream on a 4-byte length header and strips it
                            pipeline.addLast("name", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                            // Frame encoder: prepends a 4-byte length header
                            pipeline.addLast(new LengthFieldPrepender(4));
                            // String decoder and encoder
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            // Custom handler
                            pipeline.addLast("testServerHandler", new ServerHandler());
                        }
                    });
            // Bind the port, then block until the server channel is closed
            ChannelFuture bindFuture = bootstrap.bind(8888).sync();
            bindFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
The core ServerHandler
package io.netty.example.chat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    // Group holding every connected channel.
    // new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) delegates to:
    // this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("[" + channel.remoteAddress() + "]: " + msg);
        // Broadcast: other members see the sender's address, the sender sees "[Me]"
        channelGroup.forEach(ch -> {
            if (ch != channel) {
                ch.writeAndFlush("[" + channel.remoteAddress() + "]: " + msg);
            } else {
                channel.writeAndFlush("[Me]: " + msg);
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======[System] " + channel.remoteAddress() + " is getting ready======>>>>>>>\n");
        System.out.println("<<<<<<======[System] " + channel.remoteAddress() + " is getting ready======>>>>>>>\n");
        // No manual remove is needed: per the ChannelGroup Javadoc, a closed channel
        // is removed from the group automatically. TODO: locate this in the source.
        // channelGroup.remove(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======[System] " + channel.remoteAddress() + " has left======>>>>>>>\n");
        System.out.println("<<<<<<======[System] " + channel.remoteAddress() + " has left======>>>>>>>\n");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Announce to existing members first, then add the newcomer to the group
        channelGroup.writeAndFlush("<<<<<<======[System] " + channel.remoteAddress() + " has entered Happy Planet======>>>>>>>\n");
        channelGroup.add(channel);
        System.out.println("<<<<<<======[System] " + channel.remoteAddress() + " has entered Happy Planet======>>>>>>>\n");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("<<<<<<======[System] " + channel.remoteAddress() + " has exited Happy Planet======>>>>>>>\n");
        System.out.println("<<<<<<======[System] " + channel.remoteAddress() + " has exited Happy Planet======>>>>>>>\n");
    }

    // After an exception, the usual response is to close the connection
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
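The broadcast rule in channelRead0 is the heart of the chat room, so here it is again as a tiny plain-JDK model that can be run without a network: every member receives each message, but the sender sees it tagged as their own. The class BroadcastSketch and its map of inboxes are hypothetical stand-ins for the ChannelGroup and the real channels, not part of Netty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK model of the broadcast rule in ServerHandler.channelRead0 (not Netty code).
class BroadcastSketch {
    // Hypothetical stand-in for the ChannelGroup: member address -> messages delivered to it.
    private final Map<String, List<String>> inboxes = new LinkedHashMap<>();

    // Counterpart of channelGroup.add(channel).
    void join(String address) {
        inboxes.put(address, new ArrayList<>());
    }

    // Counterpart of a channel closing and dropping out of the group.
    void leave(String address) {
        inboxes.remove(address);
    }

    // Deliver msg to every member; the sender gets a "[Me]" prefix instead of an address.
    void broadcast(String sender, String msg) {
        inboxes.forEach((address, inbox) -> {
            if (address.equals(sender)) {
                inbox.add("[Me]: " + msg);
            } else {
                inbox.add("[" + sender + "]: " + msg);
            }
        });
    }

    // Messages delivered so far, or null if the address is not a member.
    List<String> inbox(String address) {
        return inboxes.get(address);
    }

    public static void main(String[] args) {
        BroadcastSketch room = new BroadcastSketch();
        room.join("alice");
        room.join("bob");
        room.broadcast("alice", "hi");
        System.out.println(room.inbox("alice")); // [[Me]: hi]
        System.out.println(room.inbox("bob"));   // [[alice]: hi]
    }
}
```

In the real handler the loop runs over Channel objects and calls writeAndFlush on each one, but the branching is the same.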
Screenshot of the result (image not included)
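5. Further reading
What you can learn by studying Netty
BIO, NIO, thread pools, blocking queues, network programming, TCP/UDP, event dispatching, threading models, design patterns
Other HTTP clients
- The JDK's built-in URLConnection
- Apache HttpClient
- Netty's asynchronous HTTP client
- Spring's RestTemplate
- Feign, the preferred client in Spring Cloud Netflix (it wraps the clients above and uses URLConnection by default)
Who uses Netty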
- Dubbo
- RocketMQ
- Elasticsearch
- gRPC