美文网首页
Socket编程

Socket编程

作者: 黄靠谱 | 来源:发表于2019-02-14 18:44 被阅读14次

    概述

    代码实现方式:

    1. BIO:服务端阻塞式监听到一个客户端,就单独开启一个子线程(或者丢到线程池)阻塞式的监听客户端的消息,客户端连接成功以后,也是阻塞式的监听服务端写入的消息。
    2. NIO:服务端把自己绑定在Selector上,然后重写4个方法监听和响应客户端的Accept()、read()、write()的事件,循环遍历Selector的事件,来响应客户端的行为
    3. AIO:服务器端绑定一个ConnectHandler,连接以后,给该线程绑定一个ReadHandler,和一个WriteHandler,涉及的类比较多,功能单一。

    特点:

    1. 复杂度: BIO < NIO < AIO
    2. 线程的占用:NIO最少 < AIO < BIO
    3. 响应的即时性:BIO最快 > AIO > NIO最慢

    阻塞式多线程TCPsocket编程原理

    • ServerSocket创建TCP的服务器,调用阻塞式的accept()方法监听连接的客户端。
    • 客户端和服务端通过Socket对象的输入流、输出流进行内容交互 socket.getInputStream()和socket.getOutputStream()

    BIO

    服务器端:用线程池去管理多个接入的客户端,把接收到的客户端Socket作为参数,用处理类在单独的线程里面和客户端交互

    1. 初始化服务端
    2. while(true)+ socket.accept的方式,阻塞式的等待新客户的接入
    3. 一旦有一个先Clinet接入的话,则从线程池中创建一个线程,来和Client交互,一般是读取信息,也可以输出固定的信息,比如服务器已经收到
    4. 工作子线程也是while(true)+buf.readLine(),阻塞和循环式的读取Clinet发过来的消息
            public static void main(String[] args) {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
                try {
                    ServerSocket socket = new ServerSocket(5000);
                    while (true) {
                        Socket client = socket.accept();
                        newFixedThreadPool.execute(new ClientHandler(client));
                    }  
                }
                catch (IOException ioe) {
                    ioe.printStackTrace();
                }
           }
    

    交互类:

    public class ClientHandler  implements Runnable {
         private Socket client = null;
            private String address;
            //通过构造函数注入接收到的客户端Socket对象
            public ClientHandler(Socket client) {
                this.client = client;
            }
            
            @Override
            public void run() {
                try {
                    String host = client.getInetAddress().toString();
                    String port = Integer.toString(client.getPort());
    
                    PrintStream out = new PrintStream(client.getOutputStream());
                    System.out.println("Get Connection");
                    BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
    
                    address = host + ":" + port;
                    System.out.println("get connection from" + address);
    
                    while (true) {
                        //阻塞式遍历读取客户端发过来的消息
                       String str = buf.readLine();
                       System.out.println(str);
                        if (str != null) {
                            out.println(str);
                            out.flush();
                        }
                    }
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    }
    
    

    客户端:

    public class TcpClient1 {
        public static void main(String[] args) throws IOException {
            //尝试连接服务端 127.0.1.1:5000的服务器
            Socket client = new Socket("127.0.0.1", 5000);
            //获取输出流,往服务端写数据
            PrintStream out = new PrintStream(client.getOutputStream());
            BufferedReader buf =  new BufferedReader(new InputStreamReader(client.getInputStream()));
            String[] msgs = {"你好,我是client1"};
            for (String msg : msgs) {
                out.println(msg);
                out.flush();
                while (true) {
                    String echo = buf.readLine();
                    if (echo != null) {
                        System.out.println(echo);
                    }
                }
            }
        }
    }
    

    NIO

    NIO server端的代码逻辑

    1. 创建和初始化Selector和ServerSocketChannel,给Server绑定Accept事件
    2. 主线程While(true) 从selector中读取发生的事件,一旦获取到就触发监听
    3. 如果阻塞过程中触发了事件,就遍历事件按照事件的类型来处理(连接事件、读取事件、写事件),处理完成删除
    4. 处理连接操作,就可以从key里面获取到Server,从Server.accept返回ClientChannel,就可以和客户端端交互了

    服务端的操作(读取客户端的消息,和往客户端的Channel里面写消息)

    1. 获取到ClientChannel需要首先注册READ的监听,就可以读取到客户端发过来的消息,socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
    2. 也可以直接往clientChannel里面写内容, socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));

    客户端的操作:

    • 初始化客户端和Selector,类似的绑定OP_CONNECT的监听,需要在监听事件中 finishConnect()的方法才算连接成功
    • 连接成功之后,要在Client端绑定OP_READ监听,这样如果Server往里面写数据,就可以读取到。 socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer);
    • 也可以直接往Channel里面写数据,这样服务端也可以收的到 channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));

    相关代码

    1. 服务端代码
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(10083));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            ServerSelectorProtocol protocol = new ServerSelectorProtocol(BUFF_SIZE); 
            while (true) {
                selector.select();
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()) {
                    SelectionKey key = keyIter.next(); 
                    keyIter.remove(); 
                    if (key.isAcceptable()){  
                        protocol.handleAccept(key);  
                    }  
                    if (key.isReadable()){  
                        protocol.handleRead(key);  
                    }  
                }
            }
        }
    
    1. 服务端处理类ServerSelectorProtocol
        public void handleAccept(SelectionKey key) throws IOException {
            System.out.println("Accept");
            //根据key.channel() 获取Server,再根据Server.accept()获取Clinet
            SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
            socketChannel.configureBlocking(false);
            //附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
            socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
            System.out.println("create new session from "+socketChannel.getRemoteAddress()+"\n");
            //往ClinetChannle里面写数据
            socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));
        }
    
        public void handleRead(SelectionKey key) throws IOException {
            SocketChannel clntChan = (SocketChannel) key.channel();  
            //获取该信道所关联的附件,这里为缓冲区  
            ByteBuffer buf = (ByteBuffer) key.attachment();
            buf.clear();
            long bytesRead = clntChan.read(buf);
            //如果read()方法返回-1,说明客户端关闭了连接,那么客户端已经接收到了与自己发送字节数相等的数据,可以安全地关闭  
            if (bytesRead == -1){   
                clntChan.close();  
            }else if(bytesRead > 0){
                buf.flip();
                String result = "";
                while(bytesRead>0){
                     byte[] data = buf.array();
                     result+=new String(data);
                     bytesRead = clntChan.read(buf);
                }
                System.out.println(result);
            }  
        }
    
    1. 客户端代码
        public static void main(String[] args) throws IOException {
            SocketChannel clientChannel = SocketChannel.open();
            Selector selector = Selector.open();
            ClientHandler handler=new ClientHandler();
            clientChannel.configureBlocking(false);
            clientChannel.connect(new InetSocketAddress("127.0.0.1", 10083));
            clientChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                selector.select();
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()) {
                    SelectionKey key = keyIter.next(); 
                    keyIter.remove(); 
                    if (key.isConnectable()){  
                        handler.handleConnect(key);  
                    }  
                    if (key.isReadable()){  
                        handler.handleRead(key);  
                    }  
                }
            }
        }
    
    1. 客户端Handler
     public void handleConnect(SelectionKey key) throws IOException {
             SocketChannel channel=(SocketChannel)key.channel();
             //如果正在连接,则完成连接
             if(channel.isConnectionPending()){
                 channel.finishConnect();
             }
            System.out.println("Connected");
            //ServerChannel的key,有accept接入
            SocketChannel socketChannel = (SocketChannel) key.channel();
            socketChannel.configureBlocking(false);
            //附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
            socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFF_SIZE));
            channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));
        }
        
        public void handleRead(SelectionKey key) throws IOException {
             SocketChannel channel = (SocketChannel)key.channel();
             ByteBuffer buffer = ByteBuffer.allocate(200);
             channel.read(buffer);
             byte[] data = buffer.array();
             String message = new String(data);
             System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);
        }
    

    AIO Socket编程

    1. AIO Socket编程支持2种方式,将来式和回调式,我演示的是回调式
    2. 参与的类比较多,但是职责单一,AcceptHandler监听客户的连接行为,ReadHandler监听客户的写入行为,WriteHandler没什么用,只是作为服务端向客户端写消息成功后,系统调用成功的反馈
    3. 因为服务端的行为很多都是要重复的,而回调函数都是一次性的,比如异步监听客户端的accept、read行为,所以每次回调之后都需要重复绑定

    服务端:

    • Server 服务创建和初始化AsynchronousServerSocketChannel,并且绑定AcceptHandler,监听客户的连接行为
    • AcceptHandler 监听到客户连接后,获取到客户的AsynchronousSocketChannel,然后对客户绑定ReadHandler
    • ReadHandler 监听客户往服务端发消息,读取到之后,会发起一个反馈消息,告知客户端已经收到你的消息,此时服务端写消息,需要绑定一个WriteHandler作为写入完成的反馈
    • WriteHandler 因为服务端要往客户端写反馈消息,该写入操作需要有个异步回调操作,WriteHandler 里面输出 “写入完成”
    public class Server {
        public final static int PORT = 8001;
        public final static String IP = "127.0.0.1";
        public static void main(String[] args) throws IOException {
            AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
            System.out.println("Server listen on "+PORT);
            server.accept(null,new AcceptHandler(server));
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        private AsynchronousServerSocketChannel server = null;
        AcceptHandler(AsynchronousServerSocketChannel server){
            this.server=server;
        }
    
        @Override
        public void completed(AsynchronousSocketChannel socket, Server attachment) {
            server.accept(null,this);
            try {
                System.out.println("有客户端连接:" +  socket.getRemoteAddress().toString());
            
            } catch (IOException e1) {
                e1.printStackTrace();
            }  
            startRead(socket);
        }
    
    
        @Override
        public void failed(Throwable exc, Server attachment) {}
        
         public void startRead(AsynchronousSocketChannel socket) {   
                ByteBuffer clientBuffer = ByteBuffer.allocate(1024);   
                ReadHandler rd=new ReadHandler(socket);  
                socket.read(clientBuffer, clientBuffer, rd);   
                try {               
                } catch (Exception e) {   
                    e.printStackTrace();   
                }   
            }   
    }
    
    public class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{   
        private AsynchronousSocketChannel socket;   
        public  String msg;  
        public ReadHandler(AsynchronousSocketChannel socket) {   
            this.socket = socket;   
        }       
        private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();    
        
        //回调函数里面的buf参数,就是Client写的内容,调用 msg=decoder.decode(buf).toString(); 读取客户端的内容
        @Override  
        public void completed(Integer i, ByteBuffer buf) {   
            if (i > 0) {   
                socket.read(buf, buf, this);
                buf.flip();   
                try {   
                    msg=decoder.decode(buf).toString();  
                    System.out.println("收到" +socket.getRemoteAddress().toString() + "的消息:" + msg);   
                    buf.compact();   
                } catch (CharacterCodingException e) {   
                    e.printStackTrace();   
                } catch (IOException e) {   
                    e.printStackTrace();   
                }   
                   
                try {  
                    write(socket);  
                } catch (UnsupportedEncodingException ex) {  
                    Logger.getLogger(ReadHandler.class.getName()).log(Level.SEVERE, null, ex);  
                }  
            } else if (i == -1) {   
                try {   
                    System.out.println("客户端断线:" + socket.getRemoteAddress().toString());   
                    buf = null;   
                } catch (IOException e) {   
                    e.printStackTrace();   
                }   
            }   
        }   
      
        @Override  
        public void failed(Throwable exc, ByteBuffer attachment) { }  
         
        public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{  
            String sendString="server recieve your message:"+msg;  
            ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));   
            socket.write(clientBuffer, clientBuffer, new WriteHandler(socket));  
        }  
    }  
    
    public class WriteHandler implements CompletionHandler<Integer,ByteBuffer>  {   
        private AsynchronousSocketChannel socket;   
        public WriteHandler(AsynchronousSocketChannel socket) {   
            this.socket = socket;   
        }   
        @Override  
        public void completed(Integer i, ByteBuffer buf) {  
            if (i > 0) {   
                System.out.println("往客户端发送消息成功");
            } else if (i == -1) {   
                try {   
                    System.out.println("对端断线:" + socket.getRemoteAddress().toString());   
                    buf = null;   
                } catch (IOException e) {   
                    e.printStackTrace();   
                }   
            }   
        }  
        @Override  
        public void failed(Throwable exc, ByteBuffer attachment) {}   
    }  
    

    客户端

    相对来说比较简单,直接用匿名内部类来解决,connect事件绑定一个回调,连接成功后写入一段话,写入成功后,再绑一个read的监听,来接受客户端的消息

        public static void main(String[] args) throws IOException {
            final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1",8001);
            CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void,Object>(){
                @Override
                public void completed(Void result, Object attachment) {
                    client.write(ByteBuffer.wrap("Hello".getBytes()),null, 
                            new CompletionHandler<Integer,Object>(){
                                @Override
                                public void completed(Integer result,Object attachment) {
                                    final ByteBuffer buffer = ByteBuffer.allocate(1024);
                                    client.read(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){
                                        @Override
                                        public void completed(Integer result,
                                                ByteBuffer attachment) {
                                            buffer.flip();
                                            System.out.println(new String(buffer.array()));
                                            try {
                                                client.close();
                                            } catch (IOException e) {
                                                e.printStackTrace();
                                            }
                                        }
                                        @Override
                                        public void failed(Throwable exc,ByteBuffer attachment) {}
                                    });
                                }
                                @Override
                                public void failed(Throwable exc, Object attachment) {}
                    });
                }
                @Override
                public void failed(Throwable exc, Object attachment) {}
            };
            client.connect(serverAddress, null, handler);
        }
    

    相关文章

      网友评论

          本文标题:Socket编程

          本文链接:https://www.haomeiwen.com/subject/jnryeqtx.html