传输迁移
- 未使用Netty 的阻塞网络编程
public class PlainOioServer {
public void serve(int port) throws IOException {
// 将服务器绑定到指定端口
final ServerSocket socket = new ServerSocket(port);
try {
for (;;) {
final Socket clientSocket = socket.accept(); // 接受连接
System.out.println("Accepted connection from " + clientSocket);
new Thread(new Runnable() { // 创建一个新的线程来处理该连接
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); // 将消息写给已连接的客户端
out.flush();
clientSocket.close(); // 关闭连接
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
clientSocket.close();
}
catch (IOException ex) {
// ignore on close
}
}
}
}).start();
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
- 未使用Netty 的异步网络编程
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ssocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ssocket.bind(address); // 将服务器绑定到选定的端口
Selector selector = Selector.open(); // 打开Selector来处理 Channel
serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocket注册到Selector以接受连接
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
selector.select(); // 等待需要处理的新事件;阻塞将一直持续到下一个传入事件
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys(); // 获取所有接收事件的Selection-Key实例
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) { // 检查事件是否是一个新的已经就绪可以被接受的连接
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
// 接受客户端,并将它注册到选择器
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
if (key.isWritable()) { // 检查套接字是否已经准备好写数据
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { // 将数据写到已连接的客户端
break;
}
}
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// ignore on close
}
}
}
}
}
- 使用Netty 的阻塞网络处理
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
b.group(group)
.channel(OioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(
ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();// 释放所有的资源
}
}
- 使用Netty 的异步网络处理
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // 创建Server-Bootstrap
b.group(group).channel(NioServerSocketChannel.class) // 使用OioEventLoopGroup以允许阻塞模式(旧的I/O)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { // 指定Channel-Initializer,对于每个已接受的连接都调用它
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(
ChannelHandlerContext ctx)
throws Exception {
ctx.writeAndFlush(buf.duplicate()) // 将消息写到客户端,并添加ChannelFutureListener,以便消息一被写完就关闭连接
.addListener(
ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync(); // 绑定服务器以接受连接
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();// 释放所有的资源
}
}
通过代码比较,netty基本不受影响(通用性强).
传输API
Channel 接口的层次结构
如图所示,每个Channel 都将会被分配一个ChannelPipeline 和ChannelConfig。ChannelConfig 包含了该Channel 的所有配置设置,并且支持热更新。由于特定的传输可能具有独特的设置,所以它可能会实现一个ChannelConfig 的子类型。Comparable保证每一个channel都是独一无二的.
ChannelPipeline 持有所有将应用于入站和出站数据以及事件的ChannelHandler 实例.
- 将数据从一种格式转换为另一种格式;
- 提供异常的通知;
- 提供Channel 变为活动的或者非活动的通知;
- 提供当Channel 注册到EventLoop 或者从EventLoop 注销时的通知;
- 提供有关用户自定义事件的通知。
channel是线程安全的,多个线程可以共用一个channel.
内置的传输
- NIO | io.netty.channel.socket.nio | 使用java.nio.channels 包作为基础——基于选择器的方式
- Epoll | io.netty.channel.epoll | 由JNI 驱动的epoll()和非阻塞IO。这个传输支持只有在Linux 上可用的多种特性,如SO_REUSEPORT,比NIO 传输更快,而且是完全非阻塞的
- OIO | io.netty.channel.socket.oio | 使用java.net 包作为基础——使用阻塞流
- Local | io.netty.channel.local | 可以在VM 内部通过管道进行通信的本地传输
- Embedded | io.netty.channel.embedded | Embedded 传输,允许使用ChannelHandler 而又不需要一个真正的基于网络的传输。这在测试你的ChannelHandler 实现时非常有用
NIO——非阻塞I/O
选择并处理状态的变化
Epoll—用于Linux 的本地非阻塞传输
流程和NIO一样,失去了NIO的通用性,但运行在linux上更快,只需要将NioEventLoopGroup替换成EpollEventLoopGroup,并且将NioServerSocketChannel.class 替换为EpollServerSocketChannel.class 即可.
OIO—旧的阻塞I/O
建立在java.net 包的阻塞实现之上,不是异步的.Netty利用了SO_TIMEOUT这个Socket标志,它指定了等待一个I/O操作完成的最大毫秒数。如果操作在指定的时间间隔内没有完成,则将会抛出一个SocketTimeout Exception。Netty将捕获这个异常并继续处理循环。在EventLoop下一次运行时,它将再次尝试。这实际上也是类似于Netty这样的异步框架能够支持OIO的唯一方式.
OIO 的处理逻辑
用于JVM 内部通信的Local 传输
用于在同一个JVM 中运行的客户端和服务器程序之间的异步通信.
Embedded 传输
可以将一组ChannelHandler 作为帮助器类嵌入到其他的ChannelHandler 内部。通过这种方式,你将可以扩展一个ChannelHandler 的功能,而又不需要修改其内部代码。
网友评论