spring websocket推送

作者: 勃列日涅夫 | 来源:发表于2018-07-03 14:29 被阅读18次

    Spring WebSocket 的基本用法在 Spring 官网上已有相应的 demo,这里主要描述服务端如何把消息推送到对应的浏览器客户端。

    1. 基本的拦截配置处理
    @Component
    public class MessageHandshakeInterceptor implements HandshakeInterceptor {
    
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Map<String, Object> attributes) throws Exception {
            if (request instanceof ServletServerHttpRequest) {
                ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
                HttpSession session = servletRequest.getServletRequest().getSession(false);
                if (session != null) {
                    User user = (User) session.getAttribute("user");
                    Integer userId = (Integer) session.getAttribute("userId");
                    if(null !=user){
                        attributes.put("user", user);
                        attributes.put("userId", userId);
                    }
                }
            }
            return true;
        }
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
                Exception exception) {
    
        }
    }
    
    2. 配置服务端根据请求使用相应的handler去处理
    /**
     * WebSocket configuration: exposes the handshake handler and container settings as
     * beans and maps the notification handler to its endpoints.
     */
    @Configuration
    @EnableWebMvc
    @EnableWebSocket
    public class NotifyWebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

        /** Handler that receives and pushes the notification messages. */
        @Autowired
        private NotifyHandlerNew notifyHandler;

        /** Interceptor applied to every handshake on the endpoints registered below. */
        @Autowired
        private MessageHandshakeInterceptor notifyHandshakeInterceptor;

        /** Handshake handler shared by both endpoint registrations. */
        @Bean
        public DefaultHandshakeHandler handshakeHandler() {
            return new DefaultHandshakeHandler();
        }

        /**
         * Message buffer limits for the WebSocket container:
         * 8192 for text messages and 5 * 1024 * 1024 for binary messages.
         */
        @Bean
        public ServletServerContainerFactoryBean createWebSocketContainer() {
            ServletServerContainerFactoryBean factory = new ServletServerContainerFactoryBean();
            factory.setMaxBinaryMessageBufferSize(5 * 1024 * 1024);
            factory.setMaxTextMessageBufferSize(8192);
            return factory;
        }

        /**
         * Registers the notification handler twice: once on a plain WebSocket path and once
         * on a SockJS path.
         *
         * <p>NOTE(review): both registrations allow every origin ("*"); confirm that is
         * intended, since the handshake relies on the caller's HTTP session.
         *
         * @param registry registry that receives the handler mappings
         */
        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            // Plain WebSocket endpoint.
            registry.addHandler(notifyHandler, "pushMsg")
                    .setAllowedOrigins("*")
                    .addInterceptors(notifyHandshakeInterceptor)
                    .setHandshakeHandler(handshakeHandler());

            // SockJS endpoint.
            registry.addHandler(notifyHandler, "/sockjs/pushMsg")
                    .setAllowedOrigins("*")
                    .addInterceptors(notifyHandshakeInterceptor)
                    .setHandshakeHandler(handshakeHandler())
                    .withSockJS();
        }
    }
    
    3. 对应的handler处理
    @Component
    public class NotifyHandlerNew extends TextWebSocketHandler {
        // 在线用户列表
        private static final Map<Integer, WebSocketSession> users = new ConcurrentHashMap<>();
        private static Logger logger = Logger.getLogger(NotifyHandlerNew.class);
        // 用户标识
        private static final String CLIENT_ID = "userId";
        @Resource
        private PushMsgDao pushMsgDao;
        @Resource
        private  EnvMapper envMapper;
        
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            Integer userId = getClientId(session);
            logger.info(userId+":成功建立连接");
            if (userId != null) {
                users.put(userId, session);
    //          session.sendMessage(new TextMessage("成功建立socket连接"));
            }
        }
    
    //浏览器发送请求,这里处理
        @Override
        public void handleTextMessage(WebSocketSession session, TextMessage message) {
            try {
                String receiver = message.getPayload();
                        Map<String, Object> result = new HashMap<>();
                        result.put("all", "hello");
                        TextMessage mes = new TextMessage(JSONObject.toJSONString(result));
                        session.sendMessage(mes);
                }
            } catch (IOException | SQLException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 发送信息给指定用户
         * 
         * @param clientId
         * @param message
         * @return
         */
        public boolean sendMessageToUser(Integer clientId, TextMessage message) {
            if (users.get(clientId) == null)
                return false;
            WebSocketSession session = users.get(clientId);
            if (!session.isOpen())
                return false;
            try {
                // 查询未读消息
                List<PushMsg> msgs = pushMsgDao.getPushMsg(new PushMsg(clientId));
                Map<String, List<PushMsg>> result = new HashMap<>();
                result.put("all", msgs);
                result.put("new", new ArrayList<>());
                TextMessage mes = new TextMessage(JSONObject.toJSONString(result));
                session.sendMessage(mes);
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            } catch (SQLException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return true;
        }
    
        /**
         * 广播信息
         * 
         * @param message
         * @return
         */
        public boolean sendMessageToAllUsers(TextMessage message) {
            boolean allSendSuccess = true;
            Set<Integer> clientIds = users.keySet();
            WebSocketSession session = null;
            for (Integer clientId : clientIds) {
                try {
                    session = users.get(clientId);
                    if (session.isOpen()) {
                        session.sendMessage(message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    allSendSuccess = false;
                }
            }
    
            return allSendSuccess;
        }
    
        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            if (session.isOpen()) {
                session.close();
            }
            logger.error("连接出错");
            users.remove(getClientId(session));
        }
    
        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            logger.warn("连接已关闭:" + status);
            if( null != getClientId(session))
                users.remove(getClientId(session));
        }
    
        @Override
        public boolean supportsPartialMessages() {
            return false;
        }
    
        /**
         * 获取用户标识
         * 
         * @param session
         * @return
         */
        private Integer getClientId(WebSocketSession session) {
            try {
                Integer clientId = (Integer) session.getAttributes().get(CLIENT_ID);
                return clientId;
            } catch (Exception e) {
                return null;
            }
        }
    }
    

    还有一种方式是使用 servlet 3.0 之后提供的 WebSocket 支持,需要导入服务端支持的 jar 包:

    <!-- Embedded Tomcat WebSocket module, pinned to 8.5.23.
         Presumably supplies the server-side WebSocket runtime for the annotated endpoint
         example; verify the version matches the servlet container actually deployed. -->
    <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-websocket</artifactId>
                <version>8.5.23</version>
            </dependency>
    

    使用来源网上的一个范例:

    /**
     * Annotated WebSocket endpoint that echoes text and binary messages back to the client
     * and, once a connection is open, periodically sends a random number as a heartbeat.
     *
     * <p>NOTE(review): the heartbeat is sent from the {@link Timer} thread while echoes are
     * sent from whichever thread delivers the message; confirm the container tolerates
     * both writing to the same session.
     */
    @ServerEndpoint(value="/myHandler",configurator = SpringConfigurator.class)
    public class Progress
    {
        // Binary payload that instructs the server to stop the heartbeat.
        private static final ByteBuffer stopbuffer  = ByteBuffer.wrap(new byte[]{1, 9, 2, 0, 1, 5, 1, 6});
        private static final Random random = new Random();

        private Session session;
        private Timer timer = null;

        /** Heartbeat task: sends a random number in the range [0, 100). */
        TimerTask task = new TimerTask() {
            public void run() {
                sendLong(random.nextInt(100));
            }
        };

        /**
         * Runs when the connection opens; starts the heartbeat if the session is open.
         *
         * @param session the newly opened session
         */
        @OnOpen
        public void start(Session session) {
            this.session = session;
            try {
                System.out.println("open");
                if (!session.isOpen()) {
                    return;
                }
                // Daemon timer: first heartbeat after 1000 ms, then one every 2000 ms.
                timer = new Timer(true);
                timer.schedule(task, 1000, 2000);
            } catch (Exception e) {
                closeQuietly(session);
            }
        }

        /**
         * Runs when a text message (or fragment) arrives; echoes it back unchanged.
         *
         * @param session the session the message arrived on
         * @param msg     the text received
         * @param last    whether this is the final fragment of the message
         */
        @OnMessage
        public void echoTextMessage(Session session, String msg, boolean last) {
            try {
                if (!session.isOpen()) {
                    return;
                }
                System.out.println("string:" + msg);
                session.getBasicRemote().sendText(msg, last);
            } catch (IOException e) {
                closeQuietly(session);
            }
        }

        /**
         * Runs when a binary message (or fragment) arrives. The stop command cancels the
         * heartbeat; any other payload is echoed back unchanged.
         *
         * @param session the session the message arrived on
         * @param bb      the bytes received
         * @param last    whether this is the final fragment of the message
         */
        @OnMessage
        public void echoBinaryMessage(Session session, ByteBuffer bb, boolean last) {
            try {
                if (!session.isOpen()) {
                    return;
                }
                boolean stopRequested = bb.compareTo(stopbuffer) == 0;
                if (!stopRequested) {
                    session.getBasicRemote().sendBinary(bb, last);
                } else if (timer != null) {
                    timer.cancel();
                }
            } catch (IOException e) {
                closeQuietly(session);
            }
        }

        /**
         * Runs when a pong arrives; nothing is done with it.
         *
         * @param pm ignored
         */
        @OnMessage
        public void echoPongMessage(PongMessage pm) {
            // no handling
        }

        /**
         * Runs when the connection closes; stops the heartbeat timer if one was started.
         *
         * @param session the session that was closed
         */
        @OnClose
        public void end(Session session) {
            try {
                System.out.println("close");
                if (timer == null) {
                    return;
                }
                timer.cancel();
            } catch(Exception e) {
                // ignored, as before
            }
        }

        /**
         * Sends one heartbeat value as text over the stored session, if it is open.
         *
         * @param param the value to send
         */
        public void sendLong(long param) {
            try {
                if (!session.isOpen()) {
                    return;
                }
                this.session.getBasicRemote().sendText(String.valueOf(param));
            } catch (IOException e) {
                closeQuietly(this.session);
            }
        }

        /** Best-effort close: an I/O failure while closing is deliberately ignored. */
        private static void closeQuietly(Session target) {
            try {
                target.close();
            } catch (IOException ignored) {
                // nothing useful can be done if closing fails
            }
        }

    }
    
    • 经过上面三个步骤,浏览器和服务端基本上就可以建立连接了;如果需要使用消息中间件,可以参看
      springboot 整合rabbitmq

    相关文章

      网友评论

        本文标题:spring websocket推送

        本文链接:https://www.haomeiwen.com/subject/efnduftx.html