美文网首页Java大数据
Spring+websocket+quartz实现消息定时推送

Spring+websocket+quartz实现消息定时推送

作者: Java弟中弟 | 来源:发表于2021-11-02 14:23 被阅读0次

    简单的说,websocket是真正实现了全双工通信的服务器向客户端推的互联网技术。

    全双工与单工、半双工的区别?

    • 全双工:简单地说,就是可以同时进行信号的双向传输(A->B且B->A),是瞬时同步的。
    • 单工、半双工:一个时间段内只有一个动作发生。

    推送和拉取的区别?

    • 推:由服务器主动发消息给客户端,就像广播。优势在于,信息的主动性和及时性。
    • 拉:由客户端主动请求所需要的数据。

    实现消息通信的几种方式?

    • 传统的http协议实现方式:。
    • 传统的socket技术。
    • websocket协议实现方式。

    接下来我们主要讲第三种,使用websocket协议,来实现服务端定时向客户端推送消息。

    • 开发环境:jdk1.8、tomcat7
    • 后台:springmvc、websocket、quartz
    • 前台:html5中新增的API
    • 开发工具:IDEA、maven

    实现步骤

    一、环境搭建

    (1)导入相关约束:

    在pom文件中加入需要的约束,spring相关的约束,请各位自己导入,这里我就不贴出来了。

    <!-- 定时器的包 -->
        <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz</artifactId>
          <version>2.3.0</version>
        </dependency>
    <!-- 
     spring-support.jar 这个jar 文件包含支持UI模版(Velocity,FreeMarker,JasperReports),邮件服务,脚本服务(JRuby),缓存Cache(EHCache),任务计划Scheduling(uartz)方面的类。 
     外部依赖spring-context, (spring-jdbc, Velocity, FreeMarker, JasperReports, BSH, Groovy, JRuby, Quartz, EHCache) 
     -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context-support</artifactId>
          <version>5.1.1.RELEASE</version>
        </dependency>
    <!-- websocket的包 -->
        <dependency>
          <groupId>javax.websocket</groupId>
          <artifactId>javax.websocket-api</artifactId>
          <version>1.1</version>
          <scope>provided</scope>
        </dependency>
    
    <!--
    ps:如果使用原始的配置方式,需要导入spring-websocket、spring-messaging的包,我们这里就通过注解实现
    -->
    
    

    (2)配置xml文件

    web.xml中就配置前端控制器,大家自行配置。然后,加载springmvc的配置文件。

    springmvc.xml文件中

        <!-- 自动将控制器加载到bean -->
        <context:component-scan base-package="com.socket.web" />
     <!-- 配置视图解析器 -->
        <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
            <property name="prefix" value="/WEB-INF/views/"/>
            <property name="suffix" value=".jsp"/>
            <property name="contentType" value="text/html; charset=utf-8"/>
        </bean>
        <!-- 自动注册 DefaultAnnotationHandlerMapping 与 AnnotationMethodHandlerAdapter 两个 bean, 解决了 @Controller 注解的使用前提配置 -->
        <mvc:annotation-driven/>
    
        <!-- 使用fastjson 解析json   因为本人的项目中用到了fastjson,所以这段配置大家可以忽略。 -->
        <mvc:annotation-driven>
            <mvc:message-converters register-defaults="true">
                <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter">
                    <property name="supportedMediaTypes">
                        <list>
                            <value>text/html;charset=UTF-8</value>
                            <value>application/json</value>
                        </list>
                    </property>
                    <property name="features">
                        <list>
                            <value>WriteMapNullValue</value>
                            <value>QuoteFieldNames</value>
                        </list>
                    </property>
                </bean>
            </mvc:message-converters>
        </mvc:annotation-driven>
    
    

    到此,环境就基本搭建完成了。

    二、完成后台的功能

    这里我就直接贴出代码了,上面有相关的注释。

    首先,完成websocket的实现类。

    package com.socket.web.socket;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @Author: 清风一阵吹我心
     * @ProjectName: socket
     * @Package: com.socket.web.socket
     * @ClassName: WebSocketServer
     * @Description:
     * @Version: 1.0
     **/
    //ServerEndpoint它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址。
    @ServerEndpoint(value = "/socket/{ip}")
    @Component
    public class WebSocketServer {
    
        //使用slf4j打日志
        private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);
    
        //用来记录当前在线连接数
        private static int onLineCount = 0;
    
        //用来存放每个客户端对应的WebSocketServer对象
        private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<String, WebSocketServer>();
    
        //某个客户端的连接会话,需要通过它来给客户端发送数据
        private Session session;
    
        //客户端的ip地址
        private String ip;
    
        /**
         * 连接建立成功,调用的方法,与前台页面的onOpen相对应
         * @param ip ip地址
         * @param session 会话
         */
        @OnOpen
        public void onOpen(@PathParam("ip")String ip,Session session){
            //根据业务,自定义逻辑实现
            this.session = session;
            this.ip = ip;
            webSocketMap.put(ip,this);  //将当前对象放入map中
            addOnLineCount();  //在线人数加一
            LOGGER.info("有新的连接加入,ip:{}!当前在线人数:{}",ip,getOnLineCount());
        }
    
        /**
         * 连接关闭调用的方法,与前台页面的onClose相对应
         * @param ip
         */
        @OnClose
        public void onClose(@PathParam("ip")String ip){
            webSocketMap.remove(ip);  //根据ip(key)移除WebSocketServer对象
            subOnLineCount();
            LOGGER.info("WebSocket关闭,ip:{},当前在线人数:{}",ip,getOnLineCount());
        }
    
        /**
         * 当服务器接收到客户端发送的消息时所调用的方法,与前台页面的onMessage相对应
         * @param message
         * @param session
         */
        @OnMessage
        public void onMessage(String message,Session session){
            //根据业务,自定义逻辑实现
            LOGGER.info("收到客户端的消息:{}",message);
        }
    
        /**
         * 发生错误时调用,与前台页面的onError相对应
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session,Throwable error){
            LOGGER.error("WebSocket发生错误");
            error.printStackTrace();
        }
    
        /**
         * 给当前用户发送消息
         * @param message
         */
        public void sendMessage(String message){
            try{
                //getBasicRemote()是同步发送消息,这里我就用这个了,推荐大家使用getAsyncRemote()异步
                this.session.getBasicRemote().sendText(message);
            }catch (IOException e){
                e.printStackTrace();
                LOGGER.info("发送数据错误:,ip:{},message:{}",ip,message);
            }
        }
    
        /**
         * 给所有用户发消息
         * @param message
         */
        public static void sendMessageAll(final String message){
            //使用entrySet而不是用keySet的原因是,entrySet体现了map的映射关系,遍历获取数据更快。
            Set<Map.Entry<String, WebSocketServer>> entries = webSocketMap.entrySet();
            for (Map.Entry<String, WebSocketServer> entry : entries) {
                final WebSocketServer webSocketServer = entry.getValue();
                //这里使用线程来控制消息的发送,这样效率更高。
                new Thread(new Runnable() {
                    public void run() {
                        webSocketServer.sendMessage(message);
                    }
                }).start();
            }
        }
    
        /**
         * 获取当前的连接数
         * @return
         */
        public static synchronized int getOnLineCount(){
            return WebSocketServer.onLineCount;
        }
    
        /**
         * 有新的用户连接时,连接数自加1
         */
        public static synchronized void addOnLineCount(){
            WebSocketServer.onLineCount++;
        }
    
        /**
         * 断开连接时,连接数自减1
         */
        public static synchronized void subOnLineCount(){
            WebSocketServer.onLineCount--;
        }
    
        public Session getSession(){
            return session;
        }
        public void setSession(Session session){
            this.session = session;
        }
    
        public static ConcurrentHashMap<String, WebSocketServer> getWebSocketMap() {
            return webSocketMap;
        }
    
        public static void setWebSocketMap(ConcurrentHashMap<String, WebSocketServer> webSocketMap) {
            WebSocketServer.webSocketMap = webSocketMap;
        }
    }
    
    

    然后写我们的定时器(quartz),这里我就不详解定时器了。大家可以自行去了解。

    这里我使用的是xml注解的方式,创建一个job类,此类不需要继承任何类和实现任何接口。

    package com.socket.web.quartz;
    
    import com.socket.web.socket.WebSocketServer;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @Author: 清风一阵吹我心
     * @ProjectName: socket
     * @Package: com.socket.web.quartz
     * @ClassName: TestJob
     * @Description:
     * @Version: 1.0
     **/
    public class TestJob {
    
        public void task(){
            //获取WebSocketServer对象的映射。
            ConcurrentHashMap<String, WebSocketServer> map = WebSocketServer.getWebSocketMap();
            if (map.size() != 0){
                for (Map.Entry<String, WebSocketServer> entry : map.entrySet()) {
                    WebSocketServer webSocketServer = entry.getValue();
                    try {
                        //向客户端推送消息
                        webSocketServer.getSession().getBasicRemote().sendText("每隔两秒,向客户端推送一次数据");
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            }else {
                System.out.println("WebSocket未连接");
            }
        }
    }
    
    

    定时器的实现类就完成了,我们还需要在springmvc.xml中进行配置

    springmvc.xml配置:

    <!-- 要执行的任务类 -->
        <bean id="testJob" class="com.socket.web.quartz.TestJob"></bean>
    
        <!-- 将需要执行的定时任务注入job中 -->
        <bean id="jobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
            <property name="targetObject" ref="testJob"/>
            <!-- 任务类中需要执行的方法 -->
            <property name="targetMethod" value="task"></property>
            <!-- 上一次未执行完成的,要等待有再执行。 -->
            <property name="concurrent" value="false" />
        </bean>
    
        <!-- 基本的定时器,会绑定具体的任务。 -->
        <bean id="trigger" class="org.springframework.scheduling.quartz.SimpleTriggerFactoryBean">
            <property name="jobDetail" ref="jobDetail"/>
            <property name="startDelay" value="3000"/>
            <property name="repeatInterval" value="2000"/>
        </bean>
    
        <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
            <property name="triggers">
                <list>
                    <ref bean="trigger"/>
                </list>
            </property>
        </bean>
    
    

    接下来是controller层的代码,就一个登录的功能。

    package com.socket.web.controller;
    
    import com.socket.domain.User;
    import com.sun.org.apache.bcel.internal.generic.RETURN;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpSession;
    import java.util.UUID;
    
    /**
     * @Author: 清风一阵吹我心
     * @ProjectName: socket
     * @Package: com.socket.web
     * @ClassName: ChatController
     * @Description:
     * @CreateDate: 2018/11/9 11:04
     * @Version: 1.0
     **/
    @RequestMapping("socket")
    @Controller
    public class ChatController {
    
        /**
         * 跳转到登录页面
         * @return
         */
        @RequestMapping(value = "/login",method = RequestMethod.GET)
        public String goLogin(){
            return "login";
        }
    
        /**
         * 跳转到聊天页面
         * @param request
         * @return
         */
        @RequestMapping(value = "/home",method = RequestMethod.GET)
        public String goMain(HttpServletRequest request){
            HttpSession session = request.getSession();
            if (null == session.getAttribute("USER_SESSION")){
                return "login";
            }
            return "home";
        }
    
        @RequestMapping(value = "/login",method = RequestMethod.POST)
        public String login(User user, HttpServletRequest request){
            HttpSession session = request.getSession();
            //将用户放入session
            session.setAttribute("USER_SESSION",user);
            return "redirect:home";
        }
    
    }
    
    

    以上就是登录的代码了,基本上就是伪代码,只要输入用户名就可以了,后面的逻辑,大家可以根据自己的业务来实现。

    最后就是前台页面的设计了,登录,login.jsp

    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <c:set var="path" value="${pageContext.request.contextPath}"/>
    <html>
    <head>
        <title>登录</title>
    </head>
    <body>
    <form action="${path}/socket/login" method="post">
        登录名:<input type="text" name="username"/>
        <input type="submit" value="登录"/>
    </form>
    </body>
    </html>
    
    

    消息接收页面,home.jsp

    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <html>
    <head>
        <title>聊天</title>
    
        <script type="text/javascript">
            //判断当前浏览器是否支持WebSocket
            var webSocket = null;
            if ('WebSocket' in window) {
                webSocket = new WebSocket("ws://localhost:9001/socket/127.0.0.1");
            }
            else if ('MozWebSocket' in window) {
                webSocket = new MozWebSocket("ws://localhost:9001/socket/127.0.0.1");
            }
            else {
                alert('Not support webSocket');
            }
    
            //打开socket,握手
            webSocket.onopen = function (event) {
                alert("websocket已经连接");
            }
            //接收推送的消息
            webSocket.onmessage = function (event) {
                console.info(event);
                alert(event.data);
            }
            //错误时
            webSocket.onerror = function (event) {
                console.info("发生错误");
                alert("websocket发生错误" + event);
            }
    
            //关闭连接
            webSocket.onclose = function () {
                console.info("关闭连接");
            }
    
            //监听窗口关闭
            window.onbeforeunload = function (event) {
                webSocket.close();
            }
        </script>
    </head>
    <body>
    
    </body>
    </html>
    
    

    基本上,数据推送的功能就完成了,下面附上效果图。

    启动tomcat。后台定时器两秒刷新一次,判断是否有websocket连接。


    登录页面:

    数据推送页面:

    服务器定时向客户端推送数据的功能就完成了,有不明白的可以给博主留言,如果有什么错误,也希望各位朋友指出,谢谢大家。

    本文源码:

    https://github.com/Qingfengchuiwoxin/websocket

    相关文章

      网友评论

        本文标题:Spring+websocket+quartz实现消息定时推送

        本文链接:https://www.haomeiwen.com/subject/mknxzltx.html