美文网首页
WebSocket详解及实践

WebSocket详解及实践

作者: bit_拳倾天下 | 来源:发表于2020-11-26 13:50 被阅读0次

1.背景

  最近在项目中遇到需要在网页上实时展示某些设备信息的需求。一开始还好,接入的设备会定时推送数据给系统(一分钟间隔),时效性要求不是很高,于是在前端写了定时方法,每分钟刷新一次数据,虽不优雅,但也勉强可以满足需求。
  但是系统陆续又接入了一些新设备,这次数据是不定时的推送(可能一天推送几条,也可能一秒钟推送几条)。这样一来,之前的方法就不合适了,定时方法的时间间隔设的太大,就有会延迟,也谈不上实时了;设置的太小,则会频繁的请求,浪费资源不说,还有可能请求不到数据(因为是不定时推送)。因此,就想,要是服务器主动向前端推送数据就好了,一旦服务器就收到设备数据,服务器就立马推送给前端。
  听前辈说WebSocket可以做到,于是一顿搜罗。简单地说,WebSocket是H5的一部分,是可以满足实时性的全双工通信的一种协议。
  我们最常用的http协议,都是发出一次请求,才能获取响应,服务器并不能主动向客户端推送数据。为了解决这个短板,在WebSocket之前,有定期轮询、comet等,能够实现类似服务器推送效果,但本质上还是请求/响应模式。WebSocket则不同,通过一次握手,会在服务器和客户端建立一个连接,基于此连接,客户端和服务器就可以双向发送数据。
  WebSocket详解
  聊天室示例
  我的需求和聊天室有些区别,聊天室是有数据变更,所有人都能看到;而我的需求是,数据有变更则推送给对应的客户端而不是所有客户端。这里先做个demo试一试深浅。
demo:https://gitee.com/bit-fist/websocket-test.git

2.环境

Vue + sockjs-client + stompjs
SpringBoot + RabitMQ

//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
   ws = new WebSocket("ws://localhost:8080/Demo/websocketTest/user000");
   console.log("link success")
}else{
   alert('Not support websocket')
}
//连接成功建立的回调方法
 ws.onopen = function(){ws.send("Test!"); }; 
//接收到消息的回调方法
 ws.onmessage = function(evt){console.log(evt.data);ws.close();}; 
//连接关闭的回调方法
 ws.onclose = function(evt){console.log("WebSocketClosed!");}; 
//连接发生错误的回调方法
 ws.onerror = function(evt){console.log("WebSocketError!");};

  上面的方法算是原生的,WebSocket是比较新的协议,不是所有浏览器都支持。因此需要有兼容WebSocket的方案,这就要用到SockJS了,它是一个浏览器上运行的 JavaScript 库,如果浏览器不支持 WebSocket,该库可以模拟对 WebSocket 的支持,实现浏览器和 Web 服务器之间低延迟、全双工、跨域的通讯通道。

  至于STOMP,直接使用WebSocket(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。
  不过,好消息是我们并非必须要使用原生的WebSocket连接。就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。
  对于STOMP理解不深,直接摘抄 自 使用WebSocket和STOMP实现消息功能 (读书人的事,怎么能算偷呢→_→)

安装命令:
cnpm install sockjs-client
cnpm install stompjs
前端就算准备完成了。

至于后端,只要添加两个依赖就行了

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ解析

3.前端代码

 methods:{
    connect(){
      // /server是vue.config.js中配置的代理,指向后端。如果没有配置代理,可以直接用http://{ip}:{port}替换。
      // 其中,/ws 是后端配置的端点,用于创建连接。
      let sock = new SockJS("/server/ws");
      this.socket = Stomp.over(sock);
      this.socket.connect({}, this.afterConnect, this.connectError)
    },
    disconnect(){
      this.socket.disconnect(()=>{

      })
    },
    afterConnect(){
      console.log("连接成功")
    },
    connectError(){
      console.error("连接失败!")
    },
    subscribe1(){
      this.order = this.socket.subscribe("/topic/subscribe/first", this.afterSubscribe)
    },
    sendMessage1(){
      this.socket.send("/topic/send/first", {}, this.message1 || "nothing")
    },
    subscribe2(){
      this.order = this.socket.subscribe("/topic/subscribe/second")
    },
    sendMessage2(){
      this.socket.send("/topic/send/second/" + this.name + "/" + this.gender, this.afterSubscribe, this.message2 || "nothing")
    },
    subscribe3(){
      this.order = this.socket.subscribe("/topic/subscribe/third", this.afterSubscribe, {name: this.name1, gender: this.gender1})
    },
    sendMessage3(){
      this.socket.send("/topic/send/third", {}, this.message3)
    },
    subscribe31(){
      this.order = this.socket.subscribe("/topic/subscribe/third1", this.afterSubscribe)
    },
    sendMessage31(){
      this.socket.send("/topic/send/third1", {name: this.name2, gender: this.gender2}, this.message4)
    },
    subscribe4(){
      this.order = this.socket.subscribe("/user/topic/order", this.afterSubscribe)
    },
    sendMessage4(){
      this.socket.send("/user/topic/message", {}, this.message5 || "nothing")
    },
    afterSubscribe(m){
      console.log("afterSubscribe", JSON.parse(m.body))
    },
    afterSendMessage(m){
      console.log("afterSendMessage", JSON.parse(m.body))
    }
  },
  beforeDestroy(){
    if(this.socket){
      this.disconnect()
    }
  }
}

stompjs方法

4.后端代码

1.配置

package com.bit.fist.websocketdemo.common.config;

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@SpringBootConfiguration
@EnableWebSocketMessageBroker
public class WebSocketConfigurer implements WebSocketMessageBrokerConfigurer {
    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * 也可以设置拦截器、处理器等
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")//用于首次建立连接
                .setAllowedOrigins("*")//允许跨域
                .withSockJS();//使用sockjs兼容
    }

    /**
     * 配置消息代理,哪种路径的消息会进行代理处理
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //订阅广播 Broker(消息代理)名称
        registry.enableSimpleBroker("/topic", "/talk");
        //点对点(一对一)使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }
}

2.拦截器


3.controller

package com.bit.fist.websocketdemo.controller;

import com.bit.fist.websocketdemo.common.entity.ResponseEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@Slf4j
@RestController
public class SubscribeAndSendController {
    /**
     * 1.普通的订阅与推送
     * @return
     */
    @SubscribeMapping("/topic/subscribe/first")
    public ResponseEntity subscribe1(){
        log.info("订阅1----------");
        return ResponseEntity.successToResponse("订阅1成功");
    }
    @MessageMapping("/topic/send/first")
    @SendTo("/topic/subscribe/first")
    public ResponseEntity recieve1(String msg){
        log.info("发送1:{}", msg);
        return ResponseEntity.successToResponse(msg);
    }

    /**
     * 2.url传参的订阅与推送(Message适用,Subscribe不适用, 因为@SendTo识别不到)
     * @return
     */
    @SubscribeMapping("/topic/subscribe/second")
    public ResponseEntity subscribe2(){
        return ResponseEntity.successToResponse("订阅2成功");
    }
    @MessageMapping("/topic/send/second/{name}/{gender}")
    @SendTo("/topic/subscribe/second")
    public ResponseEntity recieve2(String msg,@DestinationVariable("name") String name, @DestinationVariable("gender") String gender){
        log.info("发送2----------:{},{},{}",msg, name, gender);
        return ResponseEntity.successToResponse(msg);
    }

    /**
     * 3.headers传参(Subscribe、Message都适用)
     * 可以通过@Headers取得所有参数,也可以通过@Header单独取出需要的数据
     * @param name
     * @param gender
     * @param headers
     * @return
     */
    @SubscribeMapping("/topic/subscribe/third")
    public ResponseEntity subscribe3(@Header("name")String name, @Header("gender")String gender, @Headers Map<String, Object> headers){
        log.info("订阅3----------:{},{},{}",name, gender, headers);
        return ResponseEntity.successToResponse("订阅3成功");
    }
    @MessageMapping("/topic/send/third")
    @SendTo("/topic/subscribe/third")
    public ResponseEntity recieve3(String msg){
        log.info("发送3:{}", msg);
        
    /**
     * 3.1
     * @return
     */
    @SubscribeMapping("/topic/subscribe/third1")
    public ResponseEntity subscribe31(){
        log.info("订阅3.1");
        return ResponseEntity.successToResponse("订阅3.1成功");
    }
    @MessageMapping("/topic/send/third1")
    @SendTo("/topic/subscribe/third1")
    public ResponseEntity recieve31(String msg, @Header(value = "name")String name, @Header(value = "gender", required = false)String gender, @Headers Map<String, Object> headers){
        log.info("发送3.1----------:{},{},{},{}", msg, name, gender, headers);
        return ResponseEntity.successToResponse(msg);
    }

    /**
     * 点对点的订阅与推送
     * @return
     */
    @SubscribeMapping("/user/topic/order")
    public ResponseEntity subscribe4(){
        log.info("订阅4成功");
        return ResponseEntity.successToResponse("订阅4成功");
    }
    @MessageMapping("/user/topic/message")
    @SendToUser("/topic/order")
    public ResponseEntity recieve4(String msg){
        log.info("发送4----------:{}", msg);
        return ResponseEntity.successToResponse(msg);
    }

}

5.遇到的问题

看着挺简单,做起来净毛病,各种匪夷所思

1.建立连接失败

WebSocketConfig 中,registerStompEndpoints方法的registry.setAllowedOrigins("*"),是用来声明允许跨域的,但是创建连接就报错:

java.lang.IllegalArgumentException: When allowCredentials is true, allowedOrigins cannot contain the special value "*"since that cannot be set on the "Access-Control-Allow-Origin" response header. To allow credentials to a set of origins, list them explicitly or consider using "allowedOriginPatterns" instead.

可以试着换成.setAllowedOrigins("http://{ip}:{port}").

2.订阅失败,Payload value must not be empty

org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public com.kes.common.core.base.ApiResponseBody com.laowang.socket.controller.WebSocketController.subscribe(java.lang.String): 1 error(s): [Error in object 'prjId': codes []; arguments []; default message [Payload value must not be empty]] 

原因是@SubscribeMapping标注的方法定义了入参,在订阅时却没有传参给后端。
两种办法:
(1)@SubscribeMapping标注的方法不定义入参,也不用传参(如果参数可有可无);
(2)如果一定要传参数,可以@SubscribeMapping配合@DestinationVariable注解,和@PathVariable类似

    @SubscribeMapping("/topic/subscribe/{prjId}")
    public ApiResponseBody subscribe(@DestinationVariable("prjId") String prjId){
        log.info("订阅 /first,项目编号 : {}", prjId);
        return ApiResponseBody.defaultSuccess("订阅成功!");
    }

3.发送消息报错(一步一榔头 →_→),nested exception is java.lang.IllegalArgumentException: Expected destination pattern "/user/{userId}/**"

org.springframework.messaging.MessageDeliveryException: Failed to handle GenericMessage [payload=byte[122], headers={simpMessageType=MESSAGE, conversionHint=method 'text' parameter -1, contentType=application/json, simpSessionId=2elhhrww, simpDestination=/user/first}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@53a64803 in UserDestinationMessageHandler[DefaultUserDestinationResolver[prefix=/user/]]; nested exception is java.lang.IllegalArgumentException: Expected destination pattern "/user/{userId}/**"

原因:增加了@SendTo("/user/first")注解,路径错误,"/user"比较特殊,配置类中,已经将它注册为点对点的端点,registry.setUserDestinationPrefix("/user"),需要用@SendToUser
@SendTo 与 @SendToUser

解决办法:
(1)检查、更正路径;
(2)换成@SendToUser注解

4.@SendToUser无效

    @SubscribeMapping("/user/receive/test")
    public ApiResponseBody subscribe(){
        log.info("订阅2");
        return ApiResponseBody.defaultSuccess("订阅成功!");
    }

    @MessageMapping("/topic/receive2")
    @SendToUser("/receive/test")
    public ApiResponseBody text2(String text){
        log.info("接收消息: {}", text);
        return ApiResponseBody.defaultSuccess("发送信息成功!");
    }

讲道理,当前端发送触发@MessageMapping("/topic/receive2")标注的方法后,返回后应该转发给"/user/receive/test",发送信息到时没问题,但就是转发不到订阅的目标,也不报错,费解。。。。

后来发现和配置有关,registry.enableSimpleBroker("/topic")只配置了 "/topic",所以"/user/receive/test"无法应用于@SendToUser
解决办法:
(1)增加配置,registry.enableSimpleBroker("/topic","/receive");
(2)将路径修改为/topic/test,如下:

    @SubscribeMapping("/user/topic/test")//修改
    public ApiResponseBody subscribe(){
        log.info("订阅2");
        return ApiResponseBody.defaultSuccess("订阅成功!");
    }

    @MessageMapping("/topic/receive2")
    @SendToUser("/topic/test")//修改
    public ApiResponseBody text2(String text){
        log.info("接收消息: {}", text);
        return ApiResponseBody.defaultSuccess("发送信息成功!");
    }

当然前端订阅路径也要修改为 "/user/topic/test"

  1. 前端订阅、发送信息的端点都要以"/"开头,否则会定于不到,而且不报错

相关文章

网友评论

      本文标题:WebSocket详解及实践

      本文链接:https://www.haomeiwen.com/subject/ccvmiktx.html