美文网首页IT@程序员猿媛SpringBoot精选
WebSocket一对一不间断实时接收信息

WebSocket一对一不间断实时接收信息

作者: 程就人生 | 来源:发表于2019-10-11 08:44 被阅读0次
image.png

SpringBoot整合WebSocket做一个自说自话的demo
仅仅能够接收自己发送的消息。今天再做一点扩展,修改为一对一的聊天。但是,有一个问题,如果超时了,链接关闭了怎么办?谁都不希望和好友聊天,停了一会儿,就再也不能发送信息了。让这个demo告诉你怎么办。

环境说明:
SpringBoot2.1.4
HTML5 WebSocket

首先,在pom中引入必要的架包;

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <!-- springboot和websocket的集成 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.61</version>
    </dependency>

第二步,websocket的config的常规配置;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * websocket配置文件配置
 * @author 程就人生
 * @date 2019年9月23日
 */
@Configuration  
public class WebSocketConfig {  
  
  @Bean  
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();  
    }  
}

第三步,websocket服务端代码;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

/**
 * websocket聊天信息一对一的发送、接收
 * @author FengJuan
 * @date 2019年9月24日
 * @Description 
 *
 */
@ServerEndpoint("/websocket2/{userUid}")  
@Component
public class WebSocketServer2 {
  
    private static Logger log = LoggerFactory.getLogger(WebSocketServer2.class);
    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
    private static CopyOnWriteArraySet<WebSocketServer2> webSocketSet = new CopyOnWriteArraySet<WebSocketServer2>();  
    //记录在线的用户
    private static Map<String,Session> userWsSession = new HashMap<String,Session>();  
  
    //与某个客户端的连接会话,需要通过它来给客户端发送数据  
    private String userUid;  
  
    /** 
     * 连接建立成功调用的方法
     */  
    @OnOpen  
    public void onOpen(@PathParam("userUid") String userUid, Session session) {
         //获取当前登录的用户
        this.userUid = userUid;
        //记住当前登录的用户
        userWsSession.put(userUid, session);
        //加入set中  
        webSocketSet.add(this);     
        log.info("有新连接加入!当前在线人数为" +  webSocketSet.size());        
    }  
  
    /** 
     * 连接关闭调用的方法 
     */  
    @OnClose  
    public void onClose() {  
      //从缓存中移除
      userWsSession.remove(userUid);
      //从set中删除  
        webSocketSet.remove(this);       
        log.info("有一连接关闭!当前在线人数为" + webSocketSet.size());  
    }  
  
    /** 
     * 收到客户端消息后调用的方法 
     * 
     * @param message 客户端发送过来的消息
     * @param session
     */  
    @SuppressWarnings("static-access")
  @OnMessage  
    public void onMessage(String message, Session session) { 
      log.info("来自客户端的消息:" + message); 
      try{
        //将收到的消息转换成json对象
        JSONObject object = JSONObject.parseObject(message);
        //如果该用户在线,那么就直接发送信息
        if(this.userWsSession.containsKey(object.getString("toUser"))){
          this.userWsSession.get(object.getString("toUser")).getBasicRemote().sendText(object.toJSONString());  
        }else{
          //把未发出去的信息存储起来,等他上线后在发送
          //TODO
        }
      }catch(Exception e){
        e.printStackTrace();
      }
    }  
  
    /** 
     *  
     * @param session 
     * @param error 
     */  
    @OnError  
    public void onError(Session session, Throwable error) {  
        log.error("发生错误");  
        error.printStackTrace();  
    }  

}

第四步,客户端页面编码;

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org" >
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
</head>
<body>
<p>发送信息:<input type="text" id="txt" ></input></p>
<p>接收人:<input type="text" id="toUser" ></input></p>
<p><button id="button" >发送消息</button></p>
<p id="recvContent">

