netty实现
一:实现原理
实现原理:
长连接的维持是要客户端程序定时向服务端程序发送一个维持连接包的。如果长时间未发送维持连接包,服务端程序将断开连接。
客户端:
Client通过持有Socket的对象,可以随时(使用sendObject方法)发送Massage Object(消息)给服务端。如果keepAliveDelay毫秒(程序中是2秒)内未发送任何数据,则自动发送一个KeepAlive Object(心跳)给服务端,用于维持连接。
由于我们向服务端可以发送很多不同的消息对象,服务端也可以返回不同的对象。所以对于返回对象的处理,要编写具体的ObjectAction实现类进行处理。通过Client.addActionMap方法进行添加。这样,程序会回调处理。
服务端:
由于客户端会定时(keepAliveDelay毫秒)发送维持连接的信息过来,所以服务端要有一个检测机制。即当服务端receiveTimeDelay毫秒(程序中是3秒)内未接收任何数据,则自动断开与客户端的连接(直接调用socket的close方法,关闭掉socket即可)。ActionMapping的原理与客户端相似(相同)。通过添加相应的ObjectAction实现类,可以实现不同对象的响应、应答过程。
二:实现代码示例
1:定义传输对象
public class KeepAlive implements Serializable {
private static final long serialVersionUID = 5731607955571883638L;
@Override
public String toString() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "维持连接包";
}
}
2:定义server端
/**
* <p>
* 服务端接口请求,并响应。
* 心跳方式:超过3s未收到client响应,则认为client端已经断开,则此时断开server端。
* </p>
*/
public class Server {
public static void main(String[] args) {
int port = 65432;
Server server = new Server(port);
server.start();
}
public interface ObjectAction {
Object action(Object rev, Server server);
}
public static final class DefaultObjectAction implements ObjectAction {
@Override
public Object action(Object rev, Server server) {
System.out.println("处理并返回:" + rev);
return rev;
}
}
private int port;
private volatile boolean running = false;
private long receiveTimeDelay = 3000;
/**
* 存储接收到对象和对象类型的映射。
* 对象类型 <==> 对象。
* 因此client和server可以发送不同类型的对象消息
*/
private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();
/**
* 监听线程,用于接口client端的心跳连接
*/
private Thread connWatchDog;
public Server(int port) {
this.port = port;
}
public void start() {
if (running) {
return;
}
running = true;
connWatchDog = new Thread(new ConnWatchDog());
connWatchDog.start();
}
@SuppressWarnings("deprecation")
public void stop() {
if (running) {
running = false;
}
if (connWatchDog != null) {
connWatchDog.stop();
}
}
public void addActionMap(Class<Object> cls, ObjectAction action) {
actionMapping.put(cls, action);
}
class ConnWatchDog implements Runnable {
@Override
public void run() {
try {
ServerSocket ss = new ServerSocket(port, 5);
while (running) {
// 阻塞等待client端发生的socket,消息均会封装为socket
Socket s = ss.accept();
new Thread(new SocketAction(s)).start();
}
} catch (IOException e) {
Server.this.stop();
}
}
}
class SocketAction implements Runnable {
Socket s;
boolean run = true;
long lastReceiveTime = System.currentTimeMillis();
public SocketAction(Socket s) {
this.s = s;
}
@Override
public void run() {
while (running && run) {
// 若超过3s未收到客户端发送的数据,则认为客户端已经断开,那么断开服务端。
if (System.currentTimeMillis() - lastReceiveTime > receiveTimeDelay) {
overThis();
} else {
try {
InputStream in = s.getInputStream();
if (in.available() > 0) {
ObjectInputStream ois = new ObjectInputStream(in);
Object obj = ois.readObject();
lastReceiveTime = System.currentTimeMillis();
System.out.println("接收:" + obj);
ObjectAction oa = actionMapping.get(obj.getClass());
oa = oa == null ? new DefaultObjectAction() : oa;
Object out = oa.action(obj, Server.this);
if (out != null) {
ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
oos.writeObject(out);
oos.flush();
}
} else {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
overThis();
}
}
}
}
private void overThis() {
if (run) {
run = false;
}
if (s != null) {
try {
s.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("关闭:" + s.getRemoteSocketAddress());
}
}
}
3:定义client端
/**
* <p>
* 客户端接口请求,并接口。
* 心跳方式:2s/次
* </p>
*/
public class Client {
public static void main(String[] args) throws UnknownHostException, IOException {
String serverIp = "127.0.0.1";
int port = 65432;
Client client = new Client(serverIp, port);
client.start();
}
public interface ObjectAction {
void action(Object obj, Client client);
}
public static final class DefaultObjectAction implements ObjectAction {
@Override
public void action(Object obj, Client client) {
System.out.println("处理:" + obj.toString());
}
}
private String serverIp;
private int port;
private Socket socket;
/**
* 连接状态
*/
private boolean running = false;
/**
* 最后一次发送数据的时间
*/
private long lastSendTime;
/**
* 用于保存接收消息对象类型及该类型消息处理的对象
*/
private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>();
public Client(String serverIp, int port) {
this.serverIp = serverIp;
this.port = port;
}
public void start() throws IOException {
if (running) {
return;
}
socket = new Socket(serverIp, port);
System.out.println("本地端口:" + socket.getLocalPort());
lastSendTime = System.currentTimeMillis();
running = true;
/**
* 保持长连接的线程,每隔2秒项服务器发一个一个保持连接的心跳消息
*/
new Thread(new KeepAliveWatchDog()).start();
/**
* 接受服务端消息的线程,处理消息
*/
new Thread(new ReceiveWatchDog()).start();
}
public void stop() {
if (running) {
running = false;
}
}
/**
* 添加接收对象的处理对象。
*
* @param cls 待处理的对象,其所属的类。
* @param action 处理过程对象。
*/
public void addActionMap(Class<Object> cls, ObjectAction action) {
actionMapping.put(cls, action);
}
public void sendObject(Object obj) throws IOException {
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
oos.writeObject(obj);
System.out.println("发送:\t" + obj);
oos.flush();
}
class KeepAliveWatchDog implements Runnable {
long checkDelay = 10;
long keepAliveDelay = 2000;
@Override
public void run() {
while (running) {
if (System.currentTimeMillis() - lastSendTime > keepAliveDelay) {
try {
Client.this.sendObject(new KeepAlive());
} catch (IOException e) {
e.printStackTrace();
Client.this.stop();
}
lastSendTime = System.currentTimeMillis();
} else {
try {
Thread.sleep(checkDelay);
} catch (InterruptedException e) {
e.printStackTrace();
Client.this.stop();
}
}
}
}
}
class ReceiveWatchDog implements Runnable {
@Override
public void run() {
while (running) {
try {
InputStream in = socket.getInputStream();
if (in.available() > 0) {
ObjectInputStream ois = new ObjectInputStream(in);
Object obj = ois.readObject();
System.out.println("接收:" + obj);
ObjectAction oa = actionMapping.get(obj.getClass());
oa = oa == null ? new DefaultObjectAction() : oa;
oa.action(obj, Client.this);
} else {
Thread.sleep(10);
}
} catch (Exception e) {
e.printStackTrace();
Client.this.stop();
}
}
}
}
}
网友评论