美文网首页
java特性之NIO及neety

java特性之NIO及neety

作者: 机器不能学习 | 来源:发表于2018-10-15 16:02 被阅读0次

    NIO及new io是java1.4之后的引入的api,之所以是新的io API,那么它可以代替标准的IO。

    首先先比较新IO和标准IO

    新IO是面向Buffer和Channel的,而标准是面向流的。面向流意味着每次从流中读取数据不缓存在任何地方,所以不能对读取到的数据进行移动操作。而面向Buffer可以做到这一点。

    最重要的一点是阻塞与非阻塞。首先了解下同步异步,阻塞非阻塞。同步:请求后必须等待结果,结果返回后才能做其他事。异步:请求后不必等待结果,结果产生后会通知调用者。阻塞非阻塞是描述线程的状态,阻塞是指请求结果后,线程挂起的状态,非阻塞指不能立即返回结果但不会阻塞当前线程。

    NIO是非阻塞,IO是阻塞的。在标准IO时,每次调用write(),read()都会使该线程阻塞。直到操作结束。在这个过程中线程只有等待。为了解决这个问题只有用多线程来保障同时的读写操作,但是多线程的引入使资源大量被浪费(线程是珍贵资源),而且频繁的进行上下文切换使得CPU大量浪费。由此引入了NIO,NIO的操作是无阻塞的,这个意味着一个线程可以多个读取,从而可以做到一个线程管理多个Channel。

    标准IO NIO

    标准IO虽然有缺点,但是它的读写效率是高于NIO的(在对一个流大量读写时),所以它们各有使用场景。NIO用于处理大量的链接,但是带宽都不大的场景。而标准IO用于少量连接带宽超大的场景。



    NIO关键概念

    Buffer

    缓存区是与通道进行数据传输的工具。NIO提供了多个缓存区

    byteBuffer

    CharBuffer

    DoubleBuffer

    FloatBuffer

    IntBuffer

    LongBuffer

    ShortBuffer

    其包含了各个数据类型。

    Buffer的内部有position,limit,capacity。分别代表了,当前指针位置,最大指针位置,容量。在写操作时,limit=capacity。读操作是position不能超过limit。

    生成一个buffer(以byte类型为例)  ByteBuffer buf = ByteBuffer.allocate(48);//指定了大小

    写入buffer,从Channel中读入读取,put写入数据。

    channel.read(buffer)  或者 buffer.put("content")

    读取buffer,Channel从buffer中读取数据,get()获取数据

    channel.write(buffer)或者buffer.get()

    读写转换,一个buffer既可以读也可以写,其原理实际是改变了position指针。

    调用flip()

    rewind()方法,使position归零,可以实现重读的操作。

    clear(),compact()。前者是清除所有的数据,后者是清除刚读完的数据。实际上数据都没有被清除,而是操作了指针。

    Channel

    通道类似于流。但是通道既可以双向读写,也是异步操作。而且通道读写都必须经过buffer。

    这些是Java NIO中最重要的通道的实现:

    FileChannel:从文件中读写数据

    DatagramChannel:能通过UDP读写网络中的数据

    SocketChannel:能通过TCP读写网络中的数据

    ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel

    通道也可以对buffer进行多个的读写,叫做scatter(分散)和Gather(聚合)。前者是一个通道写入多个buffer,后者是多个buffer读取一个通道。读写都有前后顺序。

    channel.read({buffer1,buffer2}) buffer1被读取完后buffer2才会进行。

    通道间的传输。NIO提供了两个函数

    transferFrom(position, count, FromChannel)  ,  transferTo(position, count, toChannel),可以由名字知道,第一个函数从FromChannel运输数据到操作通道,数据量为position+count。函数二则是相反,从操作通道运输到toChannel。



    文件传输

    可以连接到一个文件,对文件进行操作,但是是阻塞式的。不能设为无阻塞。

    RandomAccessFile aFile =newRandomAccessFile("data/nio-data.txt","rw");

      FileChannel inChannel = aFile.getChannel();

    先建立一个地址,再创建一个文件传输通道

    它有read(),write(),close(),size(),force()方法 就不一一介绍了。



    网络编程

    通道

    服务器channel -> serverSocketChannel

    是一个用来监听请求的通道。

    客户端channel -> SocketChannel

    它是一个网络连接的套接字通道,它会连接到服务器的ServerSocketChannle并会创建一个SocketChannel与之连接。

    Selector

    通道管理者,可以监听多个通道,并且知晓读写每个通道的读写操作,所以个一个Selector可以管理多条通道。

    channle通过注册,把自己交给通道管理者,在注册之前要设置自己的阻塞属性为非阻塞。因为Selector只支持非阻塞的操作。注册的时,还需要填写自己感兴趣的操作,方便Selector进行监听并分配到相应的方法。操作包括了

    Connect

    Accept

    Read

    Write

    其中client端只有Connet操作,server端四个都可以使用。但是只有ServerSocketChannel才能使用Accept操作。后面会细说这个Accept属性。

    通道触发了一个事件意思是该事件已经就绪。所以,某个channel成功连接到另一个服务器称为“连接就绪”。一个server socket channel准备好接收新进入的连接称为“接收就绪”。一个有数据可读的通道可以说是“读就绪”。等待写数据的通道可以说是“写就绪”。

    这四种事件用SelectionKey的四个常量来表示:

    SelectionKey.OP_CONNECT

    SelectionKey.OP_ACCEPT

    SelectionKey.OP_READ

    SelectionKey.OP_WRITE

    对channle注册完之后,会返回一个SelectorKey,这个对象像一个标识,每个client请求的封装类。里面包含了多个属性

    interest集合                                                所支持的兴趣属性

    ready集合                                                    已经就绪的兴趣属性

    Channel                                                      所对应的Channel

    Selector                                                      所对应的selector

    附加的对象(可选)                                可以添加一些附加信息,来作为唯一标识

    interes()可以查看所有interest集合

    read集合可以调用Bool的方法来判断属于哪类兴趣

    selectionKey.isAcceptable();

    selectionKey.isConnectable();

    selectionKey.isReadable();

    selectionKey.isWritable();

    从SelectionKey访问Channel和Selector很简单。如下:

    Channel  channel  = selectionKey.channel();

    Selector selector = selectionKey.selector(); 

    选择通道  方法select(),select(time),selectNew()

    这个方法会像Selector的keys集合返回有多少个已经就绪的通道。如果没有这个方法,会报错 java.nio.channels.IllegalBlockingModeException。select()也是一个阻塞方法,它会阻塞到你有已经就绪的通道且它们的SelectionKey是你感兴趣的。而selectNow()是非阻塞,返回一切就绪通道。

    wakeUp(),有阻塞就有唤醒,这个函数会唤醒阻塞方法。

    实例程序:

    package NIO;

    import java.io.IOException;

    import java.net.InetSocketAddress;

    import java.net.Socket;

    import java.nio.Buffer;

    import java.nio.ByteBuffer;

    import java.nio.channels.SelectionKey;

    import java.nio.channels.Selector;

    import java.nio.channels.ServerSocketChannel;

    import java.nio.channels.SocketChannel;

    import java.util.Iterator;

    import java.util.Set;

    public class Socket_server {

    private ServerSocketChannel server;

    private Selector selector;

    public Socket_server() throws IOException {

    initSelector();

    initServer();

    run();

    }

    public void initSelector() throws IOException {

    //创建一个selector

    selector=Selector.open();

    System.out.println("initSelector");

    }

    public void initServer() throws IOException {

    System.out.println("initServer");

    //开启serverChannel

    server=ServerSocketChannel.open();

    //SocketChannel的打开需要一个Address,指定地址和端口

    InetSocketAddress address=new InetSocketAddress("localhost",9091);

    //绑定一个监听端口

    server.bind(address);

    //设定是否用阻塞

    server.configureBlocking(false);

    //将管道注册到Selector中,并指定该管道具有accept操作,这意味着

    server.register(selector, SelectionKey.OP_ACCEPT);

    }

    public void run() throws IOException {

    while(true) {

    selector.select();

    //取出每一个准备好的i/o,server不在内,但server会不断的调用isAcceptable对应的方法

    Set<SelectionKey> sets=selector.selectedKeys();

    //将之变为一个iterator,方便遍历

    Iterator<SelectionKey> iterator=sets.iterator();

    SelectionKey key=null;

    while(iterator.hasNext()) {

    key=iterator.next();

    Hadle(key);

    iterator.remove();

    }

    selector.select();

    }

    }

    public void Hadle(SelectionKey key) throws IOException {

    //根据key的属性,决定其操作

    if(key.isAcceptable()) {

    //这个方法会被反复调用

    HadleAp(key);

    }else if (key.isReadable()) {

    HadleRead(key);

    }else if (key.isWritable()) {

    HableWrite(key);

    }else if (key.isConnectable()) {

    }

    }

    //客户端传来一个可读的通道

    public void HadleRead(SelectionKey key) throws IOException {

    //通过key得到一个通道

    SocketChannel channel=(SocketChannel) key.channel();

    ByteBuffer buffer=ByteBuffer.allocate(1024);

    int i=channel.read(buffer);

    while(i!=0) {

    i=channel.read(buffer);

    }

    //dosomething hadle the buffer date

    channel.close();

    }

    //可写文件,将要返回的内容写入文件

    public void HableWrite(SelectionKey key) throws IOException {

    SocketChannel channel=(SocketChannel) key.channel();

    System.out.println("1"+Thread.currentThread().getName());

    ByteBuffer buffer=ByteBuffer.allocate(1024);

    String content="需要返回的内容";

    buffer.put(content.getBytes());

    channel.write(buffer);

    while(buffer.hasRemaining()) {

    channel.write(buffer);

    }

    channel.close();

    }

    public void HadleAp(SelectionKey key) throws IOException {

    System.out.println("dasd");

    //通过key得到一个serverSocketChannel

    //ServerSocketChannel channel=(ServerSocketChannel) key.channel();

    //ServerSocketChannle可以通过accept方法,获得连接到服务器的连接

    //SocketChannel client=channel.accept();

    //设置为非阻塞

    //client.configureBlocking(false);

    //将client注册到selector,selector将会对其任务进行处理

    //client.register(selector, SelectionKey.OP_WRITE);

    //key.interestOps(SelectionKey.OP_ACCEPT);

    }

    public static void main(String[] args) throws IOException {

    Socket_server server=new Socket_server();

    }

    }


    Netty

    netty是一个NIO框架爱,它可以简单快速的开发网络应用程度。

    一般的网络框架都是基于Reactor的。Reactor模式分为单线程模式,多线程模式,主从模式。

    单线程模式,同一个Reactor线程作为客户端 服务端。

    单线程模式

    多线程模式,专门有一个Reactor线程负责处理accept线程,其他读写等操作交给Reactor线程池处理。一个Reactor线程可以同时处理多条链路,但个链路的相关操作只能一个Reactor负责,这样避免了并发问题。

    多线程模型

    主从模型,使用一个独立的Reactor线程来处理连接操作,连接成功后又使用一个Reactor来处理accept操作。链路创建后,再用一条Reactor来处理相关操作

    ,主从模型

    Netty线程模型同时支持了三种模型,可以通过配置来切换。

    大致的模型构建

    package netty;

    import java.nio.channels.NonWritableChannelException;

    import java.nio.channels.SocketChannel;

    import io.netty.bootstrap.ServerBootstrap;

    import io.netty.channel.Channel;

    import io.netty.channel.ChannelFuture;

    import io.netty.channel.ChannelInboundHandlerAdapter;

    import io.netty.channel.ChannelInitializer;

    import io.netty.channel.ChannelOption;

    import io.netty.channel.ChannelOutboundHandlerAdapter;

    import io.netty.channel.nio.NioEventLoopGroup;

    import io.netty.channel.socket.nio.NioServerSocketChannel;

    //服务器端代码

    public class Model_Run {

    private NioEventLoopGroup Accept_pool;

    private NioEventLoopGroup Handle_pool;

    public Model_Run() {

    init();

    }

    public void init() {

    //第一个创建的是接受client连接的线程组,1表示多线程模式,n(n>1)表示主从模式

    Accept_pool=new NioEventLoopGroup(1);

    //第二个创建处理具体业务的线程组,默认值为CPU*2,建议设置为空闲CPU*2

    Handle_pool=new NioEventLoopGroup(4);

    //用于配置服务的类

    ServerBootstrap bootstrap=new ServerBootstrap();

    //设置group

    bootstrap.group(Accept_pool,Handle_pool)

    //设置Channel的类,服务器端用的NioServerSocketChannel

    .channel(NioServerSocketChannel.class)

    //绑定具体的事件

    .childHandler(new ChannelInitializer<Channel>() {

    @Override

    protected void initChannel(Channel sc) throws Exception {

    //可以绑定多个或者一个

    ((Channel) sc).pipeline().addLast(new Server_Handle());

    sc.pipeline().addLast(new ChannelOutboundHandlerAdapter())

    .addLast(new ChannelInboundHandlerAdapter());

    }

    })

    /*

    * 每次client发来的数据要和服务器进行三次握手后才能进入准备状态,

    * 前两此握手时,服务器会将所有客户加入一个A队列,通过最后一次握手,

    * 服务器会将该客户放入B队列,AB队列size之和为backlog,如果a+b>backlog,就会

    * 拒绝其他客户的请求,所以这个数应该相应大些

    */

    .option(ChannelOption.SO_BACKLOG, 128)

    //保持通信

    .option(ChannelOption.SO_KEEPALIVE, true)

    //接受的缓存区

    .option(ChannelOption.SO_RCVBUF, 32*1024)

    //发送的缓存区

    .option(ChannelOption.SO_SNDBUF, 32*1024);

    //绑定监控port,一个或者多个

    try {

    //并设置为异步

    ChannelFuture future=bootstrap.bind(9091).sync();

    future.channel().closeFuture().sync();

    } catch (InterruptedException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }finally {

    //线程组优雅退出

    Accept_pool.shutdownGracefully();

    Handle_pool.shutdownGracefully();

    }

    }

    }

    逻辑代码一般是写入childHandler的,在childHandler中先会创建一个ChannelPipeline,它是ChannelHandle的执行列表,用于处理通道中接受发送和过滤。

    对于每个Channel,都会创建一个ChannelPipeline赋予它,Channel有唯一个ChannelPipeline。

    Pipeline是一个ChannelHandler的列表,它可以控制Handler的流程,和执行顺序还可以动态的添加 删除 替代pipeline里的ChannelHandler。

    ChanelHandlerContext:每一个ChannelHandler被添加到Pipeline后都会创建一个ChanelHandlerContext中,这个上下文可以和其他的ChannelHandler进行通信,由此可以控制事件的执行顺序。

    netty5将ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter合并到ChannelHandlerAdapter中

    相关文章

      网友评论

          本文标题:java特性之NIO及neety

          本文链接:https://www.haomeiwen.com/subject/hisiaftx.html