美文网首页
spring boot 1.x 对 websocket 的支持

spring boot 1.x 对 websocket 的支持

作者: info小欧 | 来源:发表于2019-08-23 11:30 被阅读0次

spring cloud zuul 如何转发 websocket 请求

    网上关于spring boot 使用websocket 的文章很多,但是涉及spring cloud zuul如何转发websocket请求的文章很少,据网上资料显示zuul 1.x不支持websocket,2.x支持。在考虑到当前项目从spring boot 1.x 更换spring boot 2.x 复杂度高,决定根据当前开发版本寻找解决方案(实在不行,开个服务端口就是;方法总比问题多)。

代码解析

    文章围绕github上的一个解决方案(https://github.com/mthizo247/spring-cloud-netflix-zuul-websocket)展开详细描述。作者提供的demo(https://github.com/mthizo247/zuul-websocket-support-demo)可以运行成功,但基于订阅topic广播的样例明显不够,点对点发送或将消息发送到指定客户端的业务场景也很常见,接下来针对websocket广播和点对点消息方式讲解具体的实现细节。
实现逻辑如下图:

websockt.png
  1. 网关添加微服务的endpoint、broken;
  2. 客户端向网关发送websocket请求,并转发订阅微服务websocket;
    /**
     * 网关接收到webSocket-client发送消息,
     * 并向微服务转发websocket请求
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
            throws Exception {
        super.handleMessage(session, message);
        handleMessageFromClient(session, message);
    }

    private void handleMessageFromClient(WebSocketSession session,
                                         WebSocketMessage<?> message) throws Exception {
        boolean handled = false;
        WebSocketMessageAccessor accessor = WebSocketMessageAccessor.create(message);
        if (StompCommand.SEND.toString().equalsIgnoreCase(accessor.getCommand())) {
            handled = true;
            sendMessageToProxiedTarget(session, accessor);
        }

        if (StompCommand.SUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
            handled = true;
            subscribeToProxiedTarget(session, accessor);
        }

        if (StompCommand.UNSUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
            handled = true;
            unsubscribeFromProxiedTarget(session, accessor);
        }

        if (StompCommand.CONNECT.toString().equalsIgnoreCase(accessor.getCommand())) {
            handled = true;
            connectToProxiedTarget(session);
        }

        if (!handled) {
            if (logger.isDebugEnabled()) {
                logger.debug("STOMP COMMAND " + accessor.getCommand()
                        + " was not explicitly handled");
            }
        }
    }

    /**
     * 根据请求获取微服务地址
     * 由ProxyWebSocketConnectionManager 代理websocket 连接
     */
    private void connectToProxiedTarget(WebSocketSession session) {
        URI sessionUri = session.getUri();
        ZuulWebSocketProperties.WsBrokerage wsBrokerage = getWebSocketBrokarage(
                sessionUri);

        Assert.notNull(wsBrokerage, "wsBrokerage must not be null");

        String path = getWebSocketServerPath(wsBrokerage, sessionUri);
        Assert.notNull(path, "Web socket uri path must be null");

        URI routeTarget = proxyTargetResolver.resolveTarget(wsBrokerage);

        Assert.notNull(routeTarget, "routeTarget must not be null");
        
        //微服务配置全局路径的情况下,需要添加微服务名
        path = "/" + wsBrokerage.getId() + path;

        String uri = ServletUriComponentsBuilder
                .fromUri(routeTarget)
                .path(path)
                .replaceQuery(sessionUri.getQuery())
                .toUriString();

        ProxyWebSocketConnectionManager connectionManager = new ProxyWebSocketConnectionManager(
                messagingTemplate, stompClient, session, headersCallback, uri);
        connectionManager.errorHandler(this.errorHandler);
        managers.put(session, connectionManager);
        connectionManager.start();
    }
  1. 网关接收到微服务发送的消息转发到客户端
    /**
     * 接收到微服务信息后调用
     */
    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        if (headers.getDestination() != null) {
            String destination = headers.getDestination();
            if (logger.isDebugEnabled()) {
                logger.debug("Received " + payload + ", To " + headers.getDestination());
            }

            Principal principal = userAgentSession.getPrincipal();
            String userDestinationPrefix = messagingTemplate.getUserDestinationPrefix();
            if (principal != null && destination.startsWith(userDestinationPrefix)) {
                destination = destination.substring(userDestinationPrefix.length());

                destination = destination.startsWith("/") ? destination
                        : "/" + destination;

                messagingTemplate.convertAndSendToUser(principal.getName(), destination,
                        payload, copyHeaders(headers.toSingleValueMap()));
            } else {
                messagingTemplate.convertAndSend(destination, payload,
                        copyHeaders(headers.toSingleValueMap()));
            }
        }
    }