</p>
<script src="http://www.jq22.com/jquery/jquery-1.10.2.js"></script>
<script th:inline="javascript" >
    var userUid=[[${userUid}]];
  <!-- ws客户端   -->
  var socket;  
  var wsUrl = "ws://localhost:8080/websocket2/"+userUid;
  //避免重复连接
  var lockReconnect = false;
  var tt;
  //创建websocket
  createWebSocket();  
    //发送信息回车键
  $("#txt").keydown(function(event){ 
    if(event.keyCode==13){ 
      $("#button").click();
    } 
    }); 
    //创建连接
  function createWebSocket() {
      try {
        if(typeof(WebSocket) == "undefined") {  
            console.log("您的浏览器不支持WebSocket");  
        }else{
          console.log("您的浏览器支持WebSocket");  
        }
          socket = new WebSocket(wsUrl);
          //初始化
          init();
      } catch(e) {
          console.log('catch');
          //异常后重新连接
          reconnect();
      }
  }
  //初始化
  function init() {
    socket.onclose = function () {
        console.log('链接关闭');
        //关闭后重新连接
        reconnect();
    };
    socket.onerror = function() {
        console.log('发生异常了');
        //出错后重新连接
        reconnect();
    };
    socket.onopen = function () {
        //心跳检测重置
        heartCheck.start();
    };
    socket.onmessage = function (event) {
        // 将json字符串转换为对象
        var resData = JSON.parse(event.data);
        console.log(resData);  
        //好友列表初始化
        if(resData!=undefined) {
          $("#recvContent").append('<div style="width:300px;text-align:left;"><span >'+resData.fromUser + '发送:' + resData.content + '</span></div><br/>');
        } 
        heartCheck.start();
    }
  }
  //重新连接
  function reconnect() {
    if(lockReconnect) {
      return;
    };
    lockReconnect = true;
    //没连接上会一直重连,设置延迟避免请求过多
    tt && clearTimeout(tt);
    tt = setTimeout(function () {
      createWebSocket();
      lockReconnect = false;
    }, 4000);
  }
  //心跳检测
  var heartCheck = {
      timeout: 210000,
      timeoutObj: null,
      serverTimeoutObj: null,
      start: function(){
            console.log(getNowTime() +" Socket 心跳检测");  
          var self = this;
          this.timeoutObj && clearTimeout(this.timeoutObj);
          this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
          this.timeoutObj = setTimeout(function(){
              //这里发送一个心跳,后端收到后,返回一个心跳消息,
              //onmessage拿到返回的心跳就说明连接正常
              console.log(getNowTime() +' Socket 连接重试');
              //socket.send("连接成功");
              self.serverTimeoutObj = setTimeout(function() {
                  console.log(socket);
                  socket.close();
              }, self.timeout);
          }, this.timeout)
      }
  }
  //按钮点击事件
  $("#button").click(function(){
    var object={}
    object.content = $("#txt").val();
    object.toUser = $("#toUser").val();
    object.fromUser= userUid;
    $("#txt").val("");
    $("#recvContent").append('<div style="width:300px;text-align:right;"><span >发送给'+object.toUser + ':' + object.content + '</span></div><br/>');
    socket.send(JSON.stringify(object));
  });
    /**
     * 获取系统当前时间
     * @returns
     */
    function p(s) {
        return s < 10 ? '0' + s : s;
    }
    function getNowTime() {
        var myDate = new Date();
        //获取当前年
        var year = myDate.getFullYear();
        //获取当前月
        var month = myDate.getMonth() + 1;
        //获取当前日
        var date = myDate.getDate();
        var h = myDate.getHours();       //获取当前小时数(0-23)
        var m = myDate.getMinutes();     //获取当前分钟数(0-59)
        var s = myDate.getSeconds();
        return year + '-' + p(month) + "-" + p(date) + " " + p(h) + ':' + p(m) + ":" + p(s);
    }
</script>

</body>
</html>

注意:在客户端,也就是前端页面做了心跳检测,如果连接关闭了,则及时重新打开连接,保持通讯道路的畅通。

第五步,Controller层代码,页面的渲染;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import com.alibaba.fastjson.JSONObject;
import com.example.demo.vo.User;

/**
 * websocket接口测试模拟
 * @author 程就人生
 * @date 2019年9月24日
 */
@RestController
public class TestController {
/**
   * 模拟聊天,可以看见收发信息
   * @param userUid
   * @return
   */
  @GetMapping("/index2/{userUid}")
  public ModelAndView index2(@PathVariable("userUid") String userUid){
    ModelAndView mv = new ModelAndView("/index2");    
    mv.addObject("userUid", userUid);
    return mv;
  }
}

最后,测试;

测试一览图

总结
即时通讯,要讲究消息能够快速送达,websocket是基于TCP协议的全双工通讯,能够满足这个要求;但是websocket的连接是有时间限制的,如果在这段时间内没有消息的交互,那么服务端就认为连接已关闭。

为了防止这种情况,需要进行心跳检测,服务器端检测客户端的心跳检测是通过发消息进行检测的,这也就产生了一条消息,还需要特殊处理。所以心跳检测最好放在客户端,客户端检测服务器的心跳,在连接关闭的时候,马上重新建立连接,这样就可以实时接收消息了。

相关文章

网友评论

    本文标题:WebSocket一对一不间断实时接收信息

    本文链接:https://www.haomeiwen.com/subject/regnuctx.html