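How Spring Cloud Zuul Forwards WebSocket Requests
There are plenty of articles on using WebSocket with Spring Boot, but very few on how Spring Cloud Zuul forwards WebSocket requests. According to material online, Zuul 1.x does not support WebSocket and 2.x does. Because migrating the current project from Spring Boot 1.x to Spring Boot 2.x would be complex, I decided to look for a solution that works with the version we are developing on (if all else fails, the service can simply expose its own port; there are always more solutions than problems).
Code walkthrough
This article is built around a solution published on GitHub (https://github.com/mthizo247/spring-cloud-netflix-zuul-websocket). The author's demo (https://github.com/mthizo247/zuul-websocket-support-demo) runs successfully, but an example based only on topic broadcast is clearly not enough: point-to-point delivery, that is, sending a message to one specific client, is also a very common business scenario. The rest of this article covers the implementation details for both broadcast and point-to-point WebSocket messaging.
The implementation logic is shown in the figure below: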
![](https://img.haomeiwen.com/i17664280/196c51911c90719d.png)
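- The gateway registers the microservice's endpoints and brokers;
- The client sends a WebSocket request to the gateway, which forwards it and subscribes to the microservice's WebSocket;
/**
* The gateway receives a message sent by the WebSocket client
* and forwards the WebSocket request to the microservice
*/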
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
throws Exception {
super.handleMessage(session, message);
handleMessageFromClient(session, message);
}
private void handleMessageFromClient(WebSocketSession session,
WebSocketMessage<?> message) throws Exception {
boolean handled = false;
WebSocketMessageAccessor accessor = WebSocketMessageAccessor.create(message);
if (StompCommand.SEND.toString().equalsIgnoreCase(accessor.getCommand())) {
handled = true;
sendMessageToProxiedTarget(session, accessor);
}
if (StompCommand.SUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
handled = true;
subscribeToProxiedTarget(session, accessor);
}
if (StompCommand.UNSUBSCRIBE.toString().equalsIgnoreCase(accessor.getCommand())) {
handled = true;
unsubscribeFromProxiedTarget(session, accessor);
}
if (StompCommand.CONNECT.toString().equalsIgnoreCase(accessor.getCommand())) {
handled = true;
connectToProxiedTarget(session);
}
if (!handled) {
if (logger.isDebugEnabled()) {
logger.debug("STOMP COMMAND " + accessor.getCommand()
+ " was not explicitly handled");
}
}
}
/**
* Resolve the microservice address from the request;
* ProxyWebSocketConnectionManager then proxies the WebSocket connection
*/
private void connectToProxiedTarget(WebSocketSession session) {
URI sessionUri = session.getUri();
ZuulWebSocketProperties.WsBrokerage wsBrokerage = getWebSocketBrokarage(
sessionUri);
Assert.notNull(wsBrokerage, "wsBrokerage must not be null");
String path = getWebSocketServerPath(wsBrokerage, sessionUri);
Assert.notNull(path, "Web socket uri path must not be null");
URI routeTarget = proxyTargetResolver.resolveTarget(wsBrokerage);
Assert.notNull(routeTarget, "routeTarget must not be null");
// When the microservice is configured with a global context path, the service name must be prepended
path = "/" + wsBrokerage.getId() + path;
String uri = ServletUriComponentsBuilder
.fromUri(routeTarget)
.path(path)
.replaceQuery(sessionUri.getQuery())
.toUriString();
ProxyWebSocketConnectionManager connectionManager = new ProxyWebSocketConnectionManager(
messagingTemplate, stompClient, session, headersCallback, uri);
connectionManager.errorHandler(this.errorHandler);
managers.put(session, connectionManager);
connectionManager.start();
}
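The URI assembly in connectToProxiedTarget can be illustrated without Spring. Below is a minimal sketch using only the JDK; the class name, the method name, and the sample values (http://localhost:8080, web-ui) are assumptions made for illustration. The real code uses ServletUriComponentsBuilder, which also handles encoding.

```java
import java.net.URI;

// Hypothetical helper: mirrors the "target + /serviceId + path + query" assembly.
final class ProxyTargetUriSketch {

    static String buildTargetUri(URI routeTarget, String serviceId,
                                 String endpointPath, String query) {
        // Prepend the service id, as the gateway does when the microservice
        // is configured with a global context path.
        String path = "/" + serviceId + endpointPath;

        // Avoid a double slash if the resolved target ends with "/".
        String base = routeTarget.toString();
        if (base.endsWith("/")) {
            base = base.substring(0, base.length() - 1);
        }

        String uri = base + path;
        // Carry over the query string of the client session, if any.
        if (query != null && !query.isEmpty()) {
            uri += "?" + query;
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(buildTargetUri(
                URI.create("http://localhost:8080"), "web-ui", "/gs-guide-websocket", null));
    }
}
```

If the microservice has no global context path, the service id prefix would presumably have to be dropped; the article's change assumes the prefix is always needed.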
- The gateway forwards messages received from the microservice to the client
/**
* Called when a message is received from the microservice
*/
@Override
public void handleFrame(StompHeaders headers, Object payload) {
if (headers.getDestination() != null) {
String destination = headers.getDestination();
if (logger.isDebugEnabled()) {
logger.debug("Received " + payload + ", To " + headers.getDestination());
}
Principal principal = userAgentSession.getPrincipal();
String userDestinationPrefix = messagingTemplate.getUserDestinationPrefix();
if (principal != null && destination.startsWith(userDestinationPrefix)) {
destination = destination.substring(userDestinationPrefix.length());
destination = destination.startsWith("/") ? destination
: "/" + destination;
messagingTemplate.convertAndSendToUser(principal.getName(), destination,
payload, copyHeaders(headers.toSingleValueMap()));
} else {
messagingTemplate.convertAndSend(destination, payload,
copyHeaders(headers.toSingleValueMap()));
}
}
}
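The destination handling in handleFrame is easier to follow in isolation. The sketch below reproduces only the string manipulation, assuming the default Spring user destination prefix "/user/"; the class and method names are made up for illustration and are not part of the library.

```java
// Hypothetical helper: strips the user destination prefix the same way
// handleFrame does before calling convertAndSendToUser.
final class UserDestinationSketch {

    static String stripUserPrefix(String destination, String userPrefix) {
        if (!destination.startsWith(userPrefix)) {
            // Broadcast destinations such as /topic/... are left untouched.
            return destination;
        }
        String rest = destination.substring(userPrefix.length());
        // Make sure the remaining destination starts with a slash.
        return rest.startsWith("/") ? rest : "/" + rest;
    }

    public static void main(String[] args) {
        System.out.println(stripUserPrefix("/user/queue/customer", "/user/"));
        System.out.println(stripUserPrefix("/topic/greetings", "/user/"));
    }
}
```

The prefix has to be removed because convertAndSendToUser adds it back when it resolves the destination for the given user name.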
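Development example
A STOMP-based WebSocket service is implemented on Spring Boot 1.x (Spring MVC) and routed through Spring Cloud Zuul.
Developing the WebSocket service
1. Add the following dependencies to the pom: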
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Base dependency for creating a microservice project; the gateway does not need it -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Eureka dependency, registers the service with the registry -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- WebSocket dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
Configure the STOMP server endpoint and the request and subscription prefixes
/**
* Use the STOMP protocol
* @author Golden
*/
@Configuration
@EnableWebSocketMessageBroker
public class StompWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
/**
* Register the server endpoint
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// Add the gs-guide-websocket endpoint
registry.addEndpoint("/gs-guide-websocket")
// Add a handshake handler that wraps the session_id passed in by the client in a Principal, so the server can locate a specific client via getName()
.setHandshakeHandler(new DefaultHandshakeHandler() {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// [Key point]
final String sessionid = (String) attributes.get("session_id");
Principal principal = new Principal() {
@Override
public String getName() {
return sessionid;
}
};
return principal;
}
})
// Add a handshake interceptor that reads the session_id from the request
.addInterceptors(new CustomHandshakeInterceptor())
// allow cross-origin requests from any origin
.setAllowedOrigins("*").withSockJS();
}
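/**
* Define the application destination prefix and the subscription prefixes
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// Prefixes for client subscriptions
config.enableSimpleBroker("/topic","/queue");
// Prefix for messages routed to application handlers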
config.setApplicationDestinationPrefixes("/app");
}
}
/**
* Handshake interceptor
* @author Golden
*/
public class CustomHandshakeInterceptor implements HandshakeInterceptor {
@Override
public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1,
org.springframework.web.socket.WebSocketHandler arg2, Exception arg3) {
}
/**
* Called before the handler processes the handshake. The attributes end up in the WebSocketSession and can be read via webSocketSession.getAttributes().get(key)
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse arg1,
org.springframework.web.socket.WebSocketHandler arg2, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
// [Key point] The session_id header is passed along by Zuul when it creates the WebSocket connection
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
String session_id = servletRequest.getServletRequest().getHeader("session_id");
// String session_id = servletRequest.getServletRequest().getParameter("session_id");
attributes.put("session_id", session_id);
return true;
}
return true;
}
}
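Zuul gateway configuration
Forwarding WebSocket through the Spring Cloud gateway (Zuul) relies on the spring-cloud-netflix-zuul-websocket code. For topic broadcast alone, importing the jar directly is sufficient.
Source changes
Modify the addStompEndpoint method in the ZuulWebSocketConfiguration class to add a handshake handler and an interceptor to the endpoint.
The interceptor extracts the SockJS session id from the request URI of the WebSocket request and uses it as the user. The handshake handler wraps that session_id in a Principal, so that the server can locate a specific client via getName(). The code is as follows:
package com.github.mthizo247.cloud.netflix.zuul.web.socket;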
public class ZuulWebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer
implements ApplicationListener<ContextRefreshedEvent> {
private SockJsServiceRegistration addStompEndpoint(StompEndpointRegistry registry, String... endpoint) {
return registry.addEndpoint(endpoint)
// bypasses spring web security
.setHandshakeHandler(new DefaultHandshakeHandler() {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
// Use the session_id as the user name for point-to-point delivery
final String sessionId = (String) attributes.get("session_id");
Principal principal = new Principal() {
@Override
public String getName() {
return sessionId;
}
};
return principal;
}
})
.addInterceptors(new HandshakeInterceptor() {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
// Extract the SockJS session id from the request URI of the WebSocket request and use it as the user
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
String uri = servletRequest.getServletRequest().getRequestURI();
System.out.println("----------" + uri);
int lastLashIndex = uri.lastIndexOf("/");
uri = uri.substring(0, lastLashIndex);
uri = uri.substring(uri.lastIndexOf("/") + 1);
attributes.put("session_id", uri);
return true;
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {
}
})
.setAllowedOrigins("*").withSockJS();
}
}
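The substring logic in the interceptor relies on the SockJS URL layout, /{endpoint}/{server-id}/{session-id}/{transport}: dropping the last segment and taking the new last segment yields the session id. A minimal standalone sketch follows; the class name, method name, and sample URI are hypothetical, and the null return for malformed input is my own addition rather than something the original code does.

```java
// Hypothetical helper: extracts the SockJS session id from a request URI
// of the form /{endpoint}/{server-id}/{session-id}/{transport}.
final class SockJsSessionIdSketch {

    static String extractSessionId(String requestUri) {
        int lastSlash = requestUri.lastIndexOf('/');
        if (lastSlash <= 0) {
            // Not enough path segments to contain a session id.
            return null;
        }
        // Drop the trailing transport segment (e.g. "websocket").
        String withoutTransport = requestUri.substring(0, lastSlash);
        // The last remaining segment is the session id.
        return withoutTransport.substring(withoutTransport.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        System.out.println(extractSessionId("/gs-guide-websocket/123/abcdefgh/websocket"));
    }
}
```

Note that this only makes sense for SockJS transport URLs; a raw WebSocket endpoint without the SockJS path segments would produce a meaningless value.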
Next, the session_id has to be passed to the microservice. Modify the buildWebSocketHttpHeaders method of ProxyWebSocketConnectionManager to add the session_id to the WebSocketHttpHeaders of the socket connection.
private WebSocketHttpHeaders buildWebSocketHttpHeaders() {
WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders();
if (httpHeadersCallback != null) {
httpHeadersCallback.applyHeaders(userAgentSession, wsHeaders);
List<String> list = new ArrayList<>();
list.add(userAgentSession.getId());
wsHeaders.put("session_id", list);
}
return wsHeaders;
}
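After these changes, debugging showed that point-to-point messages still were not received, and the gateway threw a message conversion exception. Further debugging revealed that topic subscriptions and point-to-point delivery return different content types.
- Content type returned for topic subscriptions
contentType=application/json;charset=UTF-8
- Content type returned for point-to-point delivery
contentType=text/plain;charset=UTF-8
The next step is to modify the getPayloadType method in ProxyWebSocketConnectionManager and add a type check, as follows: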
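@Override
public Type getPayloadType(StompHeaders headers) {
// getContentType() returns an org.springframework.util.MimeType, or null when the frame has no content-type header
MimeType contentType = headers.getContentType();
// content-type=[text/plain;charset=UTF-8]
if (contentType != null && "text".equals(contentType.getType())) {
return String.class;
}
// content-type=[application/json;charset=UTF-8]
return Object.class;
}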
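The type check can be tried out without a running gateway. The sketch below works on the raw content-type string instead of Spring's StompHeaders, so the class and method names are purely illustrative; treating a missing content type as the default (Object) case is an assumption on my part.

```java
import java.util.Locale;

// Hypothetical helper: picks the payload type from a raw content-type value.
final class PayloadTypeSketch {

    static Class<?> payloadTypeFor(String contentType) {
        if (contentType != null
                && contentType.trim().toLowerCase(Locale.ROOT).startsWith("text/")) {
            // text/plain;charset=UTF-8 -> deliver the payload as a String
            return String.class;
        }
        // application/json;charset=UTF-8 (or no header) -> generic Object
        return Object.class;
    }

    public static void main(String[] args) {
        System.out.println(payloadTypeFor("text/plain;charset=UTF-8"));
        System.out.println(payloadTypeFor("application/json;charset=UTF-8"));
    }
}
```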
That completes the code changes.
Configuring Zuul
The pom dependencies are as follows:
<!-- Spring Boot 1.5.2 -->
<!-- Spring Cloud Camden.SR5 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zuul</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Download the source from GitHub, modify it locally, and package it yourself -->
<dependency>
<groupId>com.github.mthizo247</groupId>
<artifactId>spring-cloud-netflix-zuul-websocket</artifactId>
<version>1.0.7.RELEASE</version>
</dependency>
In the yml, specify the WebSocket endpoint, the subscription path prefixes, and the application destination prefix
zuul:
  routes:
    web-ui: # WebSocket service name
      path: /**
      #url: http://localhost:8080 (not needed when connected to Eureka)
      service-id: web-ui
      customSensitiveHeaders: true
  ws:
    brokerages:
      web-ui: # WebSocket service name
        end-points: /gs-guide-websocket
        brokers: /topic,/queue
        destination-prefixes: /app
Add the library's annotations to the application class
@SpringBootApplication
@EnableZuulProxy
@EnableAsync
@EnableEurekaClient
@EnableZuulWebSocket
@EnableWebSocketMessageBroker
public class ZuulApplication
{
public static void main( String[] args )
{
SpringApplication.run(ZuulApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}
The front-end subscription code is as follows:
function connect() {
var socket = new SockJS('/gs-guide-websocket');
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
setConnected(true);
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/greetings', function (greeting) {
showGreeting(JSON.parse(greeting.body).content);
});
// Subscribe to the time topic
stompClient.subscribe('/topic/time', function (greeting) {
showTime(JSON.parse(greeting.body).content);
});
// Subscribe to user notification messages; the /user/ prefix is required
stompClient.subscribe('/user/queue/customer',function(message){
console.log("/queue/customer: " + message.body);
showUserListening(message.body);
});
});
}
Demo result:
![](https://img.haomeiwen.com/i17664280/2715542371ab91c6.png)
Finally, thanks to the following blog posts, which I used as references:
https://blog.csdn.net/weixin_34389926/article/details/86262894
https://www.jianshu.com/p/32fae52c61f6
https://my.oschina.net/u/3706162/blog/1935071