美文网首页程序员
long polling 实现B/S架构的实时消息推送

long polling 实现B/S架构的实时消息推送

作者: 天涯笑笑生 | 来源:发表于2017-10-20 11:04 被阅读0次

    comet 4j 实现消息推送

    整体方案:

    PS:该方案的消息推送速度基本和socket相差无疑,至于和借助于rabbitmq、rocketmq等构建的long polling以及websocket,此处不讨论,该方案最大的优点在于可以忽略所有浏览器、jdk、Tomcat、spring等版本兼容问题
    响应时间 本人的服务器接收到消息,到把该消息推送到客户端仅需100ms左右,如果轮询失败,响应速度在20ms以内(没有过于复杂的消息处理),至于连接数量的影响,只测了大概20个客户端同时连接的情况,耗时上面基本波动很小

    客户端:

    使用jQuery的ajax方法,在请求成功的基础上进行递归调用,实现客户端对服务器的不间断少量数据请求。
    注:jQuery 最大程度避免浏览器不兼容问题
    ajax 少量数据请求,提高效率

    服务器端:

    • 创建一个响应long polling 线程的管理工具类,并使用Hashtable 存储这些线程,创建响应的方法
    • 在Controller 中,在处理long polling 请求的方法中,收到请求则添加到到线程列表,并让该线程睡眠,睡眠时间也就是轮询间隔时间(在不触发条件的情况下)
    • 为了测试,在服务器端写了一个socket的服务器端,当socket服务器端收到消息后,就调用long polling 线程的管理工具类的响应方法,查找线程,并进行打断该线程,Controller中的线程被打断,则去消息队列读取最新的消息,然后返回并在列表中移除该响应线程
    • 在客户端成功收到响应后,立即进行下一次long polling 请求,以此构成整个系统的实时响应

    注:本人使用Maven搭建springMVC的应用

    Hashtable 自身线程安全,省去很多麻烦,当然也可根据实际情况选用其他的集合类,该系统的key值是来自客户端的标示,只要提供一个唯一的long polling 请求标示就行,也许有同时出发多个请求线程的需求,就需要使用其他集合类,以及构建相应的逻辑。

    Controller springMVC中默认是单例多线程的,本系统也是基于此基础,如果更改,同样需要重新构建整个框架

    线程打断 使用了线程的interrupte方法,该方法具有一定缺陷,执行后打断sleep,同时会触发InterruptedException,追求完美的可以选择其他线程管理方案。

    线程睡眠时间 如果没有新消息触发该线程,则会一直睡到设置的时间结束,返回一个值,该时间也是轮询连接的最大时间,一般浏览器会对持续时间连接有限制,所以此处建议30s~60s

    消息队列 可以自己建一个,如果整个系统流畅,处理速度足够快,或者服务器端接收消息间隔不是很小,可以随时接收随时处理,加上消息队列主要起个缓冲的作用,当然使用者要构建完善的管理逻辑,线程安全是重中之重。

    源码

    以下是一部分源码,只是大概的逻辑流程

    Javaweb 服务器端

    Controller

     @RequestMapping(value="/msg", method = RequestMethod.POST)
        @ResponseBody
        public String msg(String param){
    
           System.out.println("long polling: tag="+param);
    
            //参数作为当前线程的标志
            if (!sharedPollingThread.addPollingThreadToList(param,Thread.currentThread())) {
                return "falled";
            }
    
            try {
                Thread.sleep(1000*30);
    
            } catch (InterruptedException e) {
    //            e.printStackTrace();
                return "daduan";
            }finally {
                sharedPollingThread.removePollingThread(param);
            }
    
    //        String msg = null;
    //        if (msgListened.msg() != null){
    //            return "new msg";
    //        }
    
            return "server msg"+param;
        }
    
    

    PollingUtil

    package com.jony.socket;
    
    import java.util.Enumeration;
    import java.util.Hashtable;
    
    /**
     * Manage the polling thread
     * Created by jony on 17/10/19.
     */
    public class PollingUtil {
    
        //单例模式
        private PollingUtil(){
    
        }
    
        private static final PollingUtil shareadPollingUtil = new PollingUtil();
        public static PollingUtil getInstance(){
            return shareadPollingUtil;
        }
    
        //polling thread list
        //hashtable相对于hashmap线程安全,不需要再去为保证线程安全而做工作
        Hashtable<String, Thread> pollingThreadList = new Hashtable<String, Thread>();
    
        //add thread to list
        public boolean addPollingThreadToList(String tag, Thread pollingThread){
    
            if (findPollingThread(tag) == null) {
                pollingThreadList.put(tag, pollingThread);
                return true;
            }else {
                return false;
            }
    //        pollingThreadList.put(tag, pollingThread);
    //        return true;
        }
    
        //interrupte and remove
        //在收到消息是调用改方法
        public void interruptePollingThread(String tag){
    
    //        System.out.println("find ...... ");
    
            Thread pollingThread = findPollingThread(tag);
            if (pollingThread != null){
    
    //            System.out.println("interrupt ...... ");
                pollingThread.interrupt();
    
                //移除放在返回睡眠
    //            removePollingThread(tag);
            }
        }
    
        //find thread in list
        private Thread findPollingThread(String tag){
            Enumeration<String> e = pollingThreadList.keys();
    
    //        System.out.println("find count : "+pollingThreadList.size());
    
            while (e.hasMoreElements()){
    
                String key = e.nextElement();
    
    //            System.out.println("find: "+key+" ,tag: "+tag);
    
                if (tag.equals(key)){
    
    //                System.out.println("find success!");
                    return pollingThreadList.get(key);
                }
            }
    
            return null;
        }
    
        //remove thread
        public boolean removePollingThread(String tag){
    
            Thread pollingThread = findPollingThread(tag);
    
            if (pollingThread != null){
                pollingThreadList.remove(tag);
                return true;
            }else {
                return false;
            }
        }
    
    
        //view list infomation
        public void viewListInfo(){
    
            System.out.println("List infomation: count: "+pollingThreadList.size());
        }
    
    }
    
    

    **SocketUtil **

    package com.jony.socket;
    
    import java.net.DatagramSocket;
    import java.net.SocketException;
    
    /**
     * Created by jony on 17/10/19.
     * 单类模式
     */
    public class SocketUtil {
    
        private static final int INPORT = 5000;
        private static DatagramSocket serverSocket = null;
        private static UDPServer udpServer = null;
    
        static {
            //初始化数据
            try {
                serverSocket = new DatagramSocket(INPORT);
            } catch (SocketException e) {
                e.printStackTrace();
            }
    
            if (serverSocket != null){
                udpServer = new UDPServer(serverSocket);
    
            }else {
                System.out.println("Server is null !");
            }
        }
    
        //私有化构造方法
        private SocketUtil(){
            startReceivingUDPMessages();
        }
    
        //创建对象并提供外部方法
        private static final SocketUtil socketUtil = new SocketUtil();
        public static SocketUtil getInstance(){
            return socketUtil;
        }
    
        //开启本地接收线程
        private  void  startReceivingUDPMessages(){
    
            if (serverSocket != null){
                new Thread(udpServer).start();
            }else {
                System.out.println("UDPServer open error !");
            }
    
        }
        //停止接收udp消息
        private void stopReceivingUDPMessages(){
            serverSocket.close();
    
        }
    
        //发送udp消息
        public void sendUDPMessages(String msg){
    
        }
    
    }
    
    

    UDPServer

    package com.jony.socket;
    
    import java.io.IOException;
    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    
    /**
     * Created by jony on 17/10/19.
     */
    public class UDPServer implements Runnable{
    
        //轮询线程管理类
        private PollingUtil sharedPollingUtil = PollingUtil.getInstance();
    
        private DatagramSocket serverSocket;
        private static final int dataLength = 1024;
        private byte[] recvBuf = new byte[dataLength];
        private DatagramPacket packet = new DatagramPacket(recvBuf, recvBuf.length);
    
        public UDPServer(DatagramSocket serverSocket) {
            this.serverSocket = serverSocket;
        }
    
        @Override
        public void run() {
    
            System.out.println("Server Start receiving !");
    
            while (true){
                try {
                    serverSocket.receive(packet);
                    final String recvString = new String(packet.getData(), 0, packet.getLength());
                    System.out.println("UDP receviving: "+recvString);
    
                    if (recvString.equals("761399")){
                        sharedPollingUtil.viewListInfo();
                        continue;
                    }
    
                    //通知
                    sharedPollingUtil.interruptePollingThread(recvString);
    
    //                new MsgListened(){
    //                    @Override
    //                    public String msg() {
    //                        return recvString;
    //                    }
    //                };
    
                } catch (IOException e) {
                    e.printStackTrace();
    
                    System.out.println("UDP received thread error !");
                    serverSocket.close();
                    return;
    
                }
    
            }
        }
    }
    
    

    客户端

    jsp页面

    <%--
      Created by IntelliJ IDEA.
      User: jony
      Date: 17/10/17
      Time: 下午3:17
      To change this template use File | Settings | File Templates.
    --%>
    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <html>
    <head>
        <title>长轮询测试</title>
    
        <script src="<%=request.getContextPath()%>/js/jquery-3.2.1.js" type="text/javascript" language="JavaScript"></script>
        <script src="<%=request.getContextPath()%>/js/msgtest.js" type="text/javascript" language="JavaScript"></script>
    </head>
    <body>
    
    <p>
        长轮询测试页面
    </p>
    
    <p>
        <textarea rows="1" cols="10" id="tag"></textarea>
        <button type="button" id="start">开始轮询</button>
    </p>
    <p id="dataShow">轮询过程:</p>
    </body>
    </html>
    
    

    js

    /**
     * Created by jony on 17/10/17.
     */
    
    //记录轮询次数
    var count = 0;
    
    //记录轮询开始时间
    
    //轮询标示
    var tag;
    
    
    $(Document).ready(function(){
    
        $("#start").click(function(){
            tag = $("#tag").val();
            getMsg();
        });
    
    });
    
    
    function getMsg() {
    
        count++;
        var currentTime = (new Date()).getTime();
    
        $.ajax({
            url:"/polling/msg",
            type:"post",
            global:true, //默认值,会触发全局的ajax
            async:true,
            data:{"param":tag},
            success:function(data)
            {
                // if(data != null && data!="")
                //     alertShow(data.msg);
    
                var intervalTime = (new Date()).getTime() - currentTime;
    
                $("#dataShow").append("<p>第"+count+"次, data: "+data+",interval time:"+intervalTime+"</p>");
                if (data == "falled"){
                    alert("轮询失败,该标识已被使用!");
                    return;
    
                }
                getMsg();
            }
        });
    }
    
    $.ajaxError(function () {
        alert("ajax error !");
    });
    
    setInterval()
    

    至于pom、springMVC配置等其余代码就不贴了
    该文章只是个大概方案流程,还有很多不完善的地方,仅供参考,敬请指正。

    相关文章

      网友评论

        本文标题:long polling 实现B/S架构的实时消息推送

        本文链接:https://www.haomeiwen.com/subject/tpvmuxtx.html