美文网首页
Springboot+WebSocket+Reids SUBSC

Springboot+WebSocket+Reids SUBSC

作者: 昵称已使用换一个吧 | 来源:发表于2023-06-16 18:02 被阅读0次

    一、使用场景
    前端实时展示Tail信息
    二、WebSocket注解介绍
    当使用 Spring WebSocket 来实现 WebSocket 端点时,可以使用 @ServerEndpoint 注解来标记 WebSocket 端点类,并在该类中实现相应的 WebSocket 处理方法。
    1.@ServerEndpoint 注解:@ServerEndpoint 注解用于标记一个类作为 WebSocket 端点。它将指定一个 URI,客户端可以使用该 URI 连接到 WebSocket 端点。示例代码如下:

    java
    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    
    @ServerEndpoint("/websocket")
    public class MyWebSocketEndpoint {
        // ...
    }
    

    上述代码中,@ServerEndpoint("/websocket") 表示将 MyWebSocketEndpoint 类标记为一个 WebSocket 端点,并使用 /websocket 作为客户端连接的 URI。

    2.@OnOpen 注解:@OnOpen 注解用于标记一个方法,在客户端与 WebSocket 端点成功建立连接后执行。该方法可以包含一个 Session 参数,用于表示客户端的 WebSocket 会话。示例代码如下:

    java
    Copy code
    @ServerEndpoint("/websocket")
    public class MyWebSocketEndpoint {
    
        @OnOpen
        public void onOpen(Session session) {
            // 处理连接建立后的逻辑
        }
    
    }
    

    在上述示例中,当客户端与 WebSocket 端点建立连接后,onOpen() 方法将被调用,可以在该方法中执行连接建立后的逻辑处理。

    3.@OnMessage 注解:@OnMessage 注解用于标记一个方法,在接收到客户端发送的消息时执行。该方法可以包含一个 Session 参数表示客户端的 WebSocket 会话,以及一个 String 或 byte[] 参数表示接收到的消息内容。示例代码如下:

    java
    Copy code
    @ServerEndpoint("/websocket")
    public class MyWebSocketEndpoint {
    
        @OnMessage
        public void onMessage(Session session, String message) {
            // 处理接收到的消息
        }
    
    }
    

    在上述示例中,当客户端发送消息到 WebSocket 端点时,onMessage() 方法将被调用,可以在该方法中处理接收到的消息。

    4.@OnClose 注解:@OnClose 注解用于标记一个方法,在客户端与 WebSocket 端点断开连接时执行。该方法可以包含一个 Session 参数,用于表示客户端的 WebSocket 会话。示例代码如下:

    java
    Copy code
    @ServerEndpoint("/websocket")
    public class MyWebSocketEndpoint {
    
        @OnClose
        public void onClose(Session session) {
            // 处理连接关闭后的逻辑
        }
    
    }
    

    在上述示例中,当客户端与 WebSocket 端点断开连接时,onClose() 方法将被调用,可以在该方法中执行连接关闭后的逻辑处理。

    通过使用 @ServerEndpoint 注解和相应的 WebSocket 处理方法,您可以创建一个基于 Spring WebSocket 的 WebSocket 端点,并处理与客户端的连接、消息发送和连接断开等操作。

    三、需要注意的地方
    1.有个地方需要注意,使用@ServerEndpoint注解后,这个Controller还需要用@Component注解吗?
    当使用@ServerEndpoint("/websocket")注解标记一个类作为WebSocket端点时,该类已经被隐式地视为一个Spring组件,不需要额外添加@Component注解。

    @ServerEndpoint注解本身具有@Component注解的语义,因此被标记为WebSocket端点的类将被Spring自动扫描和管理,无需显式添加@Component注解。

    因此,您只需使用@ServerEndpoint("/websocket")注解来标记WebSocket端点类,无需添加其他Spring组件相关的注解。Spring将会自动识别和管理WebSocket端点。

    可是我在项目中去掉在Controller中@Component注解,用postman测试就404了,wscat也不好使,这是一个让我疑惑的地方,大家有好的实践结果也请评论一下,大家一块探讨一下。

    四、实践和应用:通过学到WebSocket内容结合实际的项目开发遇到的问题以及解决方案。
    问题一:在SpringBoot中使用WebSocket,类中无法通过@Resource和@Autowired注解,将Spring中的Bean注入,因为WebSocket容器和 Spring 容器是两个独立的容器,各自负责不同的任务。WebSocket 容器负责管理 WebSocket端点、处理WebSocket连接等 WebSocket 相关的功能。Spring 容器负责管理Spring 管理的 Bean、依赖注入、AOP等 Spring 框架相关的功能
    为了实现 WebSocket端点与Spring 容器的整合,Spring 提供了·ApplicationContextAware接口,并在 WebSocket 容器初始化时与 Spring 容器进行协作。具体流程如下:
    1.WebSocket 容器检测到需要实例化的“@ServerEndpoint’类,并进行实例化
    2.如果该“@ServerEndpoint'类实现了·ApplicationContextAware'接口,WebSocket容器会通过反射机制调用其setApplicationContext() 方法
    3.WebSocket容器会将Spring 的应用程序上下文对象传递给·setApplicationContext()方法,以供@ServerEndpoint’类使用。这种协议的实现使得在“@ServerEndpoint'类中可以访问Spring 的应用程序上下文,以便获取Spring 管理的 Bean、进行依赖注入等操作,需要注意的是当项目启动后会进入到setApplicationContext()方法中,之后每次访问WebSocket的接口,均不在进入setApplicationContext()方法,所以ApplicationContext需要用static修饰。
    问题二:在联调项目的时候,前端无法通过Hear传递Token,结合项目排查和测试,确实不能在Hear中传递Token。只能将Token当作参数传递到接口中,通过Token,反向获取Shiro中的用户信息,完成校验数据。
    问题三:在本地联调完毕后,部署测试环境,发现用WS方式请求接口,前端抛出了This request has beenblocked; this endpoint must be available over WSS.的异常。WS是不安全的方式,希望通过WSS的方式请求接口。要将WebSocket(@ServerEndpoint)改为使用WSS(WebSocket over SSL / TLS),需要执行以下步骤:
    1.获取SSL证书:您需要获得一个有效的SSL证书。通常,您可以从证书颁发机构(Certificate Authority,CA)获取SSL证书。
    2.配置服务器:在服务器上,您需要配置SSL连接以使用您的SSL证书。这涉及到配置服务器软件(如Nginx、Apache等)来处理SSL连接。

    五、代码逻辑,项目中有Shiro框架
    1.pom

          <!-- Spring Boot Websocket -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
    

    2.Application

      @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    

    3.Controller

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.log4j.Log4j2;
    
    import org.apache.shiro.session.mgt.SimpleSession;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.SerializationUtils;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisPubSub;
    
    import javax.websocket.OnClose;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Log4j2
    @Component
    @ServerEndpoint(value = "/data/ail/{id}/{did}/{token}")
    public class TailDetailController implements ApplicationContextAware {
        private static final int ONLINE_COUNT_MAX = 5;
        private static final AtomicInteger ONLINE_COUNT = new AtomicInteger();
    
        private JedisCluster jedis;
    
        private JdbcTemplate jdbcDsp;
    
        private JedisPubSub sub;
    
    
        //项目启动的时候能进这个setApplicationContext中不是null,当访问这个url请求接口的时候就是null,只能把他定义成static
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            DebugDeviceTailDetailController.applicationContext = applicationContext;
    
        }
    
        /**
         * 连接建立成功调用的方法
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("did") String did, @PathParam("id") Long id, @PathParam("token") String token) throws IOException {
            if (jdbcDsp == null) {
                jdbcDsp = applicationContext.getBean("jdbcDsp", JdbcTemplate.class);
            }
            if (jedis == null) {
                jedis = applicationContext.getBean("jedis", JedisCluster.class);
            }
           //shiro 存在reidis的中key固定前缀,源码有,下边是根据token从reids取出用户信息
            token = "shiro:session:" + token;
            byte[] sessionBytes = jedis.get(token.getBytes());
            SimpleSession sessions = null;
            if (sessionBytes != null) {
                sessions = (SimpleSession) SerializationUtils.deserialize(sessionBytes);
            }
            if (sessions == null) {
                session.close();
                return;
            }
            //org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY
            Object attribute = sessions.getAttribute("org.apache.shiro.subject.support.DefaultSubjectContext_PRINCIPALS_SESSION_KEY");
            if (attribute == null) {
                session.close();
                return;
            }
            JSONObject jsonObject = JSONObject.parseObject(attribute.toString());
            if (jsonObject == null) {
                session.close();
                return;
            }
            User user = JSONObject.parseObject(JSON.toJSONString(jsonObject), User.class);
            log.info(JSON.toJSONString(user));
    
            if (dsUmUser == null) {
                session.close();
                throw new IllegalStateException("please login first");
            }
    
          //这里是给前端打一个最大限制信息
         /*   if (ONLINE_COUNT.incrementAndGet() > ONLINE_COUNT_MAX) {
                try {
                    session.getBasicRemote().sendText("max connection limited");
                } catch (IOException e) {
                    log.debug("DeviceDebug tail error {}", e.toString());
                }
            }*/
    
            sub = new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.info("Received message: {}, on channel: {}", message, channel);
                    try {
                        session.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        log.error("send error", e);
                    }
                }
            };
            //当colse后这个线程会停止
            new Thread(() -> {
                String key = String.format("aa:%d:bb:%s", id, did);
                //订阅
                jedis.subscribe(sub, key);
            }, "SubscribeDetailThread").start();
    
        }
    
        @OnClose
        public void onClose() {
            if (sub != null) {
                sub.unsubscribe();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Springboot+WebSocket+Reids SUBSC

          本文链接:https://www.haomeiwen.com/subject/srvrydtx.html