前言
在上一篇中我简略的讲了一下NIO的执行流程,和用法,一些比较常见的写法,其他具体的概念你可以去网上搜,应该有一大把
概述
这个聊天室我使用Idea+AS写的,整个功能非常简单
1、客户端模块的主要功能:
- 登陆功能:用户可以注册,然后选择服务器登入聊天室。
- 显示用户:将在线用户显示在列表中。
- 接收信息:能接收其他用户发出的信息。
- 发送信息:能发出用户要发出的信息。
2、服务器端模块的主要功能:
- 检验登陆信息:检查登陆信息是否正确,并向客户端返回登陆信息,如
信息正确。就允许用户登陆。 - 显示在线状态:将该用户的状态发给各在线用户。
- 转发聊天信息:将消息转发给所有在线的用户。
设计思想
借助 NIO 来实现 TCP 编程,即通过 selector(选择器)----相当于一个管家,管理所有的IO事件: (例如客户端的连接事件,服务端的 accept事件,客户端与服务端的读写等等,当 IO 事件注册给我们的选择器的时候 ,选择器会给给他们分配一个 key 值(可以理解成一个事件的标签),当 IO 事件完成(就绪)后,会通过 Key 值找到相应的管道,然后通过管道发送数据和接受数据等操作。
NIO 使用的是 Linux 的多路复用技术 (select 模式):把读写事件交给一个单独的线程来处理,这个线程完成 IO 事件 的注册功能,还有不断的去轮询我们的读写缓存区,看是否有数据准备好,准备好的话就通知相应的读写事件(线程),这样的话以前的读写线程就可以做其他的事,这个阻塞的不是所有的 IO线程 阻塞的是 select 这个线程。
服务端代码
public class ChatServer implements Runnable{
private Selector selector;
private SelectionKey selectionKey;
private Map<SocketChannel,String> all;
private final int PORT = 12123;
private final int BLOCK_SIZE = 2048;
private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public ChatServer(){
init();
}
/**
* 服务端初始化操作
*/
private void init(){
all = new HashMap <>(40);
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(PORT));//绑定端口
serverSocketChannel.configureBlocking(false);//设置非阻塞
selector = Selector.open();//通过默认的SelectorProvider对象获取一个新的实例
selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); //把服务端channel注册到选择器
System.out.println("Server start :port at "+PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 不断的监听客户端的连接,轮询选择器
*/
@Override
public void run() {
while (true){
try {
int count = selector.select(); //获取就绪channel
if (count == 0) continue;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
handleKey(selectionKey);
iterator.remove(); //把事件已经出去来了,然后把他删除掉
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleKey(SelectionKey key) throws IOException {
ServerSocketChannel server = null;
if (key.isAcceptable()){
server = (ServerSocketChannel) key.channel();
accept(server);
}
if (key.isValid() && key.isReadable()){
readMsg(key);
}
if(key.isValid() && key.isWritable()){
writeMsg(key);
}
}
private void accept(ServerSocketChannel serverSocketChannel) throws IOException{
//接受socket
SocketChannel socket = serverSocketChannel.accept();
socket.configureBlocking(false);
//将channel 注册到selector中,并一开始读取数据(注册读事件)
socket.register(selector, SelectionKey.OP_READ);
System.out.println("yes connected !");
}
private void readMsg(SelectionKey key){
SocketChannel client = null;
try {
client = (SocketChannel) key.channel();
//设置buffer缓冲区
ByteBuffer buffer = ByteBuffer.allocate(BLOCK_SIZE);
//将数据读到缓冲区,然后把缓冲区的数据取出来
int readByte = client.read(buffer);
StringBuffer buf = new StringBuffer();
//如果读取到了数据
System.out.println("size = "+readByte);
if(readByte > 0){
buf.append(new String(buffer.array(), 0, readByte));
} else if (readByte == -1){//每次下线走的都是这里,就是当客户端close之后,不是应该抛出异常吗
close(key);
}
String msg = buf.toString();
ResponseCode code = (ResponseCode) JsonUtil.JsonToObject(msg,ResponseCode.class);
if (code == null) return;
switch (code.getCode()){
case Code.Chat_Client_Connect:
all.put(client,code.getMsg());
Iterator<SelectionKey> iter = selector.keys().iterator();
while(iter.hasNext()){//通知所有人某某人上线了
SelectionKey skey = iter.next();
if(skey != key && skey.channel() instanceof SocketChannel){
ResponseCode curCode = new ResponseCode();
curCode.setCode(Code.Chat_Server_PeopOnline);
curCode.setMsg(code.getMsg());
skey.attach(JsonUtil.ObjectToJson(curCode));
skey.interestOps(skey.interestOps() | SelectionKey.OP_WRITE);
}
}
System.out.println(new String(code.getMsg().getBytes("utf-8"))+" is online");
break;
case Code.Chat_Client_Exit:
all.remove(client);
break;
case Code.Chat_Client_Speak_All:
String dateTime = sdf.format(new Date());
Iterator<SelectionKey> iter2 = selector.keys().iterator();
while(iter2.hasNext()){
SelectionKey sKey = iter2.next();
if (sKey == key) continue;
if (sKey.channel() instanceof SocketChannel){
ResponseCode curCode = new ResponseCode();
curCode.setCode(Code.Chat_Server_Speak);
curCode.setMsg(code.getMsg());
sKey.attach(JsonUtil.ObjectToJson(curCode));
sKey.interestOps(sKey.interestOps() | SelectionKey.OP_WRITE);//每个管道加上写操作
}
}
break;
case Code.Chat_Client_OnlineNum:
ByteBuffer buffer1 = ByteBuffer.allocate(BLOCK_SIZE);
ResponseCode curCode = new ResponseCode();
curCode.setCode(Code.Chat_Server_OnelineNum);
curCode.setMsg(code.getMsg());
List<String> names = new ArrayList<>();
for (Map.Entry<SocketChannel, String> entry : all.entrySet()) {
names.add(entry.getValue());
}
curCode.setNickNames(names);
key.attach(JsonUtil.ObjectToJson(curCode));
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
buffer.clear();
}
} catch (IOException e) {
//当客户端关闭channel时,服务端再往通道缓冲区中写或读数据,都会报IOException,解决方法是:在服务端这里捕获掉这个异常,并且关闭掉服务端这边的Channel通道
close(key);
}
}
private void close(SelectionKey key){
key.cancel();
SocketChannel client = (SocketChannel) key.channel();
String name = all.get(client);
all.remove(client);
System.out.println(name+"下线啦");
try {
client.socket().close();
client.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private void writeMsg(SelectionKey key,String msg) {
try {
SocketChannel channel = (SocketChannel) key.channel();
byte[] responseByte = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(BLOCK_SIZE);
writeBuffer.put(responseByte);
writeBuffer.flip();
channel.write(writeBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
private void writeMsg(SelectionKey key) {
try {
SocketChannel channel = (SocketChannel) key.channel();
Object attachment = key.attachment();
key.attach("");
System.out.println(attachment.toString());
channel.write(ByteBuffer.wrap(attachment.toString().getBytes()));
key.interestOps(SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
//NotYetConnectedException
}
客户端代码
package com.example.visioneh.englishhelper.socket;
import android.util.Log;
import com.example.visioneh.englishhelper.bean.Code;
import com.example.visioneh.englishhelper.bean.ResponseCode;
import com.example.visioneh.englishhelper.bean.User;
import com.example.visioneh.englishhelper.util.JsonUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Created by 62588 on 2018/6/23.
*/
public class ChatClient {
private InetSocketAddress address;
private SocketChannel channel;
private Selector selector;
private Charset charset = Charset.forName("UTF-8");
private static final int SIZE = 4096;
private OnReceive listener;
private String name;
private volatile boolean isconnect = true;
public interface OnReceive {
void OnLoginSuccess();
void OnMsgReceiveListener(String msg);
void OnlinePeopleReceiveListener(List<User> users);
void OnPeopOnlineListener(String name);
}
public void setOnReceiveListener(OnReceive listener){
this.listener = listener;
}
public ChatClient(String ipaddr, int port,String name) throws IOException {
address = new InetSocketAddress(ipaddr,port);
channel = SocketChannel.open();
channel.configureBlocking(false);
boolean isconn = channel.connect(address);
Log.d("chat",isconn+" - -- - - -- ");
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
new Thread(new ClientThread()).start();
this.name = name;
}
public void send(final String msg){
new Thread(new Runnable() {
@Override
public void run() {
try{
channel.write(ByteBuffer.wrap(msg.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
public void close() throws IOException {
/* ResponseCode code = new ResponseCode();
code.setCode(Code.Chat_Client_Exit);
code.setMsg(name);
send(JsonUtil.ObjectToJson(code));*/
isconnect = false;
if(channel != null){
channel.socket().close();
}
if(selector != null){
selector.close();
}
}
private class ClientThread implements Runnable{
@Override
public void run() {
while (isconnect){
if (selector == null) break;
try {
int count = selector.select(); //获取就绪channel
if (count == 0) continue;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @param key
* @throws IOException
*
*/
private void handleKey(SelectionKey key) throws IOException {
if (key.isConnectable()){
try {
channel.finishConnect();//如果是非阻塞模式,可能要循环询问
} catch (Exception e) {}
key.interestOps(SelectionKey.OP_READ);
ResponseCode code = new ResponseCode();
code.setCode(Code.Chat_Client_Connect);
code.setMsg(name);
send(JsonUtil.ObjectToJson(code));
if (listener != null) {
listener.OnLoginSuccess();
}
}
if (key.isValid() && key.isReadable()){
readMsg(key);
}
}
private void readMsg(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
StringBuilder sb = new StringBuilder();
try {
int readByte = channel.read(buffer);
StringBuffer buf = new StringBuffer();
if(readByte > 0){
buf.append(new String(buffer.array(), 0, readByte));
}
String content = buf.toString();
ResponseCode code = (ResponseCode) JsonUtil.JsonToObject(content,ResponseCode.class);
if (code == null)
return ;
if (listener != null){
if (code.getCode() == Code.Chat_Server_Speak){
listener.OnMsgReceiveListener(code.getMsg());
} else if (code.getCode() == Code.Chat_Server_OnelineNum){
int size = code.getNickNames().size();
List<User> users = new ArrayList<>();
for (int i = 0; i < size; i++){
User user = new User();
user.setName(code.getNickNames().get(i));
users.add(user);
}
Log.d("tttttttt",users.toString());
listener.OnlinePeopleReceiveListener(users);
} else if (code.getCode() == Code.Chat_Server_PeopOnline){
listener.OnPeopOnlineListener(code.getMsg());
}
}
//key.interestOps(SelectionKey.OP_READ);
}
catch (IOException e) {
key.cancel();
if (key.channel() != null) {
try {
key.channel().close();
}
catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
package com.example.visioneh.englishhelper.frag;
import android.content.Context;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.ImageView;
import android.widget.TextView;
import com.example.visioneh.englishhelper.R;
import com.example.visioneh.englishhelper.bean.ChatMsg;
import com.example.visioneh.englishhelper.bean.Word;
import java.util.List;
/**
* Created by 62588 on 2018/6/24.
*/
public class ChatRoomAdapter extends RecyclerView.Adapter<RecyclerView.ViewHolder> {
private LayoutInflater inflater;
private List<ChatMsg> mlists;
public interface onItemClickListener{
void onItemClick(View view , int position);
void onLongClick(View view ,int position);
}
public RecycleAdapter.onItemClickListener onItemClickListener;
public void setOnItemClickListener(RecycleAdapter.onItemClickListener onItemClickListener) {
this.onItemClickListener = onItemClickListener;
}
public ChatRoomAdapter(Context context, List<ChatMsg> mlists) {
this.inflater = LayoutInflater.from(context);
this.mlists = mlists;
}
@Override
public int getItemViewType(int position) {
if (mlists.get(position).isown()){
return 1;
} else {
return 0;
}
}
@Override
public RecyclerView.ViewHolder onCreateViewHolder(ViewGroup parent, int viewType) {
View view ;
if (viewType == 0){
view=inflater.inflate(R.layout.chat_item_other,parent,false);
} else {
view=inflater.inflate(R.layout.chat_item_me,parent,false);
}
return new OtherItemHolder(view);
}
@Override
public void onBindViewHolder(final RecyclerView.ViewHolder holder, final int position) {
ChatMsg word=mlists.get(position);
OtherItemHolder itemViewHolder= (OtherItemHolder) holder;
itemViewHolder.msg.setText(word.getMsg());
}
@Override
public int getItemCount() {
return mlists.size();
}
class OtherItemHolder extends RecyclerView.ViewHolder{
public TextView msg;
public OtherItemHolder(View itemView) {
super(itemView);
msg = (TextView)itemView.findViewById(R.id.chat_msg_other);
}
}
}
显示效果
image.pngimage.png image.png image.png
遇到的问题
1、client连接报错
分析与解决问题: 关键就出在下面这个代码身上
channel.configureBlocking(false);
boolean isconn = channel.connect(address);
因为我设置了非阻塞,所以当connect执行之后并不能代表已经连接上了,所以在执行下列代码的时候就会有错误
if (key.isConnectable()){
key.interestOps(SelectionKey.OP_READ);
ResponseCode code = new ResponseCode();
code.setCode(Code.Chat_Client_Connect);
code.setMsg(name);
send(JsonUtil.ObjectToJson(code));
if (listener != null) {
listener.OnLoginSuccess();
}
}
虽然已经注册了connect事件,但没有连接完全,所以要加上一个阻塞
try {
channel.finishConnect();//如果是非阻塞模式,可能要循环询问
} catch (Exception e) {}
这样就OK了
2、client下线异常
分析与解决问题: 按照我原本的想法,当客户端关闭之后,服务端进行操作的时候就会出现异常,然后我们捕捉到这个异常,就进行下线处理就行了,没想到一直不出现异常,很奇怪的是它一直在注册读事件,所以我们一直获取到读时间,一个无限循环(到现在我还没弄清楚为什么一直注册读事件,真的奇怪),
我输出了它的readByte,是-1,所以只要判断是-1的话就做下线处理,并close
3、client 退出后 Selector还在不断轮寻,所以会报错的。
分析与解决问题:这时候我们不要把while的条件设置为true,应该设置为一个bool类型的变量,当退出之后设置为false,另外因为是子线程在轮询,所以你要把这个变量设置为volatile类型还保证它的可见性
4、并发量高的时候就会出现粘包与拆包的问题,这也是没办法的事情,我没有做处理
所以还是用框架好啊,下次学着弄一下Netty框架怎么用的,看看它的源码,看看这个问题是怎么解决的
总结
总的来说这个NIO写起来很复杂哎,我反正是感觉不太好用,代码量真的多,它的源码我还没看,所以有些问题我还找不到原因,就很烦,比如说那个close之后为什么一直循环注册read事件呢?如果测试的话还会有各种问题,哎!还是框架好,抽空研究一下Netty是怎么回事,封装大法好哇!等我弄清楚了再写个详解。
网友评论