美文网首页Java高开发Java 杂谈Java
spring+stomp+webSocket+SockJS 实现

spring+stomp+webSocket+SockJS 实现

作者: java高并发 | 来源:发表于2019-05-21 17:34 被阅读2次

    接触webSocket是因为做的项目里有一个图像报表类的监控页面,数据在页面加载后通过ajax后台获取后展示在页面,因为监控数据实时在变化,需要体现数据的时效性,所以采用webSocket实现客户端和服务端的长连接通信,避免客户端重复发起请求获取数据。实现思路是通过定时任务定期的获取最新数据,然后通过webSocket由服务端广播出去,客户端监听这个广播消息并把最新数据展示出来。不多废话。

    1. 首先这是一个ssm项目。加入spring的webSocket包,必须是4.0以上的包,这里用了4.1.1:
      spring-messaging-4.1.1.RELEASE.jar
      spring-websocket-4.1.1.RELEASE.jar
    2. web.xml 里的filter 和servlet 都需要加上<async-supported>true</async-supported>配置。
    3. 配置webSocket配置文件,此配置文件在springmvc的配置文件里引入。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:websocket="http://www.springframework.org/schema/websocket"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/tx 
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
        http://www.springframework.org/schema/websocket  
        http://www.springframework.org/schema/websocket/spring-websocket-4.0.xsd">
        <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
            <property name="maxTextMessageBufferSize" value="8192"/>
            <property name="maxBinaryMessageBufferSize" value="8192"/>
            <property name="maxSessionIdleTimeout" value="900000"/>
            <property name="asyncSendTimeout" value="5000"/>
        </bean>
    
        <!-- 实时消息推送,在spring容器初始化此bean的时候就启动了监听线程, -->
        <bean id="webSocketMessageUtil" class="com.lancy.webSocket.common.WebSocketMessageUtil"/>
    </beans>
    

    用注解的方式定义的webSocket的配置类

    package com.lancy.webSocket.action;
    
    import org.springframework.messaging.simp.config.MessageBrokerRegistry;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
    import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
    import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
    
    import com.lancy.webSocket.handler.StompMessageHandshakeHandler;
    import com.lancy.webSocket.handler.WebSocketHandshakeInterceptor;
    
    
    @EnableWebSocketMessageBroker
    @Controller
    public class WebSocketMessageAction extends AbstractWebSocketMessageBrokerConfigurer{
    
        /**
         * 将"/webSocket"路径注册为STOMP端点,这个路径与发送和接收消息的目的路径有所不同,
         * 这是一个端点,客户端在订阅或发布消息到目的地址前,要连接该端点, 
         * 即用户发送请求url="/applicationName/webSocket"与STOMP server进行连接。之后再转发到订阅url;
         */
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            //注册websocket,客户端用ws://host:port/项目名/webSocket 访问
            registry.addEndpoint("/webSocket")
                    .setHandshakeHandler(new StompMessageHandshakeHandler())
                    .addInterceptors(new WebSocketHandshakeInterceptor())
                    .withSockJS();//表示支持以SockJS方式连接服务器  
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/topic","/user");//这句话表示在topic和user这两个域上服务端可以向客户端发消息
            registry.setApplicationDestinationPrefixes("/ws");//这句话表示客户端向服务器端发送时的主题上面需要加"/ws"作为前缀
            registry.setUserDestinationPrefix("/user");//这句话表示服务端给客户端指定用户发送一对一的主题,前缀是"/user"
        }
    }
    
    

    其中StompMessageHandshakeHandler类代码如下

    package com.lancy.webSocket.handler;
    
    import java.security.Principal;
    import java.util.Map;
    
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
    
    /**
     * 获取客户端连接前对客户端的session、cookie等信息进行握手处理, 也就是可以在这里可以进行一些用户认证?
     * 这是我个人的理解。这里没有做任何处理
     *
     */
    public class StompMessageHandshakeHandler extends DefaultHandshakeHandler{
    
        @Override
        protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                Map<String, Object> attributes) {
            return super.determineUser(request, wsHandler, attributes);
        }
    }
    

    WebSocketHandshakeInterceptor类:

    package com.lancy.webSocket.handler;
    
    import java.util.Map;
    
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
    
    /**
     * 握手前后的拦截,这里没有处理,默认
     *
     */
    public class WebSocketHandshakeInterceptor extends HttpSessionHandshakeInterceptor{
    
        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Exception ex) {
            super.afterHandshake(request, response, wsHandler, ex);
        }
    
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Map<String, Object> attributes) throws Exception {
            return super.beforeHandshake(request, response, wsHandler, attributes);
        }
    }
    

    WebSocketHandshakeHandler类:

    package com.lancy.webSocket.handler;
    
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.WebSocketMessage;
    import org.springframework.web.socket.WebSocketSession;
    import org.springframework.web.socket.handler.TextWebSocketHandler;
    
    /**
     * WebSocket 握手信息
     * @ClassName: WebSocketHandshakeHandler.java
     * @Description: WebSocket 握手信息
     */
    public class WebSocketHandshakeHandler extends TextWebSocketHandler {  
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            super.afterConnectionClosed(session, status);
        }
    
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            super.afterConnectionEstablished(session);
        }
    
        @Override
        public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
            super.handleMessage(session, message);
        }
    }  
    

    自定义一个消息实体类,方便封装消息

    package com.lancy.webSocket.common;
    
    /**
     * 自定义封装包含发送信息的实体类
     *
     */
    public class WebSocketMessage {
    
        /**
         * 发送广播消息,发送地址:/topic/* ,*为任意名字,如取名monitor,则客户端对应订阅地址为:/topic/monitor
         * 发送私人消息,发送地址:/*,*为任意名字,这里取名为message,客户端对应订阅地址:/user/{自定义客户端标识ID}/message
         */
        //可以自定义其他的属性
        private String distination;
        private Object data;//实际发送的数据对象
        private String userId;//如果不为空,则表示发送给个人而不是广播
    
        public String getDistination() {
            return distination;
        }
        public void setDistination(String distination) {
            this.distination = distination;
        }
        public Object getData() {
            return data;
        }
        public void setData(Object data) {
            this.data = data;
        }
        public String getUserId() {
            return userId;
        }
        public void setUserId(String userId) {
            this.userId = userId;
        }
    }
    

    webSocket消息发送的工具类

    package com.lancy.webSocket.common;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    
    import com.lancy.myBatis.common.StringUtils;
    
    /**
     * 这里启动一个线程不停的监控队列是否有消息需要进行发送,如果有就发送出去。
     *
     */
    public class WebSocketMessageUtil implements Runnable{
    
        static Logger log = Logger.getLogger(WebSocketMessageUtil.class); 
    
        private static SimpMessagingTemplate messageingTemplate;
        private static BlockingQueue<WebSocketMessage> wsQueue = new LinkedBlockingDeque<>();
    
        public WebSocketMessageUtil() {
            new Thread(this).start();
        }
    
        @Autowired
        public void setTemplate(SimpMessagingTemplate t){
            WebSocketMessageUtil.messageingTemplate = t;
        }
    
        public static void addMessage(WebSocketMessage msg){
            try{
                wsQueue.put(msg);
            }catch(InterruptedException e){
                log.error("添加实时消息异常");
            }
        }
    
        public static void sendMessage(WebSocketMessage msg){
            if(StringUtils.isBlank(msg.getUserId())){
                messageingTemplate.convertAndSend(msg.getDistination(),msg);
            }else{
                messageingTemplate.convertAndSendToUser(msg.getUserId(), msg.getDistination(), msg);
            }
        }
    
        @Override
        public void run() {
            log.info(">>>>>>>推送消息线程启动,正在监听消息。");
            while (true) {
                try {
                    WebSocketMessage msg = wsQueue.take();
                    if(msg!=null){
                        WebSocketMessageUtil.sendMessage(msg);
                    }
                } catch (Exception ex) {}
            }
        }
    }
    

    然后配置spring定时任务,在定时执行的方法里进行消息的添加,参考代码

    public void testWebSocket() {
            WebSocketMessage msg = new WebSocketMessage();
            msg.setDistination("/topic/lancy/testWebSocket/new");
            msg.setData("test....");
            WebSocketMessageUtil.addMessage(msg);
    }
    

    前台界面
    首先引入两个js文件 sockjs.min.js 和 stomp.min.js

    <script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
    <script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>
    

    具体页面如下

    <%@ page language="java" contentType="text/html; charset=UTF-8"
        pageEncoding="UTF-8"%>
    <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
    <c:set var="ctx" value="${pageContext.request.contextPath}" />
    <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>Insert title here</title>
    
    <link rel="stylesheet" type="text/css" href="${ctx}/css/default.css" />
    <link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/default/easyui.css">
    <link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/themes/icon.css">
    <link rel="stylesheet" type="text/css" href="${ctx}/js/easyui/demo/demo.css">
    <script type="text/javascript" src="${ctx}/js/jquery.min.js"></script>
    <script type="text/javascript" src="${ctx}/js/socket/sockjs.min.js"></script>
    <script type="text/javascript" src="${ctx}/js/socket/stomp.min.js"></script>
    <script type="text/javascript" src="${ctx}/js/app_common.js"></script>
    <script type="text/javascript" src="${ctx}/js/easyui/jquery.easyui.min.js"></script>
    <script type="text/javascript"> 
    
        var stompClient = null;  
    
        function setConnected(connected) {  
            document.getElementById('connect').disabled = connected;  
            document.getElementById('disconnect').disabled = !connected;  
            document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';  
            document.getElementById('response').innerHTML = '';  
        }  
    
        function connect() {  
            var socket = new SockJS('/myBookstore/webSocket');//连接服务端的端点,连接以后才可以订阅广播消息和个人消息 
            stompClient = Stomp.over(socket);              
            stompClient.connect({}, function(frame) {  
                setConnected(true);  
                console.log('Connected: ' + frame); 
                //订阅广播消息
                stompClient.subscribe('/topic/lancy/testWebSocket/new', function(greeting){  
                    showGreeting(greeting.body);  
                });
    
                //订阅个人信息
                stompClient.subscribe('/user/1/testUser', function(greeting){  
                    showGreeting(greeting.body);  
                }); 
            });  
        }  
    
        function disconnect() {  
            if (stompClient != null) {  
                stompClient.disconnect();  
                setConnected(false);  
                console.log("Disconnected");  
            }  
        }  
    
        //发送到服务的消息
        function sendName() {  
            var name = document.getElementById('name').value;
            stompClient.send("/ws/webSocket/testWithServer", {'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }));  
        }  
    
        function showGreeting(message) {  
            var response = document.getElementById('response');  
            var p = document.createElement('p');  
            p.style.wordWrap = 'break-word';  
            p.appendChild(document.createTextNode(message));  
            response.appendChild(p);  
        }
    
    </script>
    </head>
    <body class="easyui-layout">
        <div>  
            <div>  
                <button id="connect" onclick="connect();">Connect</button>  
                <button id="disconnect" disabled="disabled" onclick="disconnect();">Disconnect</button>  
            </div>  
            <div id="conversationDiv">  
                <label>What is your name?</label><input type="text" id="name" />  
                <button id="sendName" onclick="sendName();">Send all</button>  
                <p id="response"></p>  
            </div>  
        </div> 
    
    
    </body>
    </html>
    

    js方法里的sendName 发送到服务端的消息对应的类:

    package com.lancy.webSocket.action;
    
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Headers;
    import org.springframework.messaging.handler.annotation.MessageMapping;
    import org.springframework.messaging.simp.SimpMessagingTemplate;
    import org.springframework.stereotype.Controller;
    
    @Controller
    public class WebSocketTestAction {
    
        @Autowired
        private SimpMessagingTemplate simpMessagingTemplate;
    
        @MessageMapping("/webSocket/testWithServer")
        //与页面的stompClient.send("/ws/webSocket/testWithServer", 
        //{'name': 'xiao','syn':'wang'}, JSON.stringify({'message': name }));方法对应
        public String send(String message,@Header("name")String name,
                @Headers Map<String, Object> headers){
            System.out.println(message);
            System.out.println(name);
            System.out.println(headers);
            simpMessagingTemplate.convertAndSend("/user/1/testUser","服务端返回消息");
            return "";
        }
    }
    
    

    欢迎工作一到五年的Java工程师朋友们加入Java高并发QQ群:219571750,群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

    41bcdf6703584fc58d197b1b04af4990.jpg 616236adc0134e2daf4b2d4c60c30360.jpg

    相关文章

      网友评论

        本文标题:spring+stomp+webSocket+SockJS 实现

        本文链接:https://www.haomeiwen.com/subject/jfzjzqtx.html