美文网首页
websocket推送信息

websocket推送信息

作者: 半日孤独 | 来源:发表于2020-10-19 11:37 被阅读0次

    1.添加依赖

     <!--websocket依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
    

    2.websocket配置

    
    /**
     * 配置WebSocket
     */
    @Configuration
    //注解开启使用STOMP协议来传输基于代理(message broker)的消息,这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
        @Override
        //注册STOMP协议的节点(endpoint),并映射指定的url
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            //注册一个STOMP的endpoint,并指定使用SockJS协议
            registry.addEndpoint("/endpointOyzc").setAllowedOrigins("*").withSockJS();
        }
    
        @Override
        //配置消息代理(Message Broker)
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理
            registry.enableSimpleBroker("/topic", "/user");
            //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
            registry.setUserDestinationPrefix("/user");
        }
    }
    

    3.使用

              //使用springboot封装好的调用方法
              @Autowired
              private SimpMessagingTemplate template;
               //给所有人推送
                this.template.convertAndSend("/topic/getResponse", message);
                //单独给某个用户推送
                this.template.convertAndSendToUser("thfyth", "/queue/getResponse", "今天是个好日子");
    

    4.前端

    var stompClient = null; 
        //加载完浏览器后  调用connect(),打开双通道
        $(function(){   
        //打开双通道
        connect()
        })
        //强制关闭浏览器  调用websocket.close(),进行正常关闭
        window.onunload = function() {
            disconnect()
        }
        function connect(){
            
            var socket = new SockJS('http://127.0.0.1:9004/hr-user/endpointOyzc'); //连接SockJS的endpoint名称为"endpointOyzc"
            stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
            stompClient.connect({},function(frame){//连接WebSocket服务端     
                console.log('Connected:' + frame);
                //通过stompClient.subscribe订阅/topic/getResponse 目标(destination)发送的消息
               //广播信息
                stompClient.subscribe('/topic/getResponse',function(response){
                    showResponse(JSON.parse(response.body));
                  //个人信息
                 //stompClient.subscribe('user'+'thfyth'+/queue/getResponse',function(response){
                });
            });
        }
     
        //关闭双通道
        function disconnect(){
            if(stompClient != null) {
                stompClient.disconnect();
            }
            console.log("Disconnected");
        }
        function showResponse(message){
            var response = $("#response");
            response.append("<p>"+message.userName+"</p>");
        }
    

    4.gataway集成websocket

    4.1 协议修改

    websocket推送信息前,会进行一个连接握手操作,发送一个/*/info请求,
    但是协议是ws的,所以我们需要进行拦截修改操作
     @Component
    public class WebsocketFilter implements GlobalFilter, Ordered {
        //项目是分布式的,user表示项目名,如不是微服务项目,则去掉
        private final static String DEFAULT_FILTER_PATH = "/user/info";
    
        /**
         * 设置请求方式,将ws协议改为http
         * @param exchange ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,
         *                 并公开额外的 服务器 端处理相关属性和特性,如请求属性
         * @param chain
         * @return
         */
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI requestUrl =  exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
            String scheme = requestUrl.getScheme();
            if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
                return chain.filter(exchange);
            } else if (DEFAULT_FILTER_PATH.equals(requestUrl.getPath())) {
                String wsScheme = convertWsToHttp(scheme);
                URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();
                exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
            }
            return chain.filter(exchange);
        }
    
        @Override
        public int getOrder() {
            return Ordered.LOWEST_PRECEDENCE - 2;
        }
    
        static String convertWsToHttp(String scheme) {
            scheme = scheme.toLowerCase();
            return "ws".equals(scheme) ? "http" : "wss".equals(scheme) ? "https" : scheme;
        }
    
    }
    

    4.2 跨域问题配置

    @Component
    public class ResponseFilter implements GlobalFilter, Ordered {
    
    
        private static final String ALL = "*";
        private static final String MAX_AGE = "18000L";
    
        @Bean
        public RouteDefinitionLocator discoveryClientRouteDefinitionLocator(DiscoveryClient discoveryClient,
                                                                            DiscoveryLocatorProperties properties) {
            return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
        }
    
        @Bean
        public ServerCodecConfigurer serverCodecConfigurer() {
            return new DefaultServerCodecConfigurer();
        }
      //添加请求头 
        @Bean
        public WebFilter corsFilter() {
            return (ServerWebExchange ctx, WebFilterChain chain) -> {
                ServerHttpRequest request = ctx.getRequest();
                if (!CorsUtils.isCorsRequest(request)) {
                    return chain.filter(ctx);
                }
                HttpHeaders requestHeaders = request.getHeaders();
                ServerHttpResponse response = ctx.getResponse();
                HttpMethod requestMethod = requestHeaders.getAccessControlRequestMethod();
                HttpHeaders headers = response.getHeaders();
                headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, requestHeaders.getOrigin());
                headers.addAll(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, requestHeaders.getAccessControlRequestHeaders());
                if (requestMethod != null) {
                    headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, requestMethod.name());
                }
                headers.add(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
                headers.add(HttpHeaders.ACCESS_CONTROL_EXPOSE_HEADERS, ALL);
                headers.add(HttpHeaders.ACCESS_CONTROL_MAX_AGE, MAX_AGE);
                if (request.getMethod() == HttpMethod.OPTIONS) {
                    response.setStatusCode(HttpStatus.OK);
                    return Mono.empty();
                }
                return chain.filter(ctx);
            };
        }
      //将请求头中有多个值的去掉 这是该版本的一个bug
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return chain.filter(exchange).then(Mono.defer(() -> {
                exchange.getResponse().getHeaders().entrySet().stream()
                        .filter(kv -> (kv.getValue() != null && kv.getValue().size() > 1))
                        .filter(kv -> (kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN)
                                || kv.getKey().equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)))
                        .forEach(kv -> {
                            kv.setValue(new ArrayList<String>() {{
                                add(kv.getValue().get(0));
                            }});
                        });
    
                return chain.filter(exchange);
            }));
        }
    
        @Override
        public int getOrder() {
            // 指定此过滤器位于NettyWriteResponseFilter之后
            // 即待处理完响应体后接着处理响应头
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;
        }
    
    }
    

    4.3 路由配置

    gateway:
          discovery:
            locator:
              enabled: true #开启从注册中心动态创建路由的功能,利用微服务名进行路由
          routes:
          - id: user_routh #user_route    #路由的ID,没有固定规则但要求唯一,建议配合服务名
            order: 3
            uri: lb://user #匹配后提供服务的路由地址
            predicates:
            - Path=/api/user/*/**         # 断言,路径相匹配的进行路由
            filters:
            - StripPrefix=1
          - id: user_routh
            uri: lb:ws://user
            order: 2
            predicates:
              - Path=/user/endpointOyzc/**
            filters:
              - StripPrefix=1
    

    5.备注

    在网关使用websocket时,跨域配置中access_control_allow_credentials这个设置为true之后,access_control_allow_origin设置最好不要为'*'

    如有问题,请自觉查找,因为不一定完全匹配。

    相关文章

      网友评论

          本文标题:websocket推送信息

          本文链接:https://www.haomeiwen.com/subject/tarkmktx.html