NIO及new io是java1.4之后的引入的api,之所以是新的io API,那么它可以代替标准的IO。
首先先比较新IO和标准IO
新IO是面向Buffer和Channel的,而标准是面向流的。面向流意味着每次从流中读取数据不缓存在任何地方,所以不能对读取到的数据进行移动操作。而面向Buffer可以做到这一点。
最重要的一点是阻塞与非阻塞。首先了解下同步异步,阻塞非阻塞。同步:请求后必须等待结果,结果返回后才能做其他事。异步:请求后不必等待结果,结果产生后会通知调用者。阻塞非阻塞是描述线程的状态,阻塞是指请求结果后,线程挂起的状态,非阻塞指不能立即返回结果但不会阻塞当前线程。
NIO是非阻塞,IO是阻塞的。在标准IO时,每次调用write(),read()都会使该线程阻塞。直到操作结束。在这个过程中线程只有等待。为了解决这个问题只有用多线程来保障同时的读写操作,但是多线程的引入使资源大量被浪费(线程是珍贵资源),而且频繁的进行上下文切换使得CPU大量浪费。由此引入了NIO,NIO的操作是无阻塞的,这个意味着一个线程可以多个读取,从而可以做到一个线程管理多个Channel。
标准IO NIO标准IO虽然有缺点,但是它的读写效率是高于NIO的(在对一个流大量读写时),所以它们各有使用场景。NIO用于处理大量的链接,但是带宽都不大的场景。而标准IO用于少量连接带宽超大的场景。
NIO关键概念
Buffer
缓存区是与通道进行数据传输的工具。NIO提供了多个缓存区
byteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
其包含了各个数据类型。
Buffer的内部有position,limit,capacity。分别代表了,当前指针位置,最大指针位置,容量。在写操作时,limit=capacity。读操作是position不能超过limit。
生成一个buffer(以byte类型为例) ByteBuffer buf = ByteBuffer.allocate(48);//指定了大小
写入buffer,从Channel中读入读取,put写入数据。
channel.read(buffer) 或者 buffer.put("content")
读取buffer,Channel从buffer中读取数据,get()获取数据
channel.write(buffer)或者buffer.get()
读写转换,一个buffer既可以读也可以写,其原理实际是改变了position指针。
调用flip()
rewind()方法,使position归零,可以实现重读的操作。
clear(),compact()。前者是清除所有的数据,后者是清除刚读完的数据。实际上数据都没有被清除,而是操作了指针。
Channel
通道类似于流。但是通道既可以双向读写,也是异步操作。而且通道读写都必须经过buffer。
这些是Java NIO中最重要的通道的实现:
FileChannel:从文件中读写数据
DatagramChannel:能通过UDP读写网络中的数据
SocketChannel:能通过TCP读写网络中的数据
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel
通道也可以对buffer进行多个的读写,叫做scatter(分散)和Gather(聚合)。前者是一个通道写入多个buffer,后者是多个buffer读取一个通道。读写都有前后顺序。
channel.read({buffer1,buffer2}) buffer1被读取完后buffer2才会进行。
通道间的传输。NIO提供了两个函数
transferFrom(position, count, FromChannel) , transferTo(position, count, toChannel),可以由名字知道,第一个函数从FromChannel运输数据到操作通道,数据量为position+count。函数二则是相反,从操作通道运输到toChannel。
文件传输
可以连接到一个文件,对文件进行操作,但是是阻塞式的。不能设为无阻塞。
RandomAccessFile aFile =newRandomAccessFile("data/nio-data.txt","rw");
FileChannel inChannel = aFile.getChannel();
先建立一个地址,再创建一个文件传输通道
它有read(),write(),close(),size(),force()方法 就不一一介绍了。
网络编程
通道
服务器channel -> serverSocketChannel
是一个用来监听请求的通道。
客户端channel -> SocketChannel
它是一个网络连接的套接字通道,它会连接到服务器的ServerSocketChannle并会创建一个SocketChannel与之连接。
Selector
通道管理者,可以监听多个通道,并且知晓读写每个通道的读写操作,所以个一个Selector可以管理多条通道。
channle通过注册,把自己交给通道管理者,在注册之前要设置自己的阻塞属性为非阻塞。因为Selector只支持非阻塞的操作。注册的时,还需要填写自己感兴趣的操作,方便Selector进行监听并分配到相应的方法。操作包括了
Connect
Accept
Read
Write
其中client端只有Connet操作,server端四个都可以使用。但是只有ServerSocketChannel才能使用Accept操作。后面会细说这个Accept属性。
通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。
这四种事件用SelectionKey的四个常量来表示:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
对channle注册完之后,会返回一个SelectorKey,这个对象像一个标识,每个client请求的封装类。里面包含了多个属性
interest集合 所支持的兴趣属性
ready集合 已经就绪的兴趣属性
Channel 所对应的Channel
Selector 所对应的selector
附加的对象(可选) 可以添加一些附加信息,来作为唯一标识
interes()可以查看所有interest集合
read集合可以调用Bool的方法来判断属于哪类兴趣
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
从SelectionKey访问Channel和Selector很简单。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
选择通道 方法select(),select(time),selectNew()
这个方法会像Selector的keys集合返回有多少个已经就绪的通道。如果没有这个方法,会报错 java.nio.channels.IllegalBlockingModeException。select()也是一个阻塞方法,它会阻塞到你有已经就绪的通道且它们的SelectionKey是你感兴趣的。而selectNow()是非阻塞,返回一切就绪通道。
wakeUp(),有阻塞就有唤醒,这个函数会唤醒阻塞方法。
实例程序:
package NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Socket_server {
private ServerSocketChannel server;
private Selector selector;
public Socket_server() throws IOException {
initSelector();
initServer();
run();
}
public void initSelector() throws IOException {
//创建一个selector
selector=Selector.open();
System.out.println("initSelector");
}
public void initServer() throws IOException {
System.out.println("initServer");
//开启serverChannel
server=ServerSocketChannel.open();
//SocketChannel的打开需要一个Address,指定地址和端口
InetSocketAddress address=new InetSocketAddress("localhost",9091);
//绑定一个监听端口
server.bind(address);
//设定是否用阻塞
server.configureBlocking(false);
//将管道注册到Selector中,并指定该管道具有accept操作,这意味着
server.register(selector, SelectionKey.OP_ACCEPT);
}
public void run() throws IOException {
while(true) {
selector.select();
//取出每一个准备好的i/o,server不在内,但server会不断的调用isAcceptable对应的方法
Set<SelectionKey> sets=selector.selectedKeys();
//将之变为一个iterator,方便遍历
Iterator<SelectionKey> iterator=sets.iterator();
SelectionKey key=null;
while(iterator.hasNext()) {
key=iterator.next();
Hadle(key);
iterator.remove();
}
selector.select();
}
}
public void Hadle(SelectionKey key) throws IOException {
//根据key的属性,决定其操作
if(key.isAcceptable()) {
//这个方法会被反复调用
HadleAp(key);
}else if (key.isReadable()) {
HadleRead(key);
}else if (key.isWritable()) {
HableWrite(key);
}else if (key.isConnectable()) {
}
}
//客户端传来一个可读的通道
public void HadleRead(SelectionKey key) throws IOException {
//通过key得到一个通道
SocketChannel channel=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int i=channel.read(buffer);
while(i!=0) {
i=channel.read(buffer);
}
//dosomething hadle the buffer date
channel.close();
}
//可写文件,将要返回的内容写入文件
public void HableWrite(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel) key.channel();
System.out.println("1"+Thread.currentThread().getName());
ByteBuffer buffer=ByteBuffer.allocate(1024);
String content="需要返回的内容";
buffer.put(content.getBytes());
channel.write(buffer);
while(buffer.hasRemaining()) {
channel.write(buffer);
}
channel.close();
}
public void HadleAp(SelectionKey key) throws IOException {
System.out.println("dasd");
//通过key得到一个serverSocketChannel
//ServerSocketChannel channel=(ServerSocketChannel) key.channel();
//ServerSocketChannle可以通过accept方法,获得连接到服务器的连接
//SocketChannel client=channel.accept();
//设置为非阻塞
//client.configureBlocking(false);
//将client注册到selector,selector将会对其任务进行处理
//client.register(selector, SelectionKey.OP_WRITE);
//key.interestOps(SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws IOException {
Socket_server server=new Socket_server();
}
}
Netty
netty是一个NIO框架爱,它可以简单快速的开发网络应用程度。
一般的网络框架都是基于Reactor的。Reactor模式分为单线程模式,多线程模式,主从模式。
单线程模式,同一个Reactor线程作为客户端 服务端。
单线程模式多线程模式,专门有一个Reactor线程负责处理accept线程,其他读写等操作交给Reactor线程池处理。一个Reactor线程可以同时处理多条链路,但个链路的相关操作只能一个Reactor负责,这样避免了并发问题。
多线程模型主从模型,使用一个独立的Reactor线程来处理连接操作,连接成功后又使用一个Reactor来处理accept操作。链路创建后,再用一条Reactor来处理相关操作
,主从模型Netty线程模型同时支持了三种模型,可以通过配置来切换。
大致的模型构建
package netty;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.SocketChannel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
//服务器端代码
public class Model_Run {
private NioEventLoopGroup Accept_pool;
private NioEventLoopGroup Handle_pool;
public Model_Run() {
init();
}
public void init() {
//第一个创建的是接受client连接的线程组,1表示多线程模式,n(n>1)表示主从模式
Accept_pool=new NioEventLoopGroup(1);
//第二个创建处理具体业务的线程组,默认值为CPU*2,建议设置为空闲CPU*2
Handle_pool=new NioEventLoopGroup(4);
//用于配置服务的类
ServerBootstrap bootstrap=new ServerBootstrap();
//设置group
bootstrap.group(Accept_pool,Handle_pool)
//设置Channel的类,服务器端用的NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//绑定具体的事件
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel sc) throws Exception {
//可以绑定多个或者一个
((Channel) sc).pipeline().addLast(new Server_Handle());
sc.pipeline().addLast(new ChannelOutboundHandlerAdapter())
.addLast(new ChannelInboundHandlerAdapter());
}
})
/*
* 每次client发来的数据要和服务器进行三次握手后才能进入准备状态,
* 前两此握手时,服务器会将所有客户加入一个A队列,通过最后一次握手,
* 服务器会将该客户放入B队列,AB队列size之和为backlog,如果a+b>backlog,就会
* 拒绝其他客户的请求,所以这个数应该相应大些
*/
.option(ChannelOption.SO_BACKLOG, 128)
//保持通信
.option(ChannelOption.SO_KEEPALIVE, true)
//接受的缓存区
.option(ChannelOption.SO_RCVBUF, 32*1024)
//发送的缓存区
.option(ChannelOption.SO_SNDBUF, 32*1024);
//绑定监控port,一个或者多个
try {
//并设置为异步
ChannelFuture future=bootstrap.bind(9091).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
//线程组优雅退出
Accept_pool.shutdownGracefully();
Handle_pool.shutdownGracefully();
}
}
}
逻辑代码一般是写入childHandler的,在childHandler中先会创建一个ChannelPipeline,它是ChannelHandle的执行列表,用于处理通道中接受发送和过滤。
对于每个Channel,都会创建一个ChannelPipeline赋予它,Channel有唯一个ChannelPipeline。
Pipeline是一个ChannelHandler的列表,它可以控制Handler的流程,和执行顺序还可以动态的添加 删除 替代pipeline里的ChannelHandler。
ChanelHandlerContext:每一个ChannelHandler被添加到Pipeline后都会创建一个ChanelHandlerContext中,这个上下文可以和其他的ChannelHandler进行通信,由此可以控制事件的执行顺序。
netty5将ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter合并到ChannelHandlerAdapter中
网友评论