STOMP协议的java客户端实现
背景介绍
- 如果某个业务场景是需要通过数据库中某个表的更新,触发后台对应的方法去取出数据推送到前台,如果是写原生websocket客户端又必须新建一个服务,如果是socket也得重新写一个服务端,所以最好是有一个java版的stomp客户端。还好万能的spring爸爸在设计它的时候也考虑到了这一点,因此也提供了创建java客户端的方式。
直接上代码:
package seisys.rpf.StompClient;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSession.Receiptable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
public class Client {
final static Logger LOGGER=LoggerFactory.getLogger(Client.class);
private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();//请求头
private WebSocketStompClient client=null;//stomp客户端
private SockJsClient SockJsClient=null;//socket客户端
private ThreadPoolTaskScheduler Ttask=null;//连接池
private StompSession session=null;//连接会话
private static Map<String, String> WebSocketConfig;//配置参数
public volatile boolean RecvFlag=false;//期待的返回标志位,当收到的消息与配置中exceptionRecv相等时为true
public static void main(String[] args) throws Exception {
String sendMsg="我是java版的stomp over websocket的客户端";
sendMsg=(args!=null&&args.length!=0)? args[0]:sendMsg;
Client myClient = new Client();
//读取配置文件
WebSocketConfig=myClient.readConfig();
if (WebSocketConfig!=null) {
//连接到客户端
myClient.runStompClient(myClient, WebSocketConfig.get("URI"),WebSocketConfig.get("subscribe"),WebSocketConfig.get("send"),sendMsg);
LOGGER.info("成功使用配置文件加载客户端");
}else {//默认参数连接到客户端
myClient.runStompClient(myClient,
"ws://localhost:8080/StompWithSSM/stomp",
"/topic/message","/app/send",sendMsg);
LOGGER.info("使用默认参数加载客户端");
}
while (!myClient.RecvFlag) {
//持续等待返回标志位为true
LOGGER.info("-------------------持续等待返回验证消息中……,当前flag:"+myClient.RecvFlag);
Thread.sleep(3000);
}
//关闭所有连接终止程序
myClient.Ttask.destroy();
myClient.SockJsClient.stop();
myClient.client.stop();
myClient.session.disconnect();
System.exit(0);
}
public void runStompClient(Client client,String URI,String subscribe,String send, final String sendMsg) throws ExecutionException, InterruptedException, UnsupportedEncodingException{
//连接到对应的endpoint点上,也就是建立起websocket连接
ListenableFuture<StompSession> f = client.connect(URI);
//建立成功后返回一个stomp协议的会话
StompSession stompSession = f.get();
LOGGER.info("Subscribing to greeting topic using session " + stompSession);
//绑定订阅的消息地址subscribe
client.subscribeGreetings(subscribe, stompSession);
//设置Receipt头,不设置无法接受返回消息
stompSession.setAutoReceipt(true);
//绑定发送的的地址send,注意这里使用的字节方式发送数据
Receiptable rec= stompSession.send(send,sendMsg.getBytes("UTF-8"));
//添加消息发送成功的回调
rec.addReceiptLostTask(new Runnable() {
public void run() {
LOGGER.info("消息发送成功,发送内容为:"+sendMsg);
}
});
}
public ListenableFuture<StompSession> connect(String url) {
Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
List<Transport> transports = Collections.singletonList(webSocketTransport);
SockJsClient sockJsClient = new SockJsClient(transports);
//设置对应的解码器,理论支持任意的pojo自带转json格式发送,这里只使用字节方式发送和接收数据
sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setReceiptTimeLimit(300);
stompClient.setDefaultHeartbeat(new long[]{10000l,10000l});
ThreadPoolTaskScheduler task=new ThreadPoolTaskScheduler();
task.initialize();
stompClient.setTaskScheduler(task);
client=stompClient;
SockJsClient=sockJsClient;
Ttask=task;
return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
}
public void subscribeGreetings(String url, StompSession stompSession) throws ExecutionException, InterruptedException {
stompSession.subscribe(url, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;//设置订阅到消息用字节方式接收
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
String recv=null;
try {
recv = new String((byte[]) o,"UTF-8");
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOGGER.info("收到返回的消息" + recv);
if (WebSocketConfig!=null&&recv.equals("exceptionRecv")) {
RecvFlag=true;
}else if (recv.equals("success")) {
RecvFlag=true;
}
}
});
}
private class MyHandler extends StompSessionHandlerAdapter {
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
session=stompSession;
LOGGER.info("连接成功");
}
@Override
public void handleTransportError(StompSession session, Throwable exception) {
LOGGER.error("连接出现异常");
exception.printStackTrace();
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
super.handleFrame(headers, payload);
LOGGER.info("=========================handleFrame");
}
}
private Map<String, String> readConfig() {
Map<String, String> ConfigMap=null;
String [] keys= {"URI","subscribe","send","exceptionRecv"};
//D:\\dkWorkSpace\\Java\\SocketGettingStart\\StompClient\\WebSocketConfig.properties
File file =new File("src/resource/WebSocketConfig.properties");
if (file.exists()) {
LOGGER.info("开始读取配置文件");
ConfigMap=new HashMap<String, String>();
FileInputStream FIS=null;
InputStreamReader ISReader=null;
BufferedReader reader=null;
try {
FIS=new FileInputStream(file);
ISReader=new InputStreamReader(FIS,"UTF-8");
reader=new BufferedReader(ISReader);
String readline=null;
LOGGER.info("开始按行读取配置文件");
while ((readline=reader.readLine())!=null) {
LOGGER.info("当前行内容:"+readline);
String readStr []=readline.split("=");
if (readStr==null||readStr.length!=2) {
LOGGER.error("配置文件格式不符合规范,必须一行一个配置,并用‘=’分割,当前行内容:"+readline);
}
ConfigMap.put(readStr[0], readStr[1]);
}
LOGGER.info("文件读取完成,最终的配置信息:"+ConfigMap);
boolean notice=false;
for (int i = 0; i < keys.length; i++) {
if (!ConfigMap.containsKey(keys[i])) {
LOGGER.error("缺少对关键参数:"+keys[i]+"的配置,配置将无法生效");
notice=true;
}
}
ConfigMap=notice? null:ConfigMap;
} catch (Exception e) {
LOGGER.info("文件读取过程发生异常:"+e.getMessage());
}finally{
if (reader!=null) {
try {
FIS.close();
ISReader.close();
reader.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}else {
LOGGER.info("不存在配置文件,请检查路径:");
LOGGER.info("开始使用默认socketConfig");
}
return ConfigMap;
}
}
经过测试,完全是既可以收消息,也可以发消息的。发送的消息统一使用的byte的方式发送,因为实在搞不懂spring提供的消息转换容器(
stompClientsetMessageCover()
)的用法,它的作用是自动的将发送的或是消息对象(pojo),给你自动序列化传输,你接收消息的时候也自动根据设置的pojo反射,把消息封装到对象中,很强大的功能。而byte类型则可以直接传输。
测试结果:
stomp客户端发送消息
[12:29:18:965] [INFO] - seisys.rpf.StompClient.Client.runStompClient(Client.java:102) - -------------------持续等待返回验证消息中……,当前flag:false
[12:29:19:266] [INFO] - seisys.rpf.StompClient.Client$1.run(Client.java:99) - 消息发送成功,发送内容为:我是java版的stomp over websocket的客户端
[12:29:21:967] [INFO] - seisys.rpf.StompClient.Client.runStompClient(Client.java:102) - -------------------持续等待返回验证消息中……,当前flag:false
[12:29:22:266] [INFO] - seisys.rpf.StompClient.Client$1.run(Client.java:99) - 消息发送成功,发送内容为:我是java版的stomp over websocket的客户端
[12:29:24:968] [INFO] - seisys.rpf.StompClient.Client.runStompClient(Client.java:102) - -------------------持续等待返回验证消息中……,当前flag:false
[12:29:25:268] [INFO] - seisys.rpf.StompClient.Client$1.run(Client.java:99) - 消息发送成功,发送内容为:我是java版的stomp over websocket的客户端
[12:29:27:969] [INFO] - seisys.rpf.StompClient.Client.runStompClient(Client.java:102) - -------------------持续等待返回验证消息中……,当前flag:false
[12:29:28:270] [INFO] - seisys.rpf.StompClient.Client$1.run(Client.java:99) - 消息发送成功,发送内容为:我是java版的stomp over websocket的客户端
[12:29:30:972] [INFO] - seisys.rpf.StompClient.Client.runStompClient(Client.java:102) - -------------------持续等待返回验证消息中……,当前flag:false
[12:29:31:271] [INFO] - seisys.rpf.StompClient.Client$1.run(Client.java:99) - 消息发送成功,发送内容为:我是java版的stomp over websocket的客户端
stomp服务端的PresenceChannelInterceptor.afterSendCompletion
方法拦截到的消息数据为:
当前的User: FastPrincipal [name=, destinations=null, privateDes=null]
主题地址:/topic/sub
sent: true
消息内容: 我是java版的stomp over websocket的客户端
First NativeHeader:null
name---> null
FastPrincipal [name=, destinations=null, privateDes=null]
destination:/topic/sub
当前状态码:SEND
afterSendCompletion
[12:29:30:971][ cn.seisys.rpf.StompConfig.PresenceChannelInterceptor.postSend(PresenceChannelInterceptor.java:90)] {simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/topic/sub], receipt=[4], content-length=[48]}, simpSessionAttributes={name=}, simpHeartbeat=[J@74461ef8, simpUser=FastPrincipal [name=, destinations=null, privateDes=null], simpSessionId=355fc42722ba48128a0757b4edf74397, simpDestination=/topic/sub}
当前的User: FastPrincipal [name=, destinations=null, privateDes=null]
主题地址:/topic/sub
sent: true
消息内容: 我是java版的stomp over websocket的客户端
在订阅同一个地址的web端也能直接接收到消息:
<<< MESSAGE
destination:/topic/sub
receipt:39
subscription:sub-0
message-id:hqqgsrpb-13
content-length:48
我是java版的stomp over websocket的客户端
当web发送指定指令exceptionRecv的时候,stomp的java客户端对消息解码后,断开连接。
首先是web在指定地址 /topic/response
发送指定的shutdown内容exceptionRecv
然后在stomp客户端的subscribeGreetings方法中通过指定订阅地址和建立好连接后返回的stompSession绑定监听:
public void subscribeGreetings(String url, StompSession stompSession) throws ExecutionException, InterruptedException {
stompSession.subscribe(url, new StompFrameHandler() {
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;//设置订阅到消息用字节方式接收
}
public void handleFrame(StompHeaders stompHeaders, Object o) {
String recv=null;
try {
recv = new String((byte[]) o,"UTF-8");
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
LOGGER.info("收到返回的消息" + recv);
if (WebSocketConfig!=null&&recv.equals("exceptionRecv")) {
RecvFlag=true;
}else if (recv.equals("success")) {
RecvFlag=true;
}
}
});
}
调试打印结果:
可以看到客户端也成功读取到其他客户端在订阅地址中丢进来的消息,并根据消息内容shutdown连接
本客户端分两部分,一部分是读取配置文件,也就是根据配置文件去绑定的端点,和订阅发送的地址,一部分是纯客户端部分,如果没有配置文件就会使用默认的配置去连接到服务器。
最后贴出pom 依赖:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.0.7.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
<version>5.0.7.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-server-impl</artifactId>
<version>9.4.7.v20170914</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
</dependencies>
总结:
通过java客户端主要服务于数据库去推送消息到后台,甚至是前端,假如你的业务中某个表的数据实时性要求较高,你就可以尝试将java客户端打包,然后通过触发器调用客户端去通知后台获取数据,后台再将获取到的数据处理好后推送到前端显示。这样不仅性能上只是牺牲下websocket建立连接占用的一点带宽,对于服务器的性能占用相对于轮询来说会小很多很多。
网友评论