开发实例

     基于spring boot 1.x (spring mvc)实现stomp协议的websocket,并由spring cloud zuul 路由转发。

开发websocket服务

  1. pom引入如下依赖:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
                <!---创建一个微服务工程的基础依赖包,网关可不引用-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
                <!--引入eureka 依赖包,将服务注册到注册中心-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
                <!--引入websocket 依赖包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

  配置STOMP的服务端点和请求订阅前缀

/**
 * 使用 STOMP 协议
 * @author Golden
 */
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    /**
     * 注册服务器端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //增加 gs-guide-websocket 端点
        registry.addEndpoint("/gs-guide-websocket")
                //添加握手处理器,将客户端传入的session_id封装为Principal对象,从而让服务端能通过getName()方法找到指定客户端
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                            Map<String, Object> attributes) {
                         //【关键】
                        final String sessionid = (String) attributes.get("session_id");
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return sessionid;
                            }
                        };
                        return principal;
                    }
                })
                // 添加socket拦截器,用于从请求中获取session_id
                .addInterceptors(new CustomHandshakeInterceptor())
                // bypasses spring web security
                .setAllowedOrigins("*").withSockJS();
    }

    /**
     * 定义服务器端点请求和订阅前缀
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // 客户端订阅请求前缀
        config.enableSimpleBroker("/topic","/queue");
        // 服务端点请求前缀
        config.setApplicationDestinationPrefixes("/app");
    }
    
}
/**
 * 添加socket拦截器
 * @author Golden
 */
public class CustomHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1,
            org.springframework.web.socket.WebSocketHandler arg2, Exception arg3) {
    }

    /**
     * handler处理前调用,attributes属性最终在WebSocketSession里,可能通过webSocketSession.getAttributes().get(key值)获得
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse arg1,
            org.springframework.web.socket.WebSocketHandler arg2, Map<String, Object> attributes) throws Exception {

        if (request instanceof ServletServerHttpRequest) {
            // 【关键】,header中的session_id是通过zuul端创建websocket conenction中传递过来
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            
            String session_id = servletRequest.getServletRequest().getHeader("session_id");
//          String session_id = servletRequest.getServletRequest().getParameter("session_id");
            
            attributes.put("session_id", session_id);
            
            return true;
        }
        return true;
    }

}

zuul 网关配置

  spring cloud 使用网关(zuul)需要使用spring-cloud-netflix-zuul-websocket代码,直接引入jar 满足需要。

源码修改

  修改ZuulWebSocketConfiguration 类中的addStompEndpoint方法,添加服务端点的握手处理器、拦截器。
  拦截器从websocket的请求链接requestURI中获取到sockjssession的id,并用于user;握手处理器,将客户端传入的session_id封装为Principal对象,从而让服务端能通过getName()方法找到指定客户端。代码如下:

package com.github.mthizo247.cloud.netflix.zuul.web.socket

public class ZuulWebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer
        implements ApplicationListener<ContextRefreshedEvent> {

    private SockJsServiceRegistration addStompEndpoint(StompEndpointRegistry registry, String... endpoint) {
        return registry.addEndpoint(endpoint)
                // bypasses spring web security
                .setHandshakeHandler(new DefaultHandshakeHandler() {
                    @Override
                    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                            Map<String, Object> attributes) {
                        // 利用client_id用于点对点发送
                        final String sessionId = (String) attributes.get("session_id");
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return sessionId;
                            }
                        };
                        return principal;
                    }
                })
                .addInterceptors(new HandshakeInterceptor() {

                    @Override
                    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                        if (request instanceof ServletServerHttpRequest) {
                            // 从websocket的请求链接requestURI中获取到sockjssession的id,并用于user
                            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                            String uri = servletRequest.getServletRequest().getRequestURI();
                            System.out.println("----------" + uri);
                            int lastLashIndex = uri.lastIndexOf("/");
                            uri = uri.substring(0, lastLashIndex);
                            uri = uri.substring(uri.lastIndexOf("/") + 1);
                            attributes.put("session_id", uri);
                            return true;
                        }
                        return true;
                    }

                    @Override
                    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                            WebSocketHandler wsHandler, Exception exception) {

                    }
                })
                .setAllowedOrigins("*").withSockJS();
    }
}

  接着需要将session_id传递给微服务,修改ProxyWebSocketConnectionManager的buildWebSocketHttpHeaders方法,将session_id添加到socket connection的WebSocketHttpHeaders中。

    private WebSocketHttpHeaders buildWebSocketHttpHeaders() {
        WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();
        if (httpHeadersCallback != null) {
            httpHeadersCallback.applyHeaders(userAgentSession, wsHeaders);
            List<String> list = new ArrayList<>();
            list.add(userAgentSession.getId());
            wsHeaders.put("session_id", list);
        }
        return wsHeaders;
    }

  修改完成后再调试的过程中发现点对点发送依然无法接收到消息,网关出现消息转换的异常。通过调试发现订阅topic和点对点两种模式返回的数据类型不一致。

  1. 订阅topic 返回数据类型
    contentType=application/json;charset=UTF-8
  2. 点对点发送返回数据类型
    contentType=text/plain;charset=UTF-8

  紧接着修改ProxyWebSocketConnectionManager中的 getPayloadType方法,添加类型判断,如下:

    @Override
    public Type getPayloadType(StompHeaders headers) {
        String type = headers.getContentType().getType();
        //content-type=[text/plain;charset=UTF-8]
        if("text".equals(type)) {
            return String.class;
        }
        //content-type=[application/json;charset=UTF-8]
        return Object.class;
    }

    代码改造完成.

配置zuul

  引入的pom依赖如下:

        <!--spring boot 1.5.2 -->
        <!--使用spring cloud Camden.SR5-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-zuul</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- 从github 上下载源码码到本地修改,自行打包 -->
        <dependency>
            <groupId>com.github.mthizo247</groupId>
            <artifactId>spring-cloud-netflix-zuul-websocket</artifactId>
            <version>1.0.7.RELEASE</version>
        </dependency>

  yml 指定websocket 端点、订阅路径前缀、服务端点请求前缀

zuul:
  routes:
    web-ui:  # websocket 服务名
      path: /**
      #url: http://localhost:8080 在连接eureka的情况下不需要
      service-id:  web-ui
      customSensitiveHeaders: true
  ws:
    brokerages:
      web-ui:   # websocket 服务名
        end-points: /gs-guide-websocket
        brokers:  /topic,/queue
        destination-prefixes: /app

  启动类添加源码的注解

@SpringBootApplication
@EnableZuulProxy
@EnableAsync
@EnableEurekaClient
@EnableZuulWebSocket
@EnableWebSocketMessageBroker
public class ZuulApplication 
{
    public static void main( String[] args )
    { 
        SpringApplication.run(ZuulApplication.class, args);
    }
    
    @Bean
    @LoadBalanced
    RestTemplate restTemplate() {
        return new RestTemplate();
    }

  前端订阅代码如下:

function connect() {
    var socket = new SockJS('/gs-guide-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/greetings', function (greeting) {
            showGreeting(JSON.parse(greeting.body).content);
        });
        //订阅时间
        stompClient.subscribe('/topic/time', function (greeting) {
            showTime(JSON.parse(greeting.body).content);
        });
        //订阅用户通知消息,/user/ 需要添加
        stompClient.subscribe('/user/queue/customer',function(message){
            console.log("/queue/customer: " + message.body);
            showUserListening(message.body);
        });
    });
}

演示效果:


image.png

最后感谢参考的以下几篇博客
https://blog.csdn.net/weixin_34389926/article/details/86262894
https://www.jianshu.com/p/32fae52c61f6
https://my.oschina.net/u/3706162/blog/1935071

相关文章

网友评论

      本文标题:spring boot 1.x 对 websocket 的支持

      本文链接:https://www.haomeiwen.com/subject/xfdysctx.html