为什么要学 Netty
Apple、Twitter、
Facebook、Google、Square和Instagram,还有流行的开源项目,如Infinispan、HornetQ、Vert.x、Apache Cassandra和Elasticsearch ,它们所有的核心代码都利用了Netty强大的网络抽象
这么可能还不明白这些框架用什么跟我们有什么关系呢?我们现在用tomcat不是也是挺好的吗?
tomcat 确实是一款优秀的 web 服务器,用的也是非常多,但是 tomcat 只能处理 基于 HTTP 的请求,如果我们想私有化我们的协议并且不是基于 HTTP 协议那么 tomcat 也无能为力了, 或者说我们开发的设备是物联网设备 定义了很多很少用的协议没有现成的服务端sdk的时候我们就要基于原生socket编程,但是原生socket编程,原生socket 写简单的例子还可以,如果要是开发高并发的服务器应用难度系数不是一个demo能比的.
Netty是建立在NIO基础之上,Netty在NIO之上又提供了更高层次的抽象。大家可以试试自己写一个基于NIO http server就会体会到我们为什么要学 netty
IO模型 BIO、NIO、AIO 的比较
消息时的系统通信,通常基于网络协议实现。常见的协议包括TCP/IP,UDP/IP。
TCP/IP等协议用于数据传输,但要完成通信,还需要对数据进行处理。例如读取和写入数据。
I/O可以分为两种:同步IO和异步IO,同步I/O最常见的是 BIO(Blocking IO)、NIO(Non-Blocking IO)
BIO:是当发起I/O的读或写操作时,均为阻塞方式,直到应用程序读到了流或者将流写入数据。
image.png
NIO:基于事件驱动思想,常采用reactor(反应器)模式。当发起 IO请求时,应用程序是非阻塞的。当SOCKET有流可读或写的时候,由操作系统通知应用程序,应用程序再将流读取到缓冲区或者写入系统。
image.png
AIO:同样基于事件驱动的思想,通常采用Proactor(前摄器模式)实现。在进行I/O操作时,直接调用API的read或write,这两种方法均为异步。对于读操作,操作系统将数据读到缓冲区,并通知应用程序,对于写操作,操作系统将write方法传递的流写入并主动通知应用程序。它节省了NIO中遍历事件通知队列的代价。
image.pngtomcat 的AIO模式
这里注意比较NIO和AIO的不同,AIO是操作系统完成IO并通知应用程序,NIO是操作系统通知应用程序,由应用程序完成。
Hello Netty
处理模型概览
在我们 hello netty demo 中 handler 是我们最小的处理单元,我们需要学习就是 如何编写一个 Handler 和如何组装出我们的 pipeline
image.png image.png代码实战
CustomHandler1
完成的功能打印客户端IP
@Slf4j
public class CustomHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext context, HttpObject httpObject) throws Exception {
if (!(httpObject instanceof HttpRequest)) {
return;
}
log.info("channelRead0 1");
Channel channel = context.channel();
SocketAddress socketAddress = channel.remoteAddress();
log.info("socketAddress :{}", socketAddress);
// 通知执行下一个InboundHandler
context.fireChannelRead(httpObject);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("channelRegistered");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("channelUnregistered");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channelActive");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("channelReadComplete1");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("userEventTriggered");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log.info("channelWritabilityChanged1");
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("exceptionCaught");
// ctx.close();
super.exceptionCaught(ctx, cause);
}
}
CustomHandler2
向请求者响应 :hello netty
@Slf4j
public class CustomHandler2 extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext context, HttpObject httpObject) throws Exception {
log.info("channelRead0 2");
if (!(httpObject instanceof HttpRequest)) {
return;
}
Channel channel = context.channel();
SocketAddress socketAddress = channel.remoteAddress();
// log.info("socketAddress :{}", socketAddress);
System.out.println(socketAddress);
ByteBuf buf = Unpooled.copiedBuffer("hello netty ", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plant");
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes());
response.headers().add(HttpHeaderNames.SERVER, "hello netty ");
channel.writeAndFlush(response);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("channelRegistered");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("channelUnregistered");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channelActive");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("channelReadComplete2");
super.channelReadComplete(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("userEventTriggered");
super.userEventTriggered(ctx, evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log.info("channelWritabilityChanged2");
super.channelWritabilityChanged(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("exceptionCaught");
ctx.close();
super.exceptionCaught(ctx, cause);
}
}
装配我们的 Handler
@Slf4j
public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
log.info("initChannel");
// 拿到 pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 将我们的handel 添加到 调用链中
pipeline.addLast(new HttpServerCodec(), new CustomHandler(), new CustomHandler2());
}
}
创建服务入口接收用户请求
@Slf4j
public class NettyExample {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelFuture future = bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new HelloServerInitializer())
.bind(9999)
.sync();
log.info("启动完成");
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1000);
log.info("关闭处理的线程组");
} catch (InterruptedException e) {
e.printStackTrace();
}
boss.shutdownGracefully();
worker.shutdownGracefully();
}).start();
// 此方法是阻塞的
future.channel().closeFuture().sync();
log.info("清理资源");
log.info("服务关闭");
}
}
调用我们的 netty 服务
.... 一大堆启动日志
17:28:18.770 [main] INFO cn.wyj.learn.netty.NettyExample - 启动完成
17:28:59.779 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.HelloServerInitializer - initChannel
17:28:59.818 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
17:28:59.818 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
17:28:59.820 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@68787c06
17:28:59.832 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelRegistered
17:28:59.833 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelRegistered
17:28:59.833 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelActive
17:28:59.833 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelActive
17:28:59.838 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
17:28:59.838 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
17:28:59.839 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
17:28:59.839 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
17:28:59.875 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelRead0 1
17:28:59.875 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - socketAddress :/0:0:0:0:0:0:0:1:23468
17:28:59.876 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelRead0 2
/0:0:0:0:0:0:0:1:23468
17:28:59.885 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelReadComplete1
17:28:59.885 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelReadComplete2
17:28:59.886 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelReadComplete1
17:28:59.886 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelReadComplete2
17:28:59.887 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelInactive
17:28:59.887 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelInactive
17:28:59.888 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler - channelUnregistered
17:28:59.888 [nioEventLoopGroup-3-1] INFO cn.wyj.learn.netty.CustomHandler2 - channelUnregistered
17:44:58.773 [Thread-0] INFO cn.wyj.learn.netty.NettyExample - 关闭处理的线程组
17:44:58.774 [main] INFO cn.wyj.learn.netty.NettyExample - 清理资源
17:44:58.774 [main] INFO cn.wyj.learn.netty.NettyExample - 服务关闭
17:44:58.780 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.PoolThreadCache - Freed 2 thread-local buffer(s) from thread: nioEventLoopGroup-3-1
多见不怪的 Pipeline
那些地方用到过 pipelin
-
Linux
cat nginx.log | grep "500" | rev | > output.txt
-
层级辉煌过的 struts2
image.png
- spring mvc intercepter
- filter
网友评论