整体方案:
PS:该方案的消息推送速度基本和socket相差无疑,至于和借助于rabbitmq、rocketmq等构建的long polling以及websocket,此处不讨论,该方案最大的优点在于可以忽略所有浏览器、jdk、Tomcat、spring等版本兼容问题
响应时间 本人的服务器接收到消息,到把该消息推送到客户端仅需100ms左右,如果轮询失败,响应速度在20ms以内(没有过于复杂的消息处理),至于连接数量的影响,只测了大概20个客户端同时连接的情况,耗时上面基本波动很小
客户端:
使用jQuery的ajax方法,在请求成功的基础上进行递归调用,实现客户端对服务器的不间断少量数据请求。
注:jQuery 最大程度避免浏览器不兼容问题
ajax 少量数据请求,提高效率
服务器端:
- 创建一个响应long polling 线程的管理工具类,并使用Hashtable 存储这些线程,创建响应的方法
- 在Controller 中,在处理long polling 请求的方法中,收到请求则添加到到线程列表,并让该线程睡眠,睡眠时间也就是轮询间隔时间(在不触发条件的情况下)
- 为了测试,在服务器端写了一个socket的服务器端,当socket服务器端收到消息后,就调用long polling 线程的管理工具类的响应方法,查找线程,并进行打断该线程,Controller中的线程被打断,则去消息队列读取最新的消息,然后返回并在列表中移除该响应线程
- 在客户端成功收到响应后,立即进行下一次long polling 请求,以此构成整个系统的实时响应
注:本人使用Maven搭建springMVC的应用
Hashtable 自身线程安全,省去很多麻烦,当然也可根据实际情况选用其他的集合类,该系统的key值是来自客户端的标示,只要提供一个唯一的long polling 请求标示就行,也许有同时出发多个请求线程的需求,就需要使用其他集合类,以及构建相应的逻辑。
Controller springMVC中默认是单例多线程的,本系统也是基于此基础,如果更改,同样需要重新构建整个框架
线程打断 使用了线程的interrupte方法,该方法具有一定缺陷,执行后打断sleep,同时会触发InterruptedException,追求完美的可以选择其他线程管理方案。
线程睡眠时间 如果没有新消息触发该线程,则会一直睡到设置的时间结束,返回一个值,该时间也是轮询连接的最大时间,一般浏览器会对持续时间连接有限制,所以此处建议30s~60s
消息队列 可以自己建一个,如果整个系统流畅,处理速度足够快,或者服务器端接收消息间隔不是很小,可以随时接收随时处理,加上消息队列主要起个缓冲的作用,当然使用者要构建完善的管理逻辑,线程安全是重中之重。
源码
以下是一部分源码,只是大概的逻辑流程
Javaweb 服务器端
Controller
@RequestMapping(value="/msg", method = RequestMethod.POST)
@ResponseBody
public String msg(String param){
System.out.println("long polling: tag="+param);
//参数作为当前线程的标志
if (!sharedPollingThread.addPollingThreadToList(param,Thread.currentThread())) {
return "falled";
}
try {
Thread.sleep(1000*30);
} catch (InterruptedException e) {
// e.printStackTrace();
return "daduan";
}finally {
sharedPollingThread.removePollingThread(param);
}
// String msg = null;
// if (msgListened.msg() != null){
// return "new msg";
// }
return "server msg"+param;
}
PollingUtil
package com.jony.socket;
import java.util.Enumeration;
import java.util.Hashtable;
/**
* Manage the polling thread
* Created by jony on 17/10/19.
*/
public class PollingUtil {
//单例模式
private PollingUtil(){
}
private static final PollingUtil shareadPollingUtil = new PollingUtil();
public static PollingUtil getInstance(){
return shareadPollingUtil;
}
//polling thread list
//hashtable相对于hashmap线程安全,不需要再去为保证线程安全而做工作
Hashtable<String, Thread> pollingThreadList = new Hashtable<String, Thread>();
//add thread to list
public boolean addPollingThreadToList(String tag, Thread pollingThread){
if (findPollingThread(tag) == null) {
pollingThreadList.put(tag, pollingThread);
return true;
}else {
return false;
}
// pollingThreadList.put(tag, pollingThread);
// return true;
}
//interrupte and remove
//在收到消息是调用改方法
public void interruptePollingThread(String tag){
// System.out.println("find ...... ");
Thread pollingThread = findPollingThread(tag);
if (pollingThread != null){
// System.out.println("interrupt ...... ");
pollingThread.interrupt();
//移除放在返回睡眠
// removePollingThread(tag);
}
}
//find thread in list
private Thread findPollingThread(String tag){
Enumeration<String> e = pollingThreadList.keys();
// System.out.println("find count : "+pollingThreadList.size());
while (e.hasMoreElements()){
String key = e.nextElement();
// System.out.println("find: "+key+" ,tag: "+tag);
if (tag.equals(key)){
// System.out.println("find success!");
return pollingThreadList.get(key);
}
}
return null;
}
//remove thread
public boolean removePollingThread(String tag){
Thread pollingThread = findPollingThread(tag);
if (pollingThread != null){
pollingThreadList.remove(tag);
return true;
}else {
return false;
}
}
//view list infomation
public void viewListInfo(){
System.out.println("List infomation: count: "+pollingThreadList.size());
}
}
**SocketUtil **
package com.jony.socket;
import java.net.DatagramSocket;
import java.net.SocketException;
/**
* Created by jony on 17/10/19.
* 单类模式
*/
public class SocketUtil {
private static final int INPORT = 5000;
private static DatagramSocket serverSocket = null;
private static UDPServer udpServer = null;
static {
//初始化数据
try {
serverSocket = new DatagramSocket(INPORT);
} catch (SocketException e) {
e.printStackTrace();
}
if (serverSocket != null){
udpServer = new UDPServer(serverSocket);
}else {
System.out.println("Server is null !");
}
}
//私有化构造方法
private SocketUtil(){
startReceivingUDPMessages();
}
//创建对象并提供外部方法
private static final SocketUtil socketUtil = new SocketUtil();
public static SocketUtil getInstance(){
return socketUtil;
}
//开启本地接收线程
private void startReceivingUDPMessages(){
if (serverSocket != null){
new Thread(udpServer).start();
}else {
System.out.println("UDPServer open error !");
}
}
//停止接收udp消息
private void stopReceivingUDPMessages(){
serverSocket.close();
}
//发送udp消息
public void sendUDPMessages(String msg){
}
}
UDPServer
package com.jony.socket;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
/**
* Created by jony on 17/10/19.
*/
public class UDPServer implements Runnable{
//轮询线程管理类
private PollingUtil sharedPollingUtil = PollingUtil.getInstance();
private DatagramSocket serverSocket;
private static final int dataLength = 1024;
private byte[] recvBuf = new byte[dataLength];
private DatagramPacket packet = new DatagramPacket(recvBuf, recvBuf.length);
public UDPServer(DatagramSocket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
System.out.println("Server Start receiving !");
while (true){
try {
serverSocket.receive(packet);
final String recvString = new String(packet.getData(), 0, packet.getLength());
System.out.println("UDP receviving: "+recvString);
if (recvString.equals("761399")){
sharedPollingUtil.viewListInfo();
continue;
}
//通知
sharedPollingUtil.interruptePollingThread(recvString);
// new MsgListened(){
// @Override
// public String msg() {
// return recvString;
// }
// };
} catch (IOException e) {
e.printStackTrace();
System.out.println("UDP received thread error !");
serverSocket.close();
return;
}
}
}
}
客户端
jsp页面
<%--
Created by IntelliJ IDEA.
User: jony
Date: 17/10/17
Time: 下午3:17
To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
<title>长轮询测试</title>
<script src="<%=request.getContextPath()%>/js/jquery-3.2.1.js" type="text/javascript" language="JavaScript"></script>
<script src="<%=request.getContextPath()%>/js/msgtest.js" type="text/javascript" language="JavaScript"></script>
</head>
<body>
<p>
长轮询测试页面
</p>
<p>
<textarea rows="1" cols="10" id="tag"></textarea>
<button type="button" id="start">开始轮询</button>
</p>
<p id="dataShow">轮询过程:</p>
</body>
</html>
js
/**
* Created by jony on 17/10/17.
*/
//记录轮询次数
var count = 0;
//记录轮询开始时间
//轮询标示
var tag;
$(Document).ready(function(){
$("#start").click(function(){
tag = $("#tag").val();
getMsg();
});
});
function getMsg() {
count++;
var currentTime = (new Date()).getTime();
$.ajax({
url:"/polling/msg",
type:"post",
global:true, //默认值,会触发全局的ajax
async:true,
data:{"param":tag},
success:function(data)
{
// if(data != null && data!="")
// alertShow(data.msg);
var intervalTime = (new Date()).getTime() - currentTime;
$("#dataShow").append("<p>第"+count+"次, data: "+data+",interval time:"+intervalTime+"</p>");
if (data == "falled"){
alert("轮询失败,该标识已被使用!");
return;
}
getMsg();
}
});
}
$.ajaxError(function () {
alert("ajax error !");
});
setInterval()
至于pom、springMVC配置等其余代码就不贴了
该文章只是个大概方案流程,还有很多不完善的地方,仅供参考,敬请指正。
网友评论