美文网首页nio
socket应用之多线程与NIO

socket应用之多线程与NIO

作者: 等流星的牧羊人 | 来源:发表于2016-11-22 21:28 被阅读424次

    by 等流星的牧羊人

    在本次网络编程作业中,对HTTPServer一共采用了两种方案进行性能改进。

    第一种是比较常规的多线程,第二种则是采用了NIO的多路复用模式。

    多线程

    在现有的HTTPServer中,一个很大的问题在于,它只有一个用户线程。当接受一个HTTPClient的请求,并进行处理的时候,由于响应设计I/O操作,需要一定响应时间。这期间,用户线程可以看作是阻塞的,无法响应新的Client提交过来的请求。

    多线程可以解决这个问题,用户线程只要负责不断接受新来的请求,而对每个请求的处理,则是通过新启的子线程处理,这样就不会导致用户线程阻塞。从而服务器可以支持并发请求。

    通过implements Runnable接口封装了一个响应请求的独立的线程类。

    HTTPServer.java

    package com.zaper.sea.river.socketwork;
    
    import java.net.*;
    /**
     * Created by Zaper Ocean on 2016/11/16.
     */
    public class HTTPServer{
        public static void main(String args[]) {
            System.out.println(HTTPServer.class.getResource("me/a.json"));
            int port;
            ServerSocket serverSocket;
    
            try {
                port = Integer.parseInt(args[0]);
            }catch (Exception e) {
                System.out.println("port = 8080 (默认)");
                port = 8080; //默认端口为8080
            }
    
            try{
                serverSocket = new ServerSocket(port);
                System.out.println("服务器正在监听端口:" + serverSocket.getLocalPort());
    
                while(true) { //服务器在一个无限循环中不断接收来自客户的TCP连接请求
                    try{
                        //等待客户的TCP连接请求
                        final Socket socket = serverSocket.accept();
                        System.out.println("建立了与客户的一个新的TCP连接,该客户的地址为:"+
                                socket.getInetAddress()+":" + socket.getPort());
                        //启动一个新线程响应客户请求
                        Thread myHTTPServer=new Thread(new HTTPServerRunnable(socket));
                        myHTTPServer.start();
                    }catch(Exception e){e.printStackTrace();}
                } //#while
            }catch (Exception e) {e.printStackTrace();}
        }
    }
    
    
    

    HTTPServerRunnable.java

    package com.zaper.sea.river.socketwork;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    
    /**
     * Created by Zaper Ocean on 2016/11/17.
     */
    public class HTTPServerRunnable implements Runnable {
        private Socket socket = null;
    
        public HTTPServerRunnable(Socket socket){this.socket = socket;}
    
        public void run() {
            try {
                /*读取HTTP请求信息*/
                InputStream socketIn= null; //获得输入流
                socketIn = this.socket.getInputStream();
                Thread.sleep(500);  //睡眠500毫秒,等待HTTP请求
                int size=socketIn.available();
                byte[] requestBuffer=new byte[size];
                socketIn.read(requestBuffer);
                String request=new String(requestBuffer);
                System.out.println(request); //打印HTTP请求数据
    
                /*解析HTTP请求*/
                //获得HTTP请求的第一行
                String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
                //解析HTTP请求的第一行
                String[] parts=firstLineOfRequest.split(" ");
                String uri=parts[1]; //获得HTTP请求中的uri
    
                /*决定HTTP响应正文的类型*/
                String contentType;
                if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
                    contentType="text/html";
                else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
                    contentType="image/jpeg";
                else if(uri.indexOf("gif")!=-1)
                    contentType="image/gif";
                else
                    contentType="application/octet-stream";
    
    
                /*创建HTTP响应结果 */
                //HTTP响应的第一行
                String responseFirstLine="HTTP/1.1 200 OK\r\n";
                //HTTP响应头
                String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
                //获得读取响应正文数据的输入流
                System.out.println(uri);
                System.out.println(HTTPServer.class.getResource("/root/"+uri));
                InputStream in=HTTPServer.class.getResourceAsStream("/root/"+uri);
    
                /*发送HTTP响应结果 */
                OutputStream socketOut=socket.getOutputStream(); //获得输出流
                //发送HTTP响应的第一行
                socketOut.write(responseFirstLine.getBytes());
                //发送HTTP响应的头
                socketOut.write(responseHeader.getBytes());
                //发送HTTP响应的正文
                int len=0;
                byte[] buffer=new byte[128];
                while((len=in.read(buffer))!=-1)
                    socketOut.write(buffer,0,len);
    
                Thread.sleep(1000);  //睡眠1秒,等待客户接收HTTP响应结果
                socket.close(); //关闭TCP连接
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    运行结果

    HTTPServer运行 HTTPClient运行

    NIO

    除了第一种常规的MultiThread方案,还采取了NIO方案。

    在我们的HTTPServer中,需要进行的IO操作是非常多的。读写文件与读写socket都是,而众所周知,IO操作是最耗时间的操作。那么如何减少这部分时间呢?JDK在1.4之后为我们提供了一个新思路,NIO。

    对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是kernel。当一个read操作发生时,它会经历两个阶段:

    1. 等待数据准备
    2. 将数据从内核拷贝到进程中

    接着介绍一下IO的几种模型,最权威的总结来自Stevens的UNP(Unix Network Progamming),有以下五种:

    • blocking IO
    • nonblocking IO
    • IO multiplexing
    • signal driven IO
    • asynchronous IO

    而无论原始的HTTPServer还是多线程方案都属于第一种BIO

    NIO

    当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
    所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

    所以,很明显BIO对性能的影响很明显。

    回到第一种方案,我们之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:

    • 利用多核。

    • 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源。

    IO multiplexing

    在NIO中,单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

    所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。 select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

    NIO模型

    BIO模型之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能"傻等",即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

    NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

    下面具体看下如何利用事件模型单线程处理所有I/O请求:

    NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

    我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

    其次,用一个死循环选择就绪的事件,会执行系统调用,还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

    注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以我们可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。

    了解完NIO的multiplexing原理,然后还有几个概念。

    Buffer ,是一个对象, 它包含一些要写入或者刚读出的数据.最常用的缓冲区类型是 ByteBuffer。常用状态变量包括 position,limit和capacity
    Channel ,是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
    所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反,是将数据写入包含一个或者多个字节的缓冲区。同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

    最后就是代码了。

    NioHTTPServer

    package com.zaper.sea.river.socketwork.niosocket;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.logging.Logger;
    
    /**
     * Created by Zaper Ocean on 2016/11/19.
     */
    public class NioHTTPServer {
        private static final Logger log = Logger.getLogger(NioHTTPServer.class.getName());
        private Selector selector;
    
        public NioHTTPServer bindInet(String ip,int port) throws IOException {
            ServerSocketChannel serverChannel=ServerSocketChannel.open();
            /**
             *  与Selector一起使用时,Channel必须处于非阻塞模式下。
             */
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(ip,port));
    
            /**Opens a selector.
             *
             */
            selector=Selector.open();
    
            /**
             * Operation-set bit for socket-accept operations.
             *
             * <p> Suppose that a selection key's interest set contains
             * <tt>OP_ACCEPT</tt> at the start of a <a
             * href="Selector.html#selop">selection operation</a>.  If the selector
             * detects that the corresponding server-socket channel is ready to accept
             * another connection, or has an error pending, then it will add
             * <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
             * selected-key set.  </p>
             *
             * 通过Selector监听Channel时对连接感兴趣
             */
            serverChannel.register(selector,SelectionKey.OP_ACCEPT);
            return this;
        }
    
        public void polling() throws IOException {
            log.info("Nio HTTP Server stated polling :");
            while (true){
                if(selector.select()>0){
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()){
                        SelectionKey sk=it.next();
                        if(sk.isAcceptable()){
                            log.info("Nio HTTP Server: SelectionKey is acceptable.");
    
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel)sk.channel();
    
                            /**
                             * Accepts a connection made to this channel's socket.
                             *
                             * <p> If this channel is in non-blocking mode then this method will
                             * immediately return <tt>null</tt> if there are no pending connections.
                             * Otherwise it will block indefinitely until a new connection is available
                             * or an I/O error occurs.
                             *
                             * 获得客户端连接通道
                             */
                            SocketChannel socketChannel = serverSocketChannel.accept();
    
                            log.info("Nio HTTP Server: accept client socket " + socketChannel);
                            socketChannel.configureBlocking(false);
                            socketChannel.register(sk.selector(), SelectionKey.OP_READ);
                        }
                        else if(sk.isReadable()){
                            log.info("Nio HTTP Server: SelectionKey is readable.");
    
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            SocketChannel socketChannel = (SocketChannel)sk.channel();
    
                            /**SocketChannel.read()将数据从SocketChannel读到Buffer中。
                             * read()方法返回的int值表示读了多少字节进Buffer里。
                             * 如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
                             */
                            while(true) {
                                int readBytes = socketChannel.read(byteBuffer);
                                if(readBytes>0) {
                                    log.info("Nio HTTP Server: readBytes = " + readBytes);
                                    String request=new String(byteBuffer.array(), 0, readBytes);
                                    log.info("Nio HTTP Server: data = \n" + request);
                                    byteBuffer.flip();
                                    socketChannel.write(getResponseBuffer(request));
                                    //socketChannel.write(byteBuffer);
                                    sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    break;
                                }
                            }
                            socketChannel.close();
                        }
                        else if(sk.isWritable()){
                            log.info("Nio HTTP Server: SelectionKey is writable.");
    
                            //获取与该信道关联的缓冲区,里面有之前读取到的数据
                            ByteBuffer buf = (ByteBuffer) sk.attachment();
                            //重置缓冲区,准备将数据写入信道
                            buf.flip();
                            SocketChannel clntChan = (SocketChannel) sk.channel();
                            //将数据写入到信道中
                            clntChan.write(buf);
                            if (!buf.hasRemaining()){
                                //如果缓冲区中的数据已经全部写入了信道,则将该信道感兴趣的操作设置为可读
                                sk.interestOps(SelectionKey.OP_READ);
                            }
                            //为读入更多的数据腾出空间
                            buf.compact();
                        }
                        /**
                         * 注意每次迭代末尾的keyIterator.remove()调用。
                         */
                        it.remove();
    
                    }
                }
            }
        }
    
        public ByteBuffer getResponseBuffer(String request) throws IOException {
            //清空ByteBuffer
    //        byteBuffer.clear();
    
            /*解析HTTP请求*/
            //获得HTTP请求的第一行
            String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
            //解析HTTP请求的第一行
            String[] parts=firstLineOfRequest.split(" ");
            String uri=parts[1]; //获得HTTP请求中的uri
    
            /*决定HTTP响应正文的类型*/
            String contentType;
            if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
                contentType="text/html";
            else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
                contentType="image/jpeg";
            else if(uri.indexOf("gif")!=-1)
                contentType="image/gif";
            else
                contentType="application/octet-stream";
    
            /*创建HTTP响应结果 */
            //HTTP响应的第一行
            String responseFirstLine="HTTP/1.1 200 OK\r\n";
            //HTTP响应头
            String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
            //获得读取响应正文数据的输入流
            System.out.println(uri);
            System.out.println(NioHTTPServer.class.getResource("/root/"+uri));
    
            String path=NioHTTPServer.class.getResource("/root/"+uri).getPath();
            FileInputStream fis = new FileInputStream(path);
            Charset charset = Charset.forName("GBK");// 创建GBK字符集
            // 得到文件通道
            FileChannel fc = fis.getChannel();
            // 分配与文件尺寸等大的缓冲区
            ByteBuffer bf = ByteBuffer.allocate((int) fc.size());
    
            try {
                // 整个文件内容全读入缓冲区,即是内存映射文件
                fc.read(bf);
                System.out.println(bf.position());
                // 把缓冲中当前位置回复为零
                bf.rewind();
                System.out.println(bf.position());
                // 输出缓冲区中的内容
                System.out.println(charset.decode(bf));
            } catch (IOException e) {
                e.printStackTrace();
            }
            bf.flip();
            return bf;
        }
    
        public static void main(String[] args) throws IOException {
            new NioHTTPServer().bindInet("localhost", 8080).polling();
        }
    }
    
    

    运行结果

    NioHTTPServer运行 HTTPClient运行

    相关文章

      网友评论

        本文标题:socket应用之多线程与NIO

        本文链接:https://www.haomeiwen.com/subject/nvwepttx.html