美文网首页
Java Socket 编程那些事(2)

Java Socket 编程那些事(2)

作者: Kevin_ZGJ | 来源:发表于2017-09-10 16:26 被阅读100次

    前言

    在上一篇博客中,我们使用了BIO,也就是同步阻塞式IO实现了Socket通信。
    Java Socket编程那些事(1)
    现在我们使用jdk1.4之后的NIO来实现,NIO(new io / no-blocking io),同步非阻塞IO。

    基本原理

    服务端打开一个通道(ServerSocketChannel),并向通道中注册一个选择器(Selector),这个选择器是与一些感兴趣的操作的标识(SelectionKey,即通过这个标识可以定位到具体的操作,从而进行响应的处理)相关联的,然后基于选择器(Selector)轮询通道(ServerSocketChannel)上注册的事件,并进行相应的处理。
    客户端在请求与服务端通信时,也可以向服务器端一样注册(比服务端少了一个SelectionKey.OP_ACCEPT操作集合),并通过轮询来处理指定的事件,而不必阻塞。

    服务端

    package com.richstonedt.socket;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    
    /**
     * @author zhangguoji
     * @date 2017/9/8 20:47
     */
    public class NIOServer {
        private Selector selector;
    
        /**
         * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
         * @param port 绑定的端口号
         * @throws IOException
         */
        public void initServer(int port) throws IOException {
            // 获得一个ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 设置通道为非阻塞
            serverChannel.configureBlocking(false);
            // 将该通道对应的ServerSocket绑定到本地port端口
            serverChannel.socket().bind(new InetSocketAddress(port));
            // 获得一个通道管理器
            this.selector = Selector.open();
            //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
            //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        /**
         * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
         *
         * @throws IOException
         */
        public void listen() throws IOException {
            System.out.println("服务端启动成功!");
            // 轮询访问selector
            while (true) {
                //当注册的事件到达时,方法返回;否则,该方法会一直阻塞
                selector.select();
                // 获得selector中选中的项的迭代器,选中的项为注册的事件
                Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 删除已选的key,以防重复处理
                    iterator.remove();
                    // 客户端请求连接事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key
                                .channel();
                        // 获得和客户端连接的通道
                        SocketChannel channel = server.accept();
                        // 设置成非阻塞
                        channel.configureBlocking(false);
    
                        //在这里可以给客户端发送信息
                        channel.write(ByteBuffer.wrap("向客户端发送了一条信息".getBytes()));
                        //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
                        channel.register(this.selector, SelectionKey.OP_READ);
    
                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
    
            }
        }
    
        /**
         * 处理读取客户端发来的信息 的事件
         * @param key
         * @throws IOException
         */
        public void read(SelectionKey key) throws IOException {
            // 服务器可读取消息:得到事件发生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 创建读取的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(512);
            channel.read(buffer);
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("服务端收到信息:" + msg);
            ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        }
    
        /**
         * 启动服务端测试
         *
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            NIOServer server = new NIOServer();
            server.initServer(8000);
            server.listen();
        }
    }
    
    

    服务端连接过程
    1、创建ServerSocketChannel实例serverSocketChannel,并bind到指定端口。
    2、创建Selector实例selector;
    3、将serverSocketChannel注册到selector,并指定事件OP_ACCEPT。
    4、while循环执行:
    4.1、调用select方法,该方法会阻塞等待,直到有一个或多个通道准备好了I/O操作或等待超时。
    4.2、获取选取的键列表;
    4.3、循环键集中的每个键:
    4.3.a、获取通道,并从键中获取附件(如果添加了附件);
    4.3.b、确定准备就绪的操纵并执行,如果是accept操作,将接收的信道设置为非阻塞模式,并注册到选择器;
    4.3.c、如果需要,修改键的兴趣操作集;
    4.3.d、从已选键集中移除键

    客户端

    package com.richstonedt.socket;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO客户端
     *
     * @author zhangguoji
     * @date 2017/9/8 21:43
     */
    public class NIOClient {
        //通道管理器
        private Selector selector;
    
        /**
         * 获得一个Socket通道,并对该通道做一些初始化的工作
         *
         * @param ip   连接的服务器的ip
         * @param port 连接的服务器的端口号
         * @throws IOException
         */
        public void initClient(String ip, int port) throws IOException {
            // 获得一个Socket通道
            SocketChannel channel = SocketChannel.open();
            // 设置通道为非阻塞
            channel.configureBlocking(false);
            // 获得一个通道管理器
            this.selector = Selector.open();
    
            // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
            //用channel.finishConnect();才能完成连接
            channel.connect(new InetSocketAddress(ip, port));
            //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
    
        /**
         * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
         *
         * @throws IOException
         */
        public void listen() throws IOException {
            // 轮询访问selector
            while (true) {
                selector.select();
                // 获得selector中选中的项的迭代器
                Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 删除已选的key,以防重复处理
                    iterator.remove();
                    // 连接事件发生
                    if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) key
                                .channel();
                        // 如果正在连接,则完成连接
                        if (channel.isConnectionPending()) {
                            channel.finishConnect();
    
                        }
                        // 设置成非阻塞
                        channel.configureBlocking(false);
    
                        //在这里可以给服务端发送信息哦
                        channel.write(ByteBuffer.wrap("向服务端发送了一条信息".getBytes()));
                        //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
                        channel.register(this.selector, SelectionKey.OP_READ);
    
                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }
    
                }
    
            }
        }
    
        /**
         * 处理读取服务端发来的信息 的事件
         *
         * @param key
         * @throws IOException
         */
        public void read(SelectionKey key) throws IOException {
            // 客户端可读取消息:得到事件发生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 创建读取的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(512);
            channel.read(buffer);
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("客户端收到信息:" + msg);
            ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
            channel.write(outBuffer);// 将消息回送给服务端
        }
    
    
        /**
         * 启动客户端测试
         *
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            NIOClient client = new NIOClient();
            client.initClient("localhost", 8000);
            client.listen();
        }
    }
    
    

    客户端连接过程:(和服务器端类似)
    1、创建SocketChannel实例socketChannel,并连接到服务器端口
    2、创建Selector实例selector;
    3、将socketChannel注册到selector,并指定事件OP_CONNECT。
    4、while循环执行:
    4.1、调用select方法,该方法会阻塞等待,直到有一个或多个通道准备好了I/O操作或等待超时。
    4.2、获取选取的键列表;
    4.3、循环键集中的每个键:
    4.3.a、获取通道,并从键中获取附件(如果添加了附件);
    4.3.b、确定准备就绪的操纵并执行,如果是accept操作,将接收的信道设置为非阻塞模式,并注册到选择器;
    4.3.c、如果需要,修改键的兴趣操作集;
    4.3.d、从已选键集中移除键

    运行结果

    最终这两段代码的运行结果就是客户端和服务器之间不断发送信息
    server:

    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    服务端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    ...
    

    client:

    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    客户端收到信息:向服务端发送了一条信息向客户端发送了一条信息
    ...
    

    实现原理

    其实Java的NIO使用了IO多路复用,,I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
    目前支持的IO多路复用有select,poll和epoll。
    与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

    select

    select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:

    1. select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。
        一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。32位机默认是1024个。64位机默认是2048.
    2. 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
        当套接字比较多的时候,每次select()都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。如果能给套接字注册某个回调函数,当他们活跃时,自动完成相关操作,那就避免了轮询,这正是epoll与kqueue做的。
    3. 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。

    poll

    基本原理:poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。
    它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:

    1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
    2. poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

    epoll

    epoll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
    基本原理:epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。
    epoll的优点:

    1. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
    2. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。
        只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。
    3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

    epoll原理

    epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。
    c封装后的3个epoll系统调用

    • int epoll_create(int size)
      epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
    • *nt epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
      epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
    • *int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
      epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

    大概看看epoll内部是怎么实现的:

    epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。
    当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。
    当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。
    epoll的两种工作模式:

    LT:level-trigger,水平触发模式,只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket。
    ET:edge-trigger,边缘触发模式,只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

    相关文章

      网友评论

          本文标题:Java Socket 编程那些事(2)

          本文链接:https://www.haomeiwen.com/subject/qtctsxtx.html