美文网首页
springboot使用websocket创建长连接

springboot使用websocket创建长连接

作者: SeekLife0 | 来源:发表于2022-12-18 19:56 被阅读0次

参考:
(54条消息) websocket简单运用前后端交互_大丸子~的博客-CSDN博客_websocket前后端交互
Nginx配置之WebSocket配置 - 简书 (jianshu.com)

前言:
WebSocket长连接适用于一些需要即时响应的场景,比如消息推送、即时通讯等。

依赖

        <!-- Provides org.springframework.web.socket.server.standard.ServerEndpointExporter,
             which WebSocketConfig registers as a bean.
             NOTE(review): no <version> is declared here; presumably it is managed by the
             Spring Boot parent POM / BOM. Confirm against your pom.xml. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
        </dependency>

1、websocket部署到线上需要配置nginx

    # HTTPS virtual host that reverse-proxies all traffic, including WebSocket
    # upgrade requests, to the backend application.
    server {
        listen       443 ssl;
        server_name 你的网址域名;
        ssl_certificate "/etc/nginx/keys/test/test_server.crt";
        ssl_certificate_key "/etc/nginx/keys/test/test_server.key";
        include /etc/nginx/default.d/*.conf;
        client_max_body_size 20m;
        root         /usr/share/nginx/app;

        location / {
            proxy_pass http://121.37.140.174:8002;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_set_header X-Forwarded-Port $server_port;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            # Key lines for WebSocket: the Upgrade/Connection headers are hop-by-hop
            # and are not forwarded by default, and the upgrade needs HTTP/1.1.
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";

            # By default nginx closes the proxied connection when the backend sends
            # nothing for 60s, which drops idle WebSocket connections. Raise the
            # timeout (or have the application send periodic ping frames).
            proxy_read_timeout 3600s;

            client_max_body_size 20m;
        }
    }

重点是以下两条:

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

2、连接地址的协议:http对应ws,https对应wss。
把地址中的http或者https替换成WebSocket的协议名ws或者wss即可,例如:wss://你的网址域名/webSocket/{userId}

3、服务端配置


image.png

GetHttpSessionConfigurator

package co.gpthome.tools.keepalive;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

/**
 * Handshake configurator that copies the HTTP session of the upgrade request
 * into the endpoint configuration's user properties.
 *
 * <p>The session is stored under the key {@code HttpSession.class.getName()};
 * nothing is stored when the request carries no HTTP session.
 */
public class GetHttpSessionConfigurator extends Configurator {

    /**
     * Publishes the request's HTTP session (if any) as a user property.
     *
     * @param config   endpoint configuration whose user properties receive the session
     * @param request  the opening handshake request
     * @param response the handshake response (not modified here)
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        final HttpSession current = (HttpSession) request.getHttpSession();
        if (current == null) {
            // No HTTP session is associated with this handshake; nothing to expose.
            return;
        }
        final String propertyKey = HttpSession.class.getName();
        config.getUserProperties().put(propertyKey, current);
    }
}

WebSocketConfig

package co.gpthome.tools.keepalive;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the {@link ServerEndpointExporter} bean
 * to the application context.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

}

WebSocket
这里通过路径参数携带过来的userId来标识在线用户,方便后续针对该用户进行操作。另外新增了一个推送JSON格式数据的方法(pushJsonMessage),因为前端一般需要接收JSON格式的数据。

package co.gpthome.tools.keepalive;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Administrator
 * @date 2019-1-31 10:39
 */
@Component
@ServerEndpoint(value = "/webSocket/{userId}")
@Slf4j
public class WebSocket {

//    @Autowired
//    private RedisUtils redisUtils;

    private Session session;

    private String userId;
    private String sessionId="";

    //    private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<>();
    private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<String, WebSocket>();
    /**
     * 线程安全Map
     */
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(@PathParam(value = "userId") String userId, Session session) {
        //TODO 通过header中获取token,进行check
        WebSocket s2 = null;
        s2 = this;
        webSocketSet.remove(sessionId);
        this.userId = userId;
        webSocketSet.put(userId, s2);
        sessionPool.put(userId,session);
        log.info("【webSocket消息】有新的连接,总数{}", webSocketSet.size());
        if(webSocketSet.size()>0){
            for (String key:  webSocketSet.keySet()) {
                log.debug("当前连接的key:"+key);
            }
        }
    }

    @OnClose
    public void onClose(Session session) {
        webSocketSet.remove(this.userId);
        log.info("【webSocket消息】连接断开,总数{}", webSocketSet.size());
        if(webSocketSet.size()>0){
            for (String key:  webSocketSet.keySet()) {
                log.debug("当前连接的key:"+key);
            }
        }
    }

    @OnMessage
    public void onMessage(String message) {
        log.debug("【webSocket消息】收到客户端发来的消息{}", message);
    }

    public void sendMessage(String message) {
        try {
            if (null != this.session) {
                this.session.getBasicRemote().sendText(message);
//                this.session.getAsyncRemote().sendText(message);
            }
        } catch (Exception e) {
            System.out.println("sendMessage异常:"+e);
            e.printStackTrace();
        }
    }

    /**
     * 当前通websocket绑定userId
     *
     * @param userId 用户Id
     */
    public void updateSessionByUerId(HttpServletRequest request,String userId) {
        HttpSession session = request.getSession();
        String sessionId = session.getId();
        this.sessionId = sessionId;
        WebSocket webSocket = webSocketSet.get(this.sessionId);
        if(null!=webSocket){
            webSocketSet.remove(userId);
            webSocketSet.put(userId, webSocket);
        }
    }

    public void sendUserMessage(String userId, String message) {
        if (null != webSocketSet.get(userId)) {
            webSocketSet.get(userId).sendMessage(message);
            log.debug("【webSocket消息】指定用户广播消息,message={}", message);
            if(webSocketSet.size()>0){
                for (String key:  webSocketSet.keySet()) {
                    log.debug("当前连接的key:"+key);
                }
            }
        } else {
            log.debug("【webSocket消息】未发现指定用户");
        }
    }

    public void sendAllMessage(String message) {
        Set<String> keys = webSocketSet.keySet();
        for (String key : keys) {
            webSocketSet.get(key).sendMessage(message);
        }
    }

    /**
     * 校验当前用户是否在线
     * @param userId 用户id
     * @return true:用户在线;false:用户还在线
     */
    public boolean checkUserOnline(String userId){
        Object obj = webSocketSet.get(userId);
        if(null!=obj){
            return true;
        }
        return false;
    }

    /**
     * 服务端推送消息-简单字符串
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getBasicRemote().sendText(message);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @author seeklife
     * @date 2022/12/10 9:21
     * @param userId
     * @param message
     */
    public void pushJsonMessage(String userId, Map<String,String> message) {
        Session session = sessionPool.get(userId);
        if (session != null && session.isOpen()) {
            try {
                //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                synchronized (session){
                    log.info("【websocket消息】 单点消息:" + message);
                    String result = JSONObject.toJSONString(message);
                    session.getBasicRemote().sendText(result);
                }
                //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}


相关文章

网友评论

      本文标题:springboot使用websocket创建长连接

      本文链接:https://www.haomeiwen.com/subject/dpveqdtx.html