Socket通常也称作套接字,用于描述IP地址和端口,是一个通信链的句柄,可以用来实现不同虚拟机或不同计算机之间的通信。之前接到一个项目,需要通过TCP/UDP协议与服务端进行通信,并且服务端已经完成,使用的是Mina框架。既然服务端用了Mina,那客户端用Mina肯定也更方便了,但是本人对Mina框架完全没有接触过,所以花费1天时间对其整体流程进行了大致的了解,正巧项目因为各种原因难产了,就正好有时间把心得记录一下,方便日后使用
本篇文章旨在教会大家基本使用,不做深入了解,高手可以直接绕过
代码已经放在在Github上
Mina是什么
Apache Mina是一个能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。
常用的客户端与服务端通讯的框架还有Smack与Netty等,其中Mina是Netty的前辈
Mina的TCP的主要基本类
IoService 既是服务端又是客户端,为大部分Mina服务提供了底层的API支持

IoAccepter 服务端。与ServerSocket不同的是,IoAccepter可以多次调用Bind绑定方法,从而监听多个端口
IoConnector 客户端。它同样也可以连接到多个服务端
IoSession 当前客户端到服务器端的一个连接实例
IoHandler 这个接口是你编写业务逻辑的地方,读取数据、发送数据基本都在这个接口总完成。这个实例是绑定到 IoService 上的,有且只有一个实例(没有给一个IoService注入一个IoHandler实例会抛出异常)
IoFilter 过滤器,链式结构,与Struts2的过滤器是一个意思


运行环境
当前环境中使用的jar包分别为:
mina-core-2.0.7.jar
slf4j-api-1.6.6.jar
今天我们就来实现一个TCP/IP的通信协议DEMO。按照国际惯例,上代码
服务端代码
public static void main(String[] args) {
try {
// 创建一个非阻塞的server端的Socket
IoAcceptor acceptor = new NioSocketAcceptor();
// 设置过滤器(使用Mina提供的文本换行符编解码器)
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"),
LineDelimiter.WINDOWS.getValue(),
LineDelimiter.WINDOWS.getValue())));
// 设置读取数据的缓冲区大小
acceptor.getSessionConfig().setReadBufferSize(2048);
// 读写通道10秒内无操作进入空闲状态
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.getFilterChain().addLast("keepalive", new KeepAliveFilter(new ServerKeepAliveMessageFactoryImp()));
// 绑定逻辑处理器
acceptor.setHandler(new Demo1ServerHandler());
// 绑定端口
acceptor.bind(new InetSocketAddress(4444));
System.out.println("服务端启动成功... 端口号为:" + 4444);
} catch (Exception e) {
System.out.println("服务端启动异常....");
e.printStackTrace();
}
}
- IoAcceptor刚才已经说够了,其实跟ServerSocket差不多。
- 随后设置过滤器IoFilter,你可以添加各种过滤器。这里是直接使用了Mina自己的文本换行符编解码工具类,你同样可以自己去定义这个,只要实现ProtocolCodecFactory接口完成getEncoder与getDecoder方法即可,这里就不深究了
- Mina提供了心跳检测功能,只要你通过KeepAliveFilter实现KeepAliveMessageFactory接口即可。这个还是要稍微说说的
public class ServerKeepAliveMessageFactoryImp implements KeepAliveMessageFactory {
@Override
public boolean isRequest(IoSession ioSession, Object o) {
if (o instanceof String && o.equals(Params.SEND)) {
return true;
}
return false;
}
@Override
public boolean isResponse(IoSession ioSession, Object o) {
if (o instanceof String && o.equals(Params.RECEIVE)) {
return true;
}
return false;
}
@Override
public Object getRequest(IoSession ioSession) {
return null;
}
@Override
public Object getResponse(IoSession ioSession, Object o) {
return Params.RECEIVE;
}
}
isRequest,只要是心跳发送包,不管是谁的都会回调
isResponse,只要是心跳响应包,不管事谁的都可以回调
getRequest,发心跳包
getResponse,收到心跳包,回发心跳包
因此作为服务端,我在这里处理逻辑是,只要能够收到心跳包,就给客户端发一个已经接收到的标志
- Demo1ServerHandler就是真正的逻辑处理部分,他继承自IoHandlerAdapter以实现IoHandler接口
public class Demo1ServerHandler extends IoHandlerAdapter {
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String msg = message.toString();
if ("exit".equals(msg)) {
//如果客户端发来exit,则关闭该连接
session.close(true);
}
//向客户端发送消息
Date date = new Date();
session.write(date);
System.out.println("服务器接受消息成功..."+message.toString());
super.messageReceived(session, message);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println("服务器发送消息成功...");
super.messageSent(session, message);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("服务器与客户端断开连接...");
super.sessionClosed(session);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("服务器与客户端创建连接...");
super.sessionCreated(session);
}
@Override
public void sessionOpened(IoSession session) throws Exception {
System.out.println("服务器与客户端连接打开...");
super.sessionOpened(session);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
System.out.println("服务器进入空闲状态...");
super.sessionIdle(session, status);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
System.out.println("服务器发送异常...");
super.exceptionCaught(session, cause);
}
}
其实这个也是很好理解的,状态都一清二楚的。
(1)对于TCP连接来说,连接被接受的时候调用sessionCreated,但要注意此时TCP连接并建立,此方法仅代表字面义,也就是连接的对象IoSession被创建完毕的时候,回调这个方法。
(2)sessionOpened方法在连接被打开时调用,它总是在sessionCreated方法之后被调用。连接成功之后得到IoSession对象,可以使用这个对象进行一系列的操作比如write发送,或者close关闭连接。注意不要搞错这个生命周期
(3)sessionIdle方法在 IoSession的通道进入空闲状态时调用。
(4)exceptionCaught方法在你的程序、Mina自身出现异常时回调,一般这里是关闭IoSession
(5)接收到消息时调用messageReceived方法。一般情况下,message是一个IoBuffer类,如果你使用了协议编解码器,那么可以强制转换为你需要的类型。通常我们都是会使用协议编解码器的,就像上面的代码,因为协 议编解码器是TextLineCodecFactory,所以我们可以强制转message为String类型
- 最后IoAcceptor绑定端口号,等待成功的回调,服务端流程结束
服务端搭建完毕之后,可以使用telnet去测试
客户端
其实客户端差不多就是把服务端的代码照抄一遍。区别上基本是将IoAccepter变成IoConnector
首先是配置下manifest
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"></uses-permission>
<uses-permission android:name="android.permission.INTERNET"></uses-permission>
传统的网络权限
有一点要注意的是,我们需要在子线程中完成与服务端的绑定工作,所以采用了IntentService去执行
public class PushService extends IntentService {
public PushService() {
super("PushService");
}
@Override
protected void onHandleIntent(Intent intent) {
PushManager.getInstance().connect();
}
}
再来看看PushManager的具体实现
public class PushManager {
private static volatile PushManager manager;
private static NioSocketConnector connector;
private static ConnectFuture connectFuture;
private static IoSession ioSession;
private PushManager() {
connector=new NioSocketConnector();
connector.setConnectTimeoutMillis(Params.CONNECT_TIMEOUT);
//为接收器设置管理服务
connector.setHandler(new ClientSessionHandler());
//设置过滤器(使用Mina提供的文本换行符编解码器)
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(),LineDelimiter.WINDOWS.getValue())));
//读写通道5秒内无操作进入空闲状态
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, Params.REQUEST_TIMEOUT);
//设置读取数据的缓冲区大小
connector.getSessionConfig().setReadBufferSize(2048);
//设置心跳
KeepAliveMessageFactory heartBeatFactory = new ClientKeepAliveMessageFactoryImp();
KeepAliveRequestTimeoutHandler heartBeatHandler = new ClientKeepAliveMessageTimeoutFactoryImp();
KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.BOTH_IDLE, heartBeatHandler);
//是否回发
heartBeat.setForwardEvent(true);
//心跳发送频率
heartBeat.setRequestInterval(Params.REQUEST_INTERVAL);
connector.getSessionConfig().setKeepAlive(true);
connector.getFilterChain().addLast("keepalive", heartBeat);
}
public static PushManager getInstance() {
if (manager==null) {
synchronized (PushManager.class) {
manager=new PushManager();
}
}
return manager;
}
/**
* 连接
* @return
*/
public boolean connect() {
if (connector!=null && connector.isActive() &&
connectFuture!=null && connectFuture.isConnected() &&
ioSession!=null && ioSession.isConnected()) {
return true;
}
try {
connectFuture=connector.connect(new InetSocketAddress(Params.HOSTNAME, Params.PORT));
//等待是否连接成功,相当于是转异步执行为同步执行。
connectFuture.awaitUninterruptibly();
//连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session 可能会无法获取。
ioSession=connectFuture.getSession();
sendMessage("Hello Client");
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 关闭
*/
public void close() {
if(ioSession!=null && ioSession.isConnected()) {
ioSession.close(false);
}
if(connectFuture!=null && connectFuture.isConnected()) {
connectFuture.cancel();
}
if(connector!=null && !connector.isDisposed()) {
connector.dispose();
}
}
/**
* 发送
* @param message
* @return
*/
public boolean sendMessage(String message) {
if (ioSession==null || !ioSession.isConnected()) {
return false;
}
WriteFuture writeFuture=ioSession.write(message);
if (writeFuture==null) {
return false;
}
writeFuture.awaitUninterruptibly();
if (writeFuture.isWritten()) {
return true;
}
else {
return false;
}
}
}
注意这里连接与关闭连接的代码,IoSession的close()仅仅关闭了TCP的连接通道,并没有关闭Server端、Client端的程序。你需要调用IoService的dispose()方法停止Server端、Client端。
这里再看看客户端心跳包的处理
public class ClientKeepAliveMessageFactoryImp implements KeepAliveMessageFactory {
@Override
public boolean isRequest(IoSession ioSession, Object o) {
if (o instanceof String && o.equals(Params.SEND)) {
return true;
}
return false;
}
@Override
public boolean isResponse(IoSession ioSession, Object o) {
if (o instanceof String && o.equals(Params.RECEIVE)) {
return true;
}
return false;
}
@Override
public Object getRequest(IoSession ioSession) {
return Params.SEND;
}
@Override
public Object getResponse(IoSession ioSession, Object o) {
return null;
}
}
这里就是由客户端主动发起心跳监听请求
最终结果
来分别看看服务端与客户端效果截图


达成预期
备注
大家可以参考《Apache_Mina_Server_2.0中文参考手册V1.0》这本操作手册对Mina进行更多的了解
其他Socket通信框架
之前在Github上接触到AndroidAsync框架,这个框架比较屌,是基于NIO的高效且有线程回调的框架。他既可以完成普通Socket传输,也可以完成SocketIO、WebSocket、Http请求,并且支持取消传输功能,甚至还可以作为一个简单的HTTP服务器。他的使用也很简单
AsyncServer.getDefault().connectSocket(Params.HOSTNAME, Params.PORT, new ConnectCallback() {
@Override
public void onConnectCompleted(Exception ex, AsyncSocket socket) {
if (ex!=null) {
Log.d("PushService", "连接出错");
return;
}
socket.setDataCallback(new DataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList bb) {
Log.d("PushService", new String(bb.getAllByteArray()));
}
});
socket.setClosedCallback(new CompletedCallback() {
@Override
public void onCompleted(Exception ex) {
if (ex!=null) {
Log.d("PushService", "setClosedCallback出错");
return;
}
Log.d("PushService", "setClosedCallback");
}
});
socket.setEndCallback(new CompletedCallback() {
@Override
public void onCompleted(Exception ex) {
if (ex!=null) {
Log.d("PushService", "setEndCallback出错");
return;
}
Log.d("PushService", "setEndCallback");
}
});
socket.setWriteableCallback(new WritableCallback() {
@Override
public void onWriteable() {
Log.d("PushService", "onWriteable");
}
});
Util.writeAll(socket, "Client\r\n".getBytes(), new CompletedCallback() {
@Override
public void onCompleted(Exception ex) {
if (ex!=null) {
Log.d("PushService", "writeAll出错");
return;
}
Log.d("PushService", "writeAll");
}
});
}});
setDataCallback就是接收数据的方法,writeAll就是写数据的方法,也没啥难度可说
网友评论