一、netty入门
一,Netty是什么
2,Netty是JAR包,一般使用ALL-IN-ONE的JAR包就可以开发了。
3,Netty不需要运行在Tomcat这类服务器中,他是单独构建一个服务器。
4,Netty可以构建HTTP服务器,socket服务器,websocket服务器等。
5,Netty其实是对JDK1.4以后提供的NIO的封装,NIO就是new i/o,JDK1.7推出了NIO2。
为什么要学netty?
这里借用知乎上一个回答:
作为一个学Java的,如果没有研究过Netty,那么你对Java语言的使用和理解仅仅停留在表面水平,会点SSH,写几个MVC,访问数据库和缓存,这些只是初等Java程序员干的事。如果你要进阶,想了解Java服务器的深层高阶知识,Netty绝对是一个必须要过的门槛。有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。如果你想知道Nginx是怎么写出来的,如果你想知道Tomcat和Jetty是如何实现的,如果你也想实现一个简单的Redis服务器,那都应该好好理解一下Netty,它们高性能的原理都是类似的。
我们回顾一下传统的HTTP服务器的原理
1.创建一个ServerSocket,监听并绑定一个端口
2.一系列客户端来请求这个端口
3.服务器使用Accept,获得一个来自客户端的Socket连接对象
4.启动一个新线程处理连接
1.读Socket,得到字节流
2.解码协议,得到Http请求对象
3.处理Http请求,得到一个结果,封装成一个HttpResponse对象
4.编码协议,将结果序列化字节流
5.写Socket,将字节流发给客户端
5.继续循环步骤3
HTTP服务器之所以称为HTTP服务器,是因为编码解码协议是HTTP协议,如果协议是Redis协议,那它就成了Redis服务器,如果协议是WebSocket,那它就成了WebSocket服务器,等等。使用Netty你就可以定制编解码协议,实现自己的特定协议的服务器。
二,Netty服务架构图
以下官方图展示了Netty基本提供的服务:

主要提供的就是HTTP服务器,socket服务器,websocket服务器。
三,Netty原理架构图
从下面的原创图,可以基本看出Netty构建服务器的原理:

1,组件名词解释
(1),Bootstrap / ServerBootstrap(建立连接):
Netty引导组件,简化NIO的开发步骤,是一个Netty程序的开始,作用是配置和串联各个组件。
(2)EventLoopGroup(事件循环组):
是EventLoop组合,可以包含多个EventLoop。创建一个EventLoopGroup的时候,内部包含的方法就会创建一个子对象EventLoop。
(3)EventLoop(事件循环):
循环服务Channel,可以包含多个Channel。
(4)Channel(通道):
代表一个Scoket连接,或者其他IO操作组件。 Channel是通讯的载体
(5)ChannelInitializer(初始化连接):
主要提供了一个传输通道ChannelPipeline。
(6)ChannelPipeline(传输通道):
主要是管理各种ChannelHandler业务控制器,提供一个链式管理模式。
(7)ChannelHandler(业务控制器):
主要业务写入的地方,由开发人员写入,Netty也提供了很多写好的控制器和适配器,可以直接引用。
(8)ChannelInboundHandler(通道传入控制器):
继承至ChannelHandler,在传输通道中对传入事件进行控制。
(9)ChannelOutboundHandler(通道传出控制器):
继承至ChannelHandler,在传输通道中对传出事件进行控制。
(10)Decoder(解码):
网络传输都是byte传输,所以Netty首先接收到的是byte,需要进行解码,编程JAVA对象。
Netty提供了很多解码器,包括服务架构图(最上)显示的Google Protobuf编码,这是Google提供的跨平台的小体积编码方式,在Netty中可以直接解码。
(11)Encoder(编码):
和解码类似,在传出服务器的时候,需要编码成byte传输给客户端。
(12)Future / ChannelFuture(消息返回,图上没有):
Netty提供的返回结果,类似回调函数,告知你执行结果是什么。
2,构建服务器基本方法
从原理架构图可以看出,构建一个Netty服务器基本需要3个步骤:
1),ServerBootstrap(建立连接):构建一个Scoket或者其他连接,通过事件循环建立通道。
2),ChannelInitializer(初始化连接):构建传输通道,用于管理控制器。
3),ChannelHandler(业务控制器):构建业务控制器,最基本的是解码,编码,信息传入/传出。
同样,构建一个客户端,同样也是这些步骤,不过建立连接是使用Bootstrap,并且使用一个事件循环,而服务器一般使用两个事件循环。
四,应用场景
互联网行业
游戏行业
大数据领域
企业软件
通信行业
五,接下来以实现一个简单的及时通讯作为示例来帮助理解
pom依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
Server 服务端
SimpleChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
//让我们从 handler (处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。
//SimpleChatServerHandler 继承自 SimpleChannelInboundHandler,这个类实现了ChannelInboundHandler接口,
// ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要
// 继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1)
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,
// 并通知列表中的其他客户端 Channel
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
//writeAndFlush()方法分为两步, 先 write 再 flush
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
}
channels.add(ctx.channel());
}
//覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,
// 并通知列表中的其他客户端 Channel
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n");
}
channels.remove(ctx.channel());
}
//覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。
// 其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
@Override
protected void messageReceived(ChannelHandlerContext ctx, String s) throws Exception { // (4)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n");
} else {
channel.writeAndFlush("[txbb]" + s + "\n");
}
}
}
//覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");
}
//覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");
}
//exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在
// 处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而
// 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
Channel incoming = ctx.channel();
System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
}
}
SimpleChatServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
//SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等
public class SimpleChatServerInitializer extends
ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());//解码
pipeline.addLast("encoder", new StringEncoder());//编码
pipeline.addLast("handler", new SimpleChatServerHandler());
System.out.println("SimpleChatClient:"+ch.remoteAddress() +"连接上");
}
}
SimpleChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class SimpleChatServer {
private int port;
public SimpleChatServer(int port) {
this.port = port;
}
public void run() throws Exception {
// 配置nio线程组 创建两个线程组
// NioEventLoopGroup是用于处理I / O操作的多线程事件循环器,Netty提供了很多不同的EventLoopGroup的
// 实现处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2第一个经常被称为'boss',
// 使用接收进来的连接。第二个经常被称为'worker',使用处理已经被接收的连接,一旦'boss'接收到连接,
// 如何知道多少个线程已经被使用,如何映射到已经创建的Channel上都需要依赖于EventLoopGroup的实现,
// 并且可以通过构造函数来配置他们的关系
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) 主线程,连接线成 bossGroup主要用来轮询获取客户端的连接,其实就是一个死循环 是负责处理TCP/IP连接的
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理线程组 workergroup中取出一个管道channel来建立连接 是负责处理Channel(通道)的I/O事件
try {
//ServerBootstrap是一个启动NIO服务的辅助启动类。您可以在这个服务中直接使用Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup) // server端引导类,管理两个线程组
//此处我们指定使用NioServerSocketChannel类来说明一个新的Channel如何接收进来的连接。
.channel(NioServerSocketChannel.class) // (3)
//此处的事件处理类经常会被使用处理过一个最近的已经接收到的Channel。SimpleChatServerInitializer继承自ChannelInitializer是一个特殊的处理类,
// 他的目的是帮助用户配置一个新的Channel。也许你想通过增加一些处理类类SimpleChatServerHandler来配置一个新的Channel或其对应的ChannelPipeline来
// 实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到pipline上,然后提取这些匿名类到最顶层的类上。
// 绑定客户端连接时候触发操作
.childHandler(new SimpleChatServerInitializer()) //(4)
//option()是提供给NioServerSocketChannel以接收进来的连接。childOption()是提供给由父管道ServerChannel接收到的连接,
// 在这个例子中也是NioServerSocketChannel。
//您可以设置此处指定的Channel实现的配置参数。我们正在编写一个TCP / IP的服务端,因此我们被允许设置socket的参数选项tcpNoDelay和keepAlive。
// 请参考ChannelOption和详细的ChannelConfig实现的接口文档最初可以对ChannelOption的有一个大概的认识。
// SO_BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存
// 放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
//初始化服务端可连接队列,指定了队列的大小128
.option(ChannelOption.SO_BACKLOG, 128) // (5)
//保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println("SimpleChatServer 启动了");
//剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的8080端口。当然现在你可以多次调用bind()方法(基于不同绑定地址)
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
// 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
f.channel().closeFuture().sync();
} finally {
// 退出,释放线程池资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println("SimpleChatServer 关闭了");
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new SimpleChatServer(port).run();
}
}
Client 客户端
SimpleChatClientInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());//解码
pipeline.addLast("encoder", new StringEncoder());//编码
pipeline.addLast("handler", new SimpleChatClientHandler());
}
}
SimpleChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
//客户端的处理类比较简单,只需要将读到的信息打印出来即可
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
}
Client 客户端1
SimpleChatClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
//客户端1
public class SimpleChatClient {
public static void main(String[] args) throws Exception{
new SimpleChatClient("localhost", 8080).run();
}
private final String host;
private final int port;
public SimpleChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();// 处理线程组 workergroup中取出一个管道channel来建立连接
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
// 绑定 ip 端口 创建连接 调用sync()方法会阻塞直到服务器完成绑定 然后服务器再获取通道channel
Channel channel = bootstrap.connect(host, port).sync().channel();
//定义向服务器发送的内容 system.in 控制台输入 in.readLine() 获取值 每次读一行。换句话说,用户输入一行内容,然后回车,这些内容一次性读取进来。
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
//writeAndFlush()方法分为两步, 先 write 再 flush
channel.writeAndFlush(in.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 退出,释放线程池资源
group.shutdownGracefully();
}
}
}
Client 客户端2
SimpleChatServerHandler.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
//客户端2
public class SimpleChatClient {
public static void main(String[] args) throws Exception{
new SimpleChatClient("localhost", 8080).run();
}
private final String host;
private final int port;
public SimpleChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();// 处理线程组 workergroup中取出一个管道channel来建立连接
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
// 绑定 ip 端口 创建连接 调用sync()方法会阻塞直到服务器完成绑定 然后服务器再获取通道channel
Channel channel = bootstrap.connect(host, port).sync().channel();
//定义向服务器发送的内容 system.in 控制台输入 in.readLine() 获取值 每次读一行。换句话说,用户输入一行内容,然后回车,这些内容一次性读取进来。
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
//writeAndFlush()方法分为两步, 先 write 再 flush
channel.writeAndFlush(in.readLine() + "\r\n");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 退出,释放线程池资源
group.shutdownGracefully();
}
}
}
网友评论