美文网首页
Websocket通过Redis实现Session共享

Websocket通过Redis实现Session共享

作者: 普普通通的小斌 | 来源:发表于2019-12-23 18:26 被阅读0次
    架构图
    (此处原为架构图图片,抓取时仅保留了占位文字 "file")
    测试代码搭建

    pom依赖

         <!-- Redis support via the Spring Boot starter for Spring Data Redis.
              No <version> is given here; presumably it is managed by the Spring Boot parent/BOM - confirm in the full pom. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
    
            <!-- WebSocket support via the Spring Boot WebSocket starter.
                 No <version> is given here either; presumably managed the same way - confirm in the full pom. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
    

    开启Websocket配置

    /**
     * Spring configuration that switches on WebSocket support and exposes a
     * {@link ServerEndpointExporter} bean.
     *
     * <p>NOTE(review): per the Spring documentation, the exporter is what detects
     * {@code @ServerEndpoint}-annotated beans and registers them with the
     * container; confirm it is only needed when running in an embedded container.
     */
    @Configuration
    @EnableWebSocket
    public class WebSocketConfig {

        /**
         * Creates the endpoint exporter bean.
         *
         * @return a new {@link ServerEndpointExporter} instance
         */
        @Bean
        public ServerEndpointExporter serverEndpointExporter() {
            final ServerEndpointExporter exporter = new ServerEndpointExporter();
            return exporter;
        }
    }
    

    WebsocketPool类

    package com.chainter.rmblc.messaging.net;
    
    import lombok.extern.java.Log;
    import org.springframework.stereotype.Component;
    import org.springframework.util.ObjectUtils;
    
    import javax.websocket.Session;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Static, in-memory registry of open WebSocket sessions together with a
     * counter of online connections.
     *
     * <p>All state lives in static fields of this class, so it is only visible
     * inside the current JVM. The counter and the session map are maintained
     * independently; callers are responsible for keeping them in step.
     *
     * @author ningbin 2019/12/18 13:45
     */
    @Log
    public class WebSocketPool {
        /** Number of currently online connections, as reported by callers. */
        private static final AtomicInteger atomicNumber = new AtomicInteger(0);
        /** Open sessions, indexed by the key passed to {@link #createOnlineSession}. */
        private static final Map<String, Session> onlineSession = new ConcurrentHashMap<>();


        /**
         * Increments the online counter.
         *
         * @return the counter value after the increment
         */
        public static Integer addAtomicNumber(){
            return atomicNumber.incrementAndGet();
        }

        /**
         * Decrements the online counter.
         *
         * @return the counter value after the decrement
         */
        public static Integer decrementNumber(){
            return atomicNumber.decrementAndGet();
        }

        /**
         * @return the current value of the online counter
         */
        public static Integer getNumber(){
            return atomicNumber.get();
        }

        /**
         * Registers a session under the given key, replacing any session
         * previously stored under the same key.
         *
         * @param userId  key to store the session under
         * @param session the session to register
         */
        public static void createOnlineSession(String userId,Session session){
            onlineSession.put(userId,session);
        }

        /**
         * @return the live (mutable) map of registered sessions; changes made by
         *         the caller are reflected in the pool
         */
        public static Map<String,Session> getOnlineSession(){
            return onlineSession;
        }

        /**
         * Looks up the session registered under the given key.
         *
         * @param userId key the session was registered under
         * @return the session, or {@code null} when nothing is registered for the key
         */
        public static Session getSesssionByUserId(String userId){
            // A plain lookup already yields null for a missing key; no Optional round-trip needed.
            return onlineSession.get(userId);
        }

        /**
         * Unregisters the session stored under the given key and closes it.
         * Does nothing when no session is registered for the key.
         *
         * @param userId key the session was registered under
         */
        public static void removeSession(String userId){
            // Remove BEFORE closing: the entry must disappear even if close() throws,
            // and a close callback that calls back into this method then finds
            // nothing left to close a second time.
            Session session = onlineSession.remove(userId);
            if (session == null) {
                return;
            }
            try {
                session.close();
            } catch (IOException e) {
                // Best effort: the session is already unregistered, so only report the failure.
                log.log(java.util.logging.Level.WARNING, "关闭连接出现错误", e);
            }
        }

        /**
         * Pushes the current time (formatted text plus epoch milliseconds) to every
         * registered session that is still open. A failure on one session is logged
         * and does not prevent delivery to the remaining sessions.
         */
        public static void send(){
            // One formatter per call: SimpleDateFormat is not thread-safe, so it is
            // kept local, but there is no need to rebuild it for every session.
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            for (Session session : onlineSession.values()) {
                if (!session.isOpen()) {
                    // Sending on a closed session can only fail; skip it.
                    continue;
                }
                try {
                    Date date = new Date();
                    long time = date.getTime();
                    String dateString = formatter.format(date);
                    session.getBasicRemote().sendText("时间:"+dateString+",毫秒:"+time);
                } catch (Exception e) {
                    log.log(java.util.logging.Level.WARNING,
                            "Failed to push message to session " + session.getId(), e);
                }
            }
        }

    }
    
    

    ws连接类

    /**
     * WebSocket endpoint mapped to {@code /WebSocketTest/{userId}}.
     *
     * <p>Each lifecycle callback keeps {@link WebSocketPool} up to date: sessions
     * are registered on open and unregistered on close or error.
     *
     * <p>NOTE(review): sessions are stored under {@code session.getId()}, not under
     * the {@code userId} path parameter, so a lookup in the pool needs the session
     * id. Confirm this is intended before relying on lookups by user id.
     */
    @Component
    @ServerEndpoint(value = "/WebSocketTest/{userId}")
    @Log
    public class WebsocketTest {


        /**
         * Registers the new session in the pool and bumps the online counter.
         *
         * @param userId  value of the {@code userId} path segment (not used as the pool key)
         * @param session the newly opened session
         */
        @OnOpen
        public void onOpen(@PathParam("userId") String userId, Session session) {

            Integer number = WebSocketPool.addAtomicNumber();
            WebSocketPool.createOnlineSession(session.getId(),session);
            // NOTE: the original had a commented-out MessageTaskHandle.createTaskHandle(session, ...) call here.
            log.info("建立连接,当前人数:"+number);
        }

        /**
         * Unregisters the session from the pool and decrements the online counter.
         *
         * @param userId  value of the {@code userId} path segment, used for logging only
         * @param session the session being closed
         */
        @OnClose
        public void onClose(@PathParam("userId") String userId, Session session){
            WebSocketPool.removeSession(session.getId());
            Integer number = WebSocketPool.decrementNumber();
            log.info("用户"+userId+"关闭连接,当前人数:"+number);
        }

        /**
         * Unregisters the failing session and logs the error together with its cause.
         * The online counter is not touched here.
         *
         * @param userId    value of the {@code userId} path segment (unused)
         * @param session   the session on which the error occurred
         * @param throwable the error reported by the container
         */
        @OnError
        public void onError(@PathParam("userId") String userId, Session session,Throwable throwable){
            WebSocketPool.removeSession(session.getId());
            // Pass the throwable to the logger so the cause and stack trace are not lost.
            log.log(java.util.logging.Level.WARNING, "WebSocket连接出现异常", throwable);
        }



    }
    

    添加redis监听类

    package com.chainter.rmblc.messaging.config;
    
    import com.chainter.rmblc.messaging.handle.RedisListenerHandle;
    import lombok.extern.java.Log;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Component;
    
    /**
     * Declares the beans needed to receive Redis pub/sub messages: a listener
     * container subscribed to the configured channel pattern, and the adapter
     * that forwards messages to {@link RedisListenerHandle}.
     *
     * @author ningbin 2019/12/19 18:20
     */
    @Component
    @Log
    public class RedisListenerBean {

        /** Channel pattern to subscribe to, injected from the {@code sub.channel.allWSName} property (e.g. set in application.yml). */
        @Value("${sub.channel.allWSName}")
        private String allWSName;

        /**
         * Builds the Redis message listener container and subscribes the given
         * adapter to the channel pattern held in {@code allWSName}.
         *
         * <p>Further listeners for other topics could be added to the same container
         * by pairing each listener with its topic.
         *
         * @param connectionFactory factory the container obtains Redis connections from
         * @param listenerAdapter   adapter that receives messages for the subscribed pattern
         * @return the configured listener container
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
            final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
            listenerContainer.setConnectionFactory(connectionFactory);

            // Subscribe to the configured channel pattern.
            final PatternTopic subscribedTopic = new PatternTopic(allWSName);
            listenerContainer.addMessageListener(listenerAdapter, subscribedTopic);

            log.info("Subscribed Redis channel: " + allWSName);
            return listenerContainer;
        }

        /**
         * Wraps the message handler in an adapter that dispatches to its
         * {@code receiveMessage} method.
         *
         * @param redisListenerHandle delegate whose {@code receiveMessage} method is invoked
         * @return the adapter bound to the delegate
         */
        @Bean
        public MessageListenerAdapter messageListenerAdapter(RedisListenerHandle redisListenerHandle){
            final String handlerMethod = "receiveMessage";
            final MessageListenerAdapter adapter = new MessageListenerAdapter(redisListenerHandle, handlerMethod);
            return adapter;
        }
    }
    
    

    创建RedisListenerHandle监听消息处理类

    package com.chainter.rmblc.messaging.handle;
    
    import org.springframework.stereotype.Component;
    
    /**
     * Handler for messages received from Redis; it simply prints each message
     * to standard output.
     *
     * @author ningbin 2019/12/20 10:07
     */
    @Component
    public class RedisListenerHandle {

        /**
         * Prints the received message to standard output, prefixed with a fixed label.
         *
         * @param message the message payload as text
         */
        public void receiveMessage(String message){
            final String line = "接收消息:" + message;
            System.out.println(line);
        }

    }
    
    
    个人联系方式QQ:944484545,欢迎大家的加入,分享学习是一件开心事

    相关文章

      网友评论

          本文标题:Websocket通过Redis实现Session共享

          本文链接:https://www.haomeiwen.com/subject/zyktoctx.html