JAVA NIO

作者: 简单一点点 | 来源:发表于2019-04-27 20:53 被阅读0次

    Java NIO指JDK 1.4中提供的新IO,可以理解为非阻塞IO(non-blocking IO),为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络。

    全部章节传送门:

    NIO和IO的区别

    1. 面向流与面向缓冲

    Java IO和NIO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。

    Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。

    在NIO厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的; 在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

    1. 阻塞与非阻塞IO

    Java IO的各种流是阻塞的。这意味着,当一个线程调用read()或write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

    Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。

    1. 选择器(Selectors)

    Java NIO的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器,然后使用一个单独的线程来“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。

    NIO 的结构

    NIO结构.jpg
    1. Buffer(缓冲区)

    一个Buffer对象是固定数量的数据的容器。其作用是一个存储器,或者分段运输区,在这里数据可被存储并在之后用于检索。尽管缓冲区作用于它们存储的原始数据类型,但缓冲区十分倾向于处理字节。非字节缓冲区可以在后台执行从字节或到字节的转换,这取决于缓冲区是如何创建的。

    缓冲区的工作与通道紧密联系。通道是 I/O 传输发生时通过的入口,而缓冲区是这些数据传输的来源或目标。对于离开缓冲区的传输,您想传递出去的数据被置于一个缓冲区,被传送到通道。对于传回缓冲区的传输,一个通道将数据放置在您所提供的缓冲区中。这种在协同对象(通常是您所写的对象以及一到多个 Channel 对象)之间进行的缓冲区数据传递是高效数据处理的关键。

    Buffer的常见方法如下所示:

    • flip(): 写模式转换成读模式
    • rewind():将 position 重置为 0 ,一般用于重复读。
    • clear() :清空 buffer ,准备再次被写入 (position 变成 0 , limit 变成 capacity) 。
    • compact(): 将未读取的数据拷贝到 buffer 的头部位。
    • mark(): reset():mark 可以标记一个位置, reset 可以重置到该位置。

    Buffer 常见类型: ByteBuffer 、 MappedByteBuffer 、 CharBuffer 、 DoubleBuffer 、 FloatBuffer 、 IntBuffer 、 LongBuffer 、 ShortBuffer 。

    1. Channel (通道)
      NIO 通过Channel(通道) 进行读写。

    通道是双向的,可读也可写,而流的读写是单向的。无论读写,通道只能和Buffer交互。因为 Buffer,通道可以异步地读写。

    1. Selectors(选择器)

    选择器用于使用单个线程处理多个通道。因此,它需要较少的线程来处理这些通道。线程之间的切换对于操作系统来说是昂贵的。 因此,为了提高系统效率选择器是有用的。

    NIO的问题

    实际中,开发者很少直接使用JDK的NIO类库进行开发,具体原因如下。

    1)NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。

    2)需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。

    3)可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐的工作量和难度都非常大。

    4)JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该BUG发生概率降低了一些而已,它并没有被根本解决。

    异常堆栈如下。

    java.lang.Thread.State: RUNNABLE  
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)  
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)  
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)  
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)  
    - locked <</span>0x0000000750928190> (a sun.nio.ch.Util$2)  
    - locked <</span>0x00000007509281a8> (a java.util.Collections$ UnmodifiableSet)  
    - locked <</span>0x0000000750946098> (a sun.nio.ch.EPollSelectorImpl)  
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)  
    at net.spy.memcached.MemcachedConnection.handleIO(Memcached Connection.java:217)  
    at net.spy.memcached.MemcachedConnection.run(MemcachedConnection. java:836)
    

    NIO实战

    下面通过一个多人在线聊天室的程序来演示NIO的使用。

    服务器端代码如下。

    package com.chat.server;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.Vector;
    
    public class ChatServer implements Runnable {
    
        //选择器
        private Selector selector;
        //注册serverSocketChannel后的选择键
        private SelectionKey serverKey;
        //标识是否运行
        private boolean isRun;
        //当前聊天室中的用户名称列表
        private Vector<String> unames;
        //时间格式化器
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        /**
         * 构造函数
         * @param port 服务器监控的端口号
         */
        public ChatServer(int port) {
            isRun = true;
            unames = new Vector<String>();
            init(port);
        }
    
        /**
         * 初始化选择器和服务器套接字
         * @param port 服务器监控的端口号
         */
        private void init(int port) {
            try {
                //获得选择器实例
                selector = Selector.open();
                //获得服务器套接字实例
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                //绑定端口号
                serverChannel.socket().bind(new InetSocketAddress(port));
                //设置为非阻塞
                serverChannel.configureBlocking(false);
                //将channel注册到选择器,指定其行为为“等待接受连接”
                serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                printInfo("Server starting...");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                //轮询选择器选择键
                while(isRun) {
                    // n=1表示有准备好进行IO操作的key
                    int n = selector.select();
                    if(n > 0) {
                        //从选择器上获取已选择的key的集合来进行迭代
                        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                        while(iter.hasNext()) {
                            SelectionKey key = iter.next();
                            //如果此key的通道可接受新的套接字
                            if(key.isAcceptable()) {
                                //一定要remove这个key,否则之后的新连接会被堵塞,无法连接服务器
                                iter.remove();
                                //获取key对应的通道
                                ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
                                //接受新的连接返回和客户端对等的套接字通道
                                SocketChannel channel = serverChannel.accept();
                                if(channel == null) {
                                    continue;
                                }
                                //设置为非阻塞
                                channel.configureBlocking(false);
                                //将套接字通道注册到选择器,并指定其行为“读”
                                channel.register(selector, SelectionKey.OP_READ);
                            }
                            //如果key的通道行为是读
                            if(key.isReadable()) {
                                readMsg(key);
                            }
                            //如果key的通道行为是写
                            if(key.isWritable()) {
                                writeMsg(key);
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void readMsg(SelectionKey key) throws IOException {
            //获取此key对应的套接字通道
            SocketChannel channel = (SocketChannel)key.channel();
            //创建一个1024k的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            StringBuffer sb = new StringBuffer();
            //将通道的数据读到缓冲区
            int count = channel.read(buffer);
            if(count > 0) {
                //翻转缓冲区,由写到读
                buffer.flip();
                sb.append(new String(buffer.array(), 0, count));
            }
            String str = sb.toString();
            //若消息中有open,则客户端准备进入聊天界面
            if(str.indexOf("open_") != -1) {
                String name = str.substring(5);
                printInfo(name + " online");
                unames.add(name);
                Iterator<SelectionKey> iter =  selector.selectedKeys().iterator();
                while(iter.hasNext()) {
                    SelectionKey selKey = iter.next();
                    //若不是服务器套接字通道的key,则将数据设置到此key中,
                    //并更新此key感兴趣的动作
                    if(selKey != serverKey) {
                        selKey.attach(unames);
                        selKey.interestOps(selKey.interestOps() | SelectionKey.OP_WRITE);
                    }
                }
            } else if(str.indexOf("exit_") != -1){ //退出命令
                String uname = str.substring(5);
                //删除用户名
                unames.remove(uname);
                //将close字符串附加到key
                key.attach("close");
                //更新此key感兴趣的动作
                key.interestOps(SelectionKey.OP_WRITE);
                Iterator<SelectionKey> iter = key.selector().selectedKeys().iterator();
                while(iter.hasNext()) {
                    SelectionKey selKey = iter.next();
                    if(selKey != serverKey && selKey != key) {
                        selKey.attach(unames);
                        selKey.interestOps(selKey.interestOps() | SelectionKey.OP_WRITE);
                    }
                }
                printInfo(uname + " offline");
            } else {
                String uname = str.substring(0, str.indexOf("^"));
                String msg = str.substring(str.indexOf("^") + 1);
                printInfo("(" + uname + ")说:" + msg);
                String dateTime = sdf.format(new Date());
                String smsg = uname + " " + dateTime + "\n " + msg + "\n";
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while(iter.hasNext()) {
                    SelectionKey selKey = iter.next();
                    if(selKey != serverKey) {
                        selKey.attach(smsg);
                        selKey.interestOps(selKey.interestOps() | SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    
        private void writeMsg(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel)key.channel();
            Object obj = key.attachment();
            //这里有必要将key的附加数据设置为空,否则会有问题
            key.attach("");
            if(obj.toString().equals("close")) {
                key.channel();
                channel.socket().close();
                channel.close();
                return;
            } else {
                channel.write(ByteBuffer.wrap(obj.toString().getBytes()));
            }
            key.interestOps(SelectionKey.OP_READ);
        }
    
        private void printInfo(String str) {
            System.out.println("[" + sdf.format(new Date()) + "] - >" + str);
        }
    
        public static void main(String[] args) {
            ChatServer server = new ChatServer(19999);
            new Thread(server).start();
        }
    }
    

    客户端一共三个类。

    首先是用来和服务端交互的客户端服务类。

    package com.chat.client;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    /**
     * 与服务端交互的客户端类
     */
    public class ClientService {
        private static final String HOST = "127.0.0.1";
        private static final int PORT = 19999;
        private static SocketChannel sc;
    
        private static Object lock = new Object();
    
        private static ClientService service;
    
        public static ClientService getInstance() {
            synchronized (lock) {
                if(service == null) {
                    try {
                        service = new ClientService();
                    } catch(IOException e) {
                        e.printStackTrace();
                    }
                }
                return service;
            }
        }
    
        private ClientService() throws IOException {
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.connect(new InetSocketAddress(HOST, PORT));
        }
    
        public void sendMsg(String msg) {
            try {
                while(!sc.finishConnect()) {
    
                }
                sc.write(ByteBuffer.wrap(msg.getBytes()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public String receiveMsg() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.clear();
            StringBuffer sb = new StringBuffer();
            int count = 0;
            String msg = null;
            try {
                while((count = sc.read(buffer)) > 0) {
                    sb.append(new String(buffer.array(), 0, count));
                }
                if(sb.length() > 0) {
                    msg = sb.toString();
                    if("close".equals(sb.toString())) {
                        msg = null;
                        sc.close();
                        sc.socket().close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return msg;
        }
    }
    

    然后是登陆窗体,用户用来设置名称。

    package com.chat.client;
    
    import javax.swing.*;
    import java.awt.*;
    import java.awt.event.ActionEvent;
    import java.awt.event.ActionListener;
    
    /**
     * 设置名称的登录窗体
     */
    public class SetNameFrame extends JFrame {
        private static final long serilaVersionUID = 1L;
        private static JTextField txtName;
        private static JButton btnOK;
        private static JLabel label;
    
        public SetNameFrame() {
            this.setLayout(null);
            Toolkit kit = Toolkit.getDefaultToolkit();
            int w = kit.getScreenSize().width;
            int h = kit.getScreenSize().height;
            this.setBounds(w / 2 - 230 / 2, h / 2 - 200 / 2, 230, 200);
            this.setTitle("设置名称");
            this.setDefaultCloseOperation(EXIT_ON_CLOSE);
            this.setResizable(false);
            txtName = new JTextField(4);
            this.add(txtName);
            txtName.setBounds(10, 10, 100, 25);
            btnOK = new JButton("OK");
            this.add(btnOK);
            btnOK.setBounds(120, 10, 80, 25);
            label = new JLabel("[w:" + w + ",h:" + h + "]");
            this.add(label);
            label.setBounds(10, 40, 200, 100);
            label.setText("<html>在上面的文本框中输入名字</br>显示器宽度: " + w + "<br/>显示器高度: "
                    + h + "</html>");
    
            btnOK.addActionListener(new ActionListener() {
                @Override
                public void actionPerformed(ActionEvent e) {
                    String uname = txtName.getText();
                    ClientService service = ClientService.getInstance();
                    ChatFrame chatFrame = new ChatFrame(service, uname);
                    chatFrame.show();
                    setVisible(false);
                }
            });
        }
    
        public static void main(String[] args) {
            SetNameFrame setNameFrame = new SetNameFrame();
            setNameFrame.setVisible(true);
        }
    }
    

    最后是聊天室窗体。

    package com.chat.client;
    
    import javax.swing.*;
    import javax.swing.event.ListSelectionEvent;
    import javax.swing.event.ListSelectionListener;
    import java.awt.event.*;
    
    public class ChatFrame {
        private JTextArea readContext = new JTextArea(18, 30);
        private JTextArea writeContext = new JTextArea(6, 30);
    
        private DefaultListModel model = new DefaultListModel();
        private JList list = new JList(model);
    
        private JButton btnSend = new JButton("发送");
        private JButton btnClose = new JButton("关闭");
    
        private JFrame frame = new JFrame("ChatFrame");
    
        private String uname;
    
        private ClientService service;
    
        private boolean isRun = false;
    
        public ChatFrame(ClientService service, String uname) {
            this.isRun = true;
            this.uname = uname;
            this.service = service;
        }
    
        private void init() {
            frame.setLayout(null);
            frame.setTitle(uname + " 聊天窗口");
            frame.setSize(500, 500);
            frame.setLocation(400, 200);
    
            frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
            frame.setResizable(false);
    
            JScrollPane readScroll = new JScrollPane(readContext);
            readScroll.setVerticalScrollBarPolicy(JScrollPane.VERTICAL_SCROLLBAR_AS_NEEDED);
            frame.add(readScroll);
    
            JScrollPane writeScroll = new JScrollPane(writeContext);
            writeScroll.setVerticalScrollBarPolicy(JScrollPane.VERTICAL_SCROLLBAR_AS_NEEDED);
            frame.add(writeScroll);
            frame.add(list);
            frame.add(btnSend);
            frame.add(btnClose);
    
            readScroll.setBounds(10, 10, 320, 300);
            readContext.setBounds(0, 0, 320, 300);
            readContext.setEditable(false);
            readContext.setLineWrap(true);
    
            writeScroll.setBounds(10, 315, 320, 100);
            writeContext.setBounds(0, 0, 320, 100);
            writeContext.setLineWrap(true);
    
            list.setBounds(340, 10, 140, 445);
            btnSend.setBounds(150, 420, 80, 30);
            btnClose.setBounds(250, 420, 80, 30);
    
            frame.addWindowListener(new WindowAdapter() {
                @Override
                public void windowClosing(WindowEvent e) {
                    isRun = false;
                    service.sendMsg("exit_" + uname);
                    System.exit(0);
                }
            });
    
            btnSend.addActionListener(new ActionListener() {
                @Override
                public void actionPerformed(ActionEvent e) {
                    String msg = writeContext.getText().trim();
                    if(msg.length() > 0) {
                        service.sendMsg(uname + "^" + writeContext.getText());
                    }
    
                    writeContext.setText(null);
                    writeContext.requestFocus();
                }
            });
    
            //关闭按钮事件
            btnClose.addActionListener(new ActionListener() {
                @Override
                public void actionPerformed(ActionEvent e) {
                    isRun = false;
                    service.sendMsg("exit_" + uname);
                    System.exit(0);
                }
            });
    
            //右边名称列表选择事件
            list.addListSelectionListener(new ListSelectionListener() {
                @Override
                public void valueChanged(ListSelectionEvent e) {
    
                }
            });
    
            writeContext.addKeyListener(new KeyListener() {
                @Override
                public void keyTyped(KeyEvent e) {
    
                }
    
                @Override
                public void keyPressed(KeyEvent e) {
    
                }
    
                @Override
                public void keyReleased(KeyEvent e) {
                    //按下Enter发送消息
                    if(e.getKeyCode() == KeyEvent.VK_ENTER) {
                        String msg = writeContext.getText().trim();
                        if(msg.length() > 0) {
                            service.sendMsg(uname + "^" + writeContext.getText());
                        }
                        writeContext.setText(null);
                        writeContext.requestFocus();
                    }
                }
            });
        }
    
        //此线程类用于轮询读取服务器发送的消息
        private class MsgThread extends Thread {
            @Override
            public void run() {
                while(isRun) {
                    String msg = service.receiveMsg();
                    if(msg != null) {
                        //若是名称列表数据,则更新窗体右边的列表
                        if(msg.indexOf("[") != -1 && msg.lastIndexOf("]") != -1) {
                            msg = msg.substring(1, msg.length() - 1);
                            String[] userNames = msg.split(",");
                            model.removeAllElements();
                            for(int i = 0; i < userNames.length; i++) {
                                model.addElement(userNames[i].trim());
                            }
                        } else {
                            String str = readContext.getText() + msg;
                            readContext.setText(str);
                            readContext.selectAll();
                        }
                    }
                }
            }
        }
    
        //显示界面
        public void show() {
            this.init();
            service.sendMsg("open_" + uname);
            MsgThread msgThread = new MsgThread();
            msgThread.start();
            this.frame.setVisible(true);
        }
    }
    

    聊天室效果如下。

    NIO实现聊天室.png

    相关文章

      网友评论

          本文标题:JAVA NIO

          本文链接:https://www.haomeiwen.com/subject/xxdlnqtx.html