美文网首页
SpringBoot+Webocket 初步使用

SpringBoot+Webocket 初步使用

作者: Noah牛YY | 来源:发表于2018-07-13 07:58 被阅读210次

    写在前面

    本文介绍在 SpringBoot 项目中使用 WebSocket, 借助 STOMP 和 SocketJS

    效果图

    客户端向服务端发消息

    客户端向服务端发消息.gif

    服务端向客户端发消息

    广播式

    广播式.gif

    点对点

    点对点.gif

    开始搭建

    新建 SpringBoot 项目, 依赖勾选 WebSocketThymeleaf

    image.png

    WebSocket 配置

    /**
     * WebSocket 配置
     *
     * @author niuyy
     * @date 2018/3/23
     */
    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
            stompEndpointRegistry.setErrorHandler(this.webSocketHandler())
                    .addEndpoint("/endpointNiu")
                    .setAllowedOrigins("*").withSockJS();
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/topic", "/queue");
            registry.setUserDestinationPrefix("/user");
        }
    
        /**
         * WebSocket Error 处理
         *
         * @return WebSocket Error 处理器
         */
        @Bean
        public StompSubProtocolErrorHandler webSocketHandler() {
            return new WebSocketErrorHandler();
        }
    }
    

    其中:

    • @EnableWebSocketMessageBroker 开启使用 STOMP 协议来传输基于代理的消息,Broker是代理
    • setErrorHandler 设置一个错误处理的 Handler, 以便捕捉错误信息, 文章末尾有贴出代码
    • addEndpoint 切入点, 客户端在 new SockJs 的时候用到
    • setAllowedOrigins 设置为「*」表示接收 httphttps 的请求
    • withSockJS 使用 SockJS
    • enableSimpleBroker 参数是多个 destinationPrefixes, 服务端发送消息的 destination 要有这些前缀
    • setUserDestinationPrefix 设置点对点时, destination 的前缀, 如客户端订阅 /user/{userId}/getPoint, 服务端
      发送消息时, 调用 messagingTemplate.convertAndSendToUser(userId, "/getPoint", msg)

    控制层

    /**
     * @author niuyy
     * @date 2018/3/23
     */
    @Controller
    @Slf4j
    public class WebController {
    
        /**
         * 接收消息
         * @param name 姓名
         * @return welcome, [姓名] !
         */
        @MessageMapping("/welcome")
        @SendTo("/topic/getBro")
        public String say(String name) {
            log.info("name: " + name);
            return "welcome, " + name + " !";
        }
    
        @Autowired
        private SimpMessagingTemplate messagingTemplate;
    
        /**
         * 广播式发送消息给订阅了「/topic/getBro」的客户端
         */
        @RequestMapping("sendMsgBro")
        @ResponseBody
        public void sendMsg() {
            messagingTemplate.convertAndSend("/topic/getBro", "服武器主动推送的广播消息");
        }
    
        /**
         * 发送消息给指定 sessionId 的客户端, 且该客户端订阅了「/topic/getBro」
         *
         * @param sessionId 客户端的 sessionId
         */
        @RequestMapping("sendMsgPoint")
        @ResponseBody
        public void sendMsgPoint(String sessionId) {
            messagingTemplate.convertAndSendToUser(sessionId, "/queue/getPoint", "服武器主动推送的点对点消息", createHeaders(sessionId));
        }
    
        private MessageHeaders createHeaders(String sessionId) {
            SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
            headerAccessor.setSessionId(sessionId);
            headerAccessor.setLeaveMutable(true);
            return headerAccessor.getMessageHeaders();
        }
    
    }
    

    其中:

    • @MessageMapping 类似于 @RequestMapping, 只不过映射的是 webSocket 的请求地址
    • @SendTo 指定该方法响应给哪个 topic, 客户端订阅了 /topic/getBro 的都能收到方法响应
    • convertAndSendconvertAndSendToUser 本质是一样的, 底层调用同一方法, 是服务端主动发送消息

    这里说明一点, 本文中使用的是客户端的 sessionId 实现的点对点消息发送, 另外, 还有客户端订阅 /user/{userId}/topic, 服务端
    调用 messagingTemplate.convertAndSendToUser(userId, "/topic", msg) 的方法并未给出, 原因是笔者认为该方法和广播式类似

    监听器

    新客户端连接

    /**
     * @author niuyy
     * @date 2018/3/26
     */
    @Slf4j
    @Component
    public class WebSocketConnectListener implements ApplicationListener<SessionConnectEvent> {
    
        @Override
        public void onApplicationEvent(SessionConnectEvent event) {
            StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
            String sessionId = sha.getSessionId();
            log.info("sessionId: {} 已连接", sessionId);
        }
    }
    

    其中

    • sessionId 用于点对点发送消息
    • @Component 自动注入

    断开连接监听器

    /**
     * @author niuyy
     * @date 2018/3/26
     */
    @Slf4j
    @Component
    public class WebSocketDisconnectListener implements ApplicationListener<SessionDisconnectEvent> {
    
        @Override
        public void onApplicationEvent(SessionDisconnectEvent event) {
            StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());
            String sessionId = sha.getSessionId();
            log.info("sessionId: {} 已断开", sessionId);
        }
    }
    

    读者可根据不同需求, 在断开连接时执行不同操作

    客户端

    <html lang="en" xmlns:th="http://www.thymeleaf.org">
    <head>
        <meta charset="UTF-8"/>
        <title>WebSocket</title>
        <script th:src="@{js/sockjs.min.js}"></script>
        <script th:src="@{js/stomp.js}"></script>
        <script th:src="@{js/jquery-3.1.1.js}"></script>
    </head>
    <body onload="disconnect()">
    <noscript><h2 style="color: #e80b0a;">Sorry,浏览器不支持WebSocket</h2></noscript>
    <div>
        <div>
            <button id="connect" onclick="connect();">连接</button>
            <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
        </div>
    
        <div id="conversationDiv">
            <label>输入你的名字</label><input type="text" id="name"/>
            <button id="sendName" onclick="sendName();">发送</button>
            <p id="response"></p>
        </div>
    </div>
    <script type="text/javascript">
        var stompClient = null;
        function setConnected(connected) {
            document.getElementById("connect").disabled = connected;
            document.getElementById("disconnect").disabled = !connected;
            document.getElementById("conversationDiv").style.visibility = connected ? 'visible' : 'hidden';
    //        $("#connect").disabled = connected;
    //        $("#disconnect").disabled = !connected;
            $("#response").html();
        }
        function connect() {
            var socket = new SockJS('/endpointNiu');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function (frame) {
                setConnected(true);
                console.log('Connected:' + frame);
                stompClient.subscribe('/user/queue/getPoint', function (response) {
                    showResponse("getPoint " + response.body);
                });
                stompClient.subscribe('/topic/getBro', function (response) {
                    showResponse("getBro " + response.body);
                })
            });
        }
        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log('Disconnected');
        }
        function sendName() {
            var name = $('#name').val();
            console.log('name:' + name);
            stompClient.send("/welcome", {}, name);
        }
        function showResponse(message) {
            $("#response").html(message);
        }
    </script>
    </body>
    </html>
    

    其中

    • 引入的 js 文件在案例源码中有
    • 创建 SockJS: var socket = new SockJS('/endpointNiu');, 参数为在服务端设置的 endpoint
    • 订阅了两个 topic, 「/topic/getBro」接受广播消息, 「/user/queue/getPoint」接受点对点消息, 服务端在发送
      点对点消息的时候, destination 是没有「/user」的, 但是在 WebSocket 中我们已经配置过, 再看源码就懂了
    @Override
    public void convertAndSendToUser(String user, String destination, Object payload,
            @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor)
            throws MessagingException {
        Assert.notNull(user, "User must not be null");
        user = StringUtils.replace(user, "/", "%2F");
        destination = destination.startsWith("/") ? destination : "/" + destination;
        super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
    }
    

    添加地址映射

    /**
     * WebMvc 配置类
     *
     * @author niuyy
     * @date 2018/3/23
     */
    @Configuration
    public class WebMvcConfig extends WebMvcConfigurationSupport {
    
        @Override
        public void addViewControllers(ViewControllerRegistry registry) {
            registry.addViewController("/ws").setViewName("/ws");
        }
    
        @Override
        protected void addResourceHandlers(ResourceHandlerRegistry registry) {
            registry.addResourceHandler("/js/**")
                    .addResourceLocations("classpath:/static/js/");
        }
    }
    

    其中

    • addViewController 新增视图
    • addResourceHandlers 新增资源(静态文件等)

    原理

    • HTTP: 握手 -> 交换数据, 握手 -> 交换数据, 握手 -> 交换数据 ...
    • WebSocket: 握手 -> 交换数据 -> 交换数据 -> 交换数据 ... , 建立连接后, 直接使用 tcp 交换数据

    总结

    使用情景

    • 服务端需要主动发送消息给客户端
    • 以前客户端 ajax 轮询的需求都可以用这个替换, 减少资源开销

    参考

    1. 在Spring Boot框架下使用WebSocket实现消息推送
    2. WebSocket 是什么原理?为什么可以实现持久连接?
    3. SpringBoot学习-(十三)SpringBoot中建立WebSocket连接(STOMP)
    4. Socket 与 WebSocket

    案例源码

    案例源码

    WebSocketErrorHandler.java

    /**
     * @author niuyy
     * @date 2018/3/26
     */
    @Slf4j
    public class WebSocketErrorHandler extends StompSubProtocolErrorHandler {
        public WebSocketErrorHandler() {
            super();
        }
    
        @Override
        public Message<byte[]> handleClientMessageProcessingError(Message<byte[]> clientMessage, Throwable ex) {
            log.error("handleClientMessageProcessingError:clientMessage-" + clientMessage + ", error-"+ex.getMessage());
            return super.handleClientMessageProcessingError(clientMessage, ex);
        }
    
        @Override
        public Message<byte[]> handleErrorMessageToClient(Message<byte[]> errorMessage) {
            log.error("handleErrorMessageToClient:errorMessage-" + errorMessage);
            return super.handleErrorMessageToClient(errorMessage);
        }
    
        @Override
        protected Message<byte[]> handleInternal(StompHeaderAccessor errorHeaderAccessor, byte[] errorPayload, Throwable cause, StompHeaderAccessor clientHeaderAccessor) {
            log.error("handleInternal:errorHeaderAccessor-" + errorHeaderAccessor + ", errorPayload-" + errorPayload + ", error-" + cause.getMessage() + ", clientHeaderAccessor-"+clientHeaderAccessor);
            return super.handleInternal(errorHeaderAccessor, errorPayload, cause, clientHeaderAccessor);
        }
    }
    

    目录截图

    目录截图.png

    相关文章

      网友评论

          本文标题:SpringBoot+Webocket 初步使用

          本文链接:https://www.haomeiwen.com/subject/cjgdpftx.html