美文网首页
NIO基础系列(二):Android聊天室

NIO基础系列(二):Android聊天室

作者: 维特无忧堡 | 来源:发表于2018-07-02 19:44 被阅读0次

前言

在上一篇中我简略的讲了一下NIO的执行流程,和用法,一些比较常见的写法,其他具体的概念你可以去网上搜,应该有一大把

概述

这个聊天室我使用Idea+AS写的,整个功能非常简单
1、客户端模块的主要功能:

  • 登陆功能:用户可以注册,然后选择服务器登入聊天室。
  • 显示用户:将在线用户显示在列表中。
  • 接收信息:能接收其他用户发出的信息。
  • 发送信息:能发出用户要发出的信息。

2、服务器端模块的主要功能:

  • 检验登陆信息:检查登陆信息是否正确,并向客户端返回登陆信息,如
    信息正确。就允许用户登陆。
  • 显示在线状态:将该用户的状态发给各在线用户。
  • 转发聊天信息:将消息转发给所有在线的用户。

设计思想

  借助 NIO 来实现 TCP 编程,即通过 selector(选择器)----相当于一个管家,管理所有的IO事件: (例如客户端的连接事件,服务端的 accept事件,客户端与服务端的读写等等,当 IO 事件注册给我们的选择器的时候 ,选择器会给给他们分配一个 key 值(可以理解成一个事件的标签),当 IO 事件完成(就绪)后,会通过 Key 值找到相应的管道,然后通过管道发送数据和接受数据等操作。
  NIO 使用的是 Linux 的多路复用技术 (select 模式):把读写事件交给一个单独的线程来处理,这个线程完成 IO 事件 的注册功能,还有不断的去轮询我们的读写缓存区,看是否有数据准备好,准备好的话就通知相应的读写事件(线程),这样的话以前的读写线程就可以做其他的事,这个阻塞的不是所有的 IO线程 阻塞的是 select 这个线程。

服务端代码

public class ChatServer implements Runnable{
    private Selector selector;
    private SelectionKey selectionKey;
    private Map<SocketChannel,String> all;
    private  final int PORT = 12123;
    private  final int BLOCK_SIZE = 2048;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public ChatServer(){
        init();
    }
    /**
     * 服务端初始化操作
     */
    private void init(){
        all = new HashMap <>(40);
        try {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(PORT));//绑定端口
            serverSocketChannel.configureBlocking(false);//设置非阻塞
            selector = Selector.open();//通过默认的SelectorProvider对象获取一个新的实例
            selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); //把服务端channel注册到选择器
            System.out.println("Server start :port at "+PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 不断的监听客户端的连接,轮询选择器
     */
    @Override
    public void run() {
        while (true){
            try {
                int count = selector.select();  //获取就绪channel
                if (count == 0)  continue;
                Iterator<SelectionKey> iterator =  selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        handleKey(selectionKey);
                        iterator.remove();  //把事件已经出去来了,然后把他删除掉
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void handleKey(SelectionKey key) throws IOException {
        ServerSocketChannel server = null;
        if (key.isAcceptable()){
            server = (ServerSocketChannel) key.channel();
            accept(server);
        }
        if (key.isValid() && key.isReadable()){
           readMsg(key);
        }
        if(key.isValid() && key.isWritable()){
            writeMsg(key);
        }
    }
    private void accept(ServerSocketChannel serverSocketChannel) throws IOException{
        //接受socket
        SocketChannel socket = serverSocketChannel.accept();
        socket.configureBlocking(false);
        //将channel 注册到selector中,并一开始读取数据(注册读事件)
        socket.register(selector, SelectionKey.OP_READ);
        System.out.println("yes connected !");
    }
    private void readMsg(SelectionKey key){
        SocketChannel client = null;
        try {
            client = (SocketChannel) key.channel();
            //设置buffer缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(BLOCK_SIZE);
            //将数据读到缓冲区,然后把缓冲区的数据取出来
            int readByte = client.read(buffer);
            StringBuffer buf = new StringBuffer();
            //如果读取到了数据
            System.out.println("size = "+readByte);
            if(readByte > 0){
                buf.append(new String(buffer.array(), 0, readByte));
            } else if (readByte == -1){//每次下线走的都是这里,就是当客户端close之后,不是应该抛出异常吗
                close(key);
            }
            String msg = buf.toString();
            ResponseCode code = (ResponseCode) JsonUtil.JsonToObject(msg,ResponseCode.class);
            if (code == null)  return;
            switch (code.getCode()){
                case Code.Chat_Client_Connect:
                    all.put(client,code.getMsg());
                    Iterator<SelectionKey> iter = selector.keys().iterator();
                    while(iter.hasNext()){//通知所有人某某人上线了
                        SelectionKey skey = iter.next();
                        if(skey != key && skey.channel() instanceof SocketChannel){
                            ResponseCode curCode = new ResponseCode();
                            curCode.setCode(Code.Chat_Server_PeopOnline);
                            curCode.setMsg(code.getMsg());
                            skey.attach(JsonUtil.ObjectToJson(curCode));
                            skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);
                        }
                    }
                    System.out.println(new String(code.getMsg().getBytes("utf-8"))+" is online");
                    break;
                case Code.Chat_Client_Exit:
                    all.remove(client);
                    break;
                case Code.Chat_Client_Speak_All:
                    String dateTime = sdf.format(new Date());
                    Iterator<SelectionKey> iter2 = selector.keys().iterator();
                    while(iter2.hasNext()){
                        SelectionKey sKey = iter2.next();
                        if (sKey == key) continue;
                        if (sKey.channel() instanceof SocketChannel){
                            ResponseCode curCode = new ResponseCode();
                            curCode.setCode(Code.Chat_Server_Speak);
                            curCode.setMsg(code.getMsg());
                            sKey.attach(JsonUtil.ObjectToJson(curCode));
                            sKey.interestOps(sKey.interestOps() | SelectionKey.OP_WRITE);//每个管道加上写操作
                        }
                    }
                    break;
                case Code.Chat_Client_OnlineNum:
                    ByteBuffer buffer1 = ByteBuffer.allocate(BLOCK_SIZE);
                    ResponseCode curCode = new ResponseCode();
                    curCode.setCode(Code.Chat_Server_OnelineNum);
                    curCode.setMsg(code.getMsg());
                    List<String> names = new ArrayList<>();
                    for (Map.Entry<SocketChannel, String> entry : all.entrySet()) {
                        names.add(entry.getValue());
                    }
                    curCode.setNickNames(names);
                    key.attach(JsonUtil.ObjectToJson(curCode));
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                    buffer.clear();
            }
        } catch (IOException e) {
            //当客户端关闭channel时,服务端再往通道缓冲区中写或读数据,都会报IOException,解决方法是:在服务端这里捕获掉这个异常,并且关闭掉服务端这边的Channel通道
            close(key);
        }
    }
    private void close(SelectionKey key){
        key.cancel();
        SocketChannel client = (SocketChannel) key.channel();
        String name = all.get(client);
        all.remove(client);
        System.out.println(name+"下线啦");
        try {
            client.socket().close();
            client.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
    private void writeMsg(SelectionKey key,String msg) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            byte[] responseByte = msg.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(BLOCK_SIZE);
            writeBuffer.put(responseByte);
            writeBuffer.flip();
            channel.write(writeBuffer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void writeMsg(SelectionKey key) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            Object attachment = key.attachment();
            key.attach("");
            System.out.println(attachment.toString());
            channel.write(ByteBuffer.wrap(attachment.toString().getBytes()));
            key.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //NotYetConnectedException
}

客户端代码

package com.example.visioneh.englishhelper.socket;

import android.util.Log;

import com.example.visioneh.englishhelper.bean.Code;
import com.example.visioneh.englishhelper.bean.ResponseCode;
import com.example.visioneh.englishhelper.bean.User;
import com.example.visioneh.englishhelper.util.JsonUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Created by 62588 on 2018/6/23.
 */

public class ChatClient {
    private InetSocketAddress address;
    private SocketChannel channel;
    private Selector selector;
    private Charset charset = Charset.forName("UTF-8");
    private static final int SIZE = 4096;
    private OnReceive listener;
    private String name;
    private volatile boolean isconnect = true;
    public  interface OnReceive {
        void OnLoginSuccess();
        void OnMsgReceiveListener(String msg);
        void OnlinePeopleReceiveListener(List<User> users);
        void OnPeopOnlineListener(String name);
    }
    public void setOnReceiveListener(OnReceive listener){
        this.listener = listener;
    }
    public ChatClient(String ipaddr, int port,String name) throws IOException {

        address = new InetSocketAddress(ipaddr,port);
        channel = SocketChannel.open();
        channel.configureBlocking(false);
        boolean isconn = channel.connect(address);
        Log.d("chat",isconn+"  - -- - - -- ");
        selector = Selector.open();
        channel.register(selector, SelectionKey.OP_CONNECT);
        new Thread(new ClientThread()).start();

        this.name = name;
    }
    public void send(final String msg){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    channel.write(ByteBuffer.wrap(msg.getBytes()));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    public void close() throws IOException {
      /*  ResponseCode code = new ResponseCode();
        code.setCode(Code.Chat_Client_Exit);
        code.setMsg(name);
        send(JsonUtil.ObjectToJson(code));*/
        isconnect = false;
        if(channel != null){
               channel.socket().close();
        }
        if(selector != null){
               selector.close();
        }
    }
    private class ClientThread implements Runnable{
        @Override
        public void run() {
            while (isconnect){
                if (selector == null) break;
                try {
                    int count = selector.select();  //获取就绪channel
                    if (count == 0)  continue;
                    Iterator<SelectionKey> iterator =  selector.selectedKeys().iterator();
                    while (iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        handleKey(selectionKey);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * @param key
     * @throws IOException
     *
     */
    private void handleKey(SelectionKey key) throws IOException {
        if (key.isConnectable()){
            try {
                channel.finishConnect();//如果是非阻塞模式,可能要循环询问
            } catch (Exception e) {}
            key.interestOps(SelectionKey.OP_READ);
            ResponseCode code = new ResponseCode();
            code.setCode(Code.Chat_Client_Connect);
            code.setMsg(name);
            send(JsonUtil.ObjectToJson(code));
            if (listener != null) {
                listener.OnLoginSuccess();
            }
        }
        if (key.isValid() && key.isReadable()){
            readMsg(key);
        }
    }
    private void readMsg(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(SIZE);
        StringBuilder sb = new StringBuilder();
        try {

            int readByte = channel.read(buffer);
            StringBuffer buf = new StringBuffer();

            if(readByte > 0){
                buf.append(new String(buffer.array(), 0, readByte));
            }
            String content = buf.toString();
            ResponseCode code = (ResponseCode) JsonUtil.JsonToObject(content,ResponseCode.class);
            if (code == null)
                return ;
            if (listener != null){
                if (code.getCode() == Code.Chat_Server_Speak){
                    listener.OnMsgReceiveListener(code.getMsg());
                } else if (code.getCode() == Code.Chat_Server_OnelineNum){
                    int size = code.getNickNames().size();
                    List<User> users = new ArrayList<>();
                    for (int i = 0; i < size; i++){
                        User user = new User();
                        user.setName(code.getNickNames().get(i));
                        users.add(user);
                    }
                    Log.d("tttttttt",users.toString());
                    listener.OnlinePeopleReceiveListener(users);
                } else if (code.getCode() == Code.Chat_Server_PeopOnline){
                    listener.OnPeopOnlineListener(code.getMsg());
                }
            }
            //key.interestOps(SelectionKey.OP_READ);
        }
        catch (IOException e) {
            key.cancel();
            if (key.channel() != null) {
                try {
                    key.channel().close();
                }
                catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}
package com.example.visioneh.englishhelper.frag;

import android.content.Context;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.ImageView;
import android.widget.TextView;

import com.example.visioneh.englishhelper.R;
import com.example.visioneh.englishhelper.bean.ChatMsg;
import com.example.visioneh.englishhelper.bean.Word;

import java.util.List;

/**
 * Created by 62588 on 2018/6/24.
 */

public class ChatRoomAdapter extends RecyclerView.Adapter<RecyclerView.ViewHolder> {

    private LayoutInflater inflater;
    private List<ChatMsg> mlists;
    public interface onItemClickListener{
        void onItemClick(View view , int position);
        void onLongClick(View view ,int position);
    }
    public RecycleAdapter.onItemClickListener onItemClickListener;

    public void setOnItemClickListener(RecycleAdapter.onItemClickListener onItemClickListener) {
        this.onItemClickListener = onItemClickListener;
    }

    public ChatRoomAdapter(Context context, List<ChatMsg> mlists) {
        this.inflater = LayoutInflater.from(context);
        this.mlists = mlists;
    }

    @Override
    public int getItemViewType(int position) {
       if (mlists.get(position).isown()){
           return 1;
       } else {
           return 0;
       }
    }

    @Override
    public RecyclerView.ViewHolder onCreateViewHolder(ViewGroup parent, int viewType) {
        View view ;
        if (viewType == 0){
             view=inflater.inflate(R.layout.chat_item_other,parent,false);
        } else {
             view=inflater.inflate(R.layout.chat_item_me,parent,false);
        }
        return new OtherItemHolder(view);
    }

    @Override
    public void onBindViewHolder(final RecyclerView.ViewHolder holder, final int position) {
        ChatMsg word=mlists.get(position);
        OtherItemHolder itemViewHolder= (OtherItemHolder) holder;
        itemViewHolder.msg.setText(word.getMsg());
    }
    @Override
    public int getItemCount() {
        return mlists.size();
    }
    class  OtherItemHolder extends RecyclerView.ViewHolder{
        public TextView msg;
        public OtherItemHolder(View itemView) {
            super(itemView);
            msg = (TextView)itemView.findViewById(R.id.chat_msg_other);
        }
    }
}

显示效果

image.png
image.png image.png image.png

遇到的问题

1、client连接报错

分析与解决问题: 关键就出在下面这个代码身上

channel.configureBlocking(false);
boolean isconn = channel.connect(address);
因为我设置了非阻塞,所以当connect执行之后并不能代表已经连接上了,所以在执行下列代码的时候就会有错误

 if (key.isConnectable()){
            key.interestOps(SelectionKey.OP_READ);
            ResponseCode code = new ResponseCode();
            code.setCode(Code.Chat_Client_Connect);
            code.setMsg(name);
            send(JsonUtil.ObjectToJson(code));
            if (listener != null) {
                listener.OnLoginSuccess();
            }
        }

虽然已经注册了connect事件,但没有连接完全,所以要加上一个阻塞

try {
channel.finishConnect();//如果是非阻塞模式,可能要循环询问
} catch (Exception e) {}
这样就OK了

2、client下线异常

分析与解决问题: 按照我原本的想法,当客户端关闭之后,服务端进行操作的时候就会出现异常,然后我们捕捉到这个异常,就进行下线处理就行了,没想到一直不出现异常,很奇怪的是它一直在注册读事件,所以我们一直获取到读时间,一个无限循环(到现在我还没弄清楚为什么一直注册读事件,真的奇怪),
我输出了它的readByte,是-1,所以只要判断是-1的话就做下线处理,并close

3、client 退出后 Selector还在不断轮寻,所以会报错的。

分析与解决问题:这时候我们不要把while的条件设置为true,应该设置为一个bool类型的变量,当退出之后设置为false,另外因为是子线程在轮询,所以你要把这个变量设置为volatile类型还保证它的可见性

4、并发量高的时候就会出现粘包与拆包的问题,这也是没办法的事情,我没有做处理

所以还是用框架好啊,下次学着弄一下Netty框架怎么用的,看看它的源码,看看这个问题是怎么解决的

总结

总的来说这个NIO写起来很复杂哎,我反正是感觉不太好用,代码量真的多,它的源码我还没看,所以有些问题我还找不到原因,就很烦,比如说那个close之后为什么一直循环注册read事件呢?如果测试的话还会有各种问题,哎!还是框架好,抽空研究一下Netty是怎么回事,封装大法好哇!等我弄清楚了再写个详解。

相关文章

网友评论

      本文标题:NIO基础系列(二):Android聊天室

      本文链接:https://www.haomeiwen.com/subject/zboluftx.html