美文网首页互联网科技Java 杂谈
实战Spring4+ActiveMQ整合实现消息队列(生产者+消

实战Spring4+ActiveMQ整合实现消息队列(生产者+消

作者: 风平浪静如码 | 来源:发表于2019-07-15 10:32 被阅读13次

    引言:

    最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情。最后研究说用消息队列,当有需要发送邮件告警的时候,就向队列中添加一个标识消息,ActiveMQ通过监听器的形式,实时监听队列里边的小时,收到消息后,判断是不是需要发送告警的标识,是的话就自行就行发送邮件!这是就研究的消息队列ActiveMQ,下边就是具体内容:

    一、ActiveMQ

    1.1). ActiveMQ

    ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

    1. 2). Java Message Service(JMS)

    JMS支持两种消息发送和接收模型。

    • 一种称为P2P(Ponit to Point)模型(点对点一对一),即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。
    • 另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。
    1.3). JMS术语
    1. Provider/MessageProvider:生产者
    2. Consumer/MessageConsumer:消费者
    3. PTP:Point To Point,点对点通信消息模型
    4. Pub/Sub:Publish/Subscribe,发布订阅消息模型
    5. Queue:队列,目标类型之一,和PTP结合
    6. Topic:主题,目标类型之一,和Pub/Sub结合
    7. ConnectionFactory:连接工厂,JMS用它创建连接
    8. Connnection:JMS Client到JMS Provider的连接
    9. Destination:消息目的地,由Session创建
    10. Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是 Session创建的
    1.4). ActiveMQ应用场景

    类似送快递,快递员(producer)将快递(Message)放到指定地点(destination)后,就可以离开了,拿快递的人(customer)在接收到通知后,到指定地点(destination)去取快递(Message)就可以了。当然,取快递时可能要进行身份验证,这就涉及到创建连接(connection)时,需要指定用户名和密码了。还有就是,实际生活中,当快递员把快递放好之后,照理说应该通知客户去哪里取快递,而ActiveMq帮我们做好了一切,通知的工作Activemq会帮我们实现,而无需我们亲自编码通知消费者,生产者只需要将Message放到Mq中即可,通知消费者的工作,mq会帮我们处理

    用途就是用来处理消息,也就是处理JMS在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。

    在不使用消息队列的情况下,用户的请求数据直接写入数据库,高发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧,但使用队列后,用户的请求发给队列后立即返回。

    1.5). ActiveMQ下载
    1.6). 启动

    /apache-activemq-5.15.3/bin/win64/目录下双击activemq.bat文件,在浏览器中输入http://localhost:8161/admin/, 用户名和密码输入admin即可

    二、Srping+ActiveMQ应用实例

    2,1). 项目结构
    ** 2,2). 导入maven依赖,pom.xml文件**
      1 <?xml version="1.0" encoding="UTF-8"?>
      2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      3   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      4   <modelVersion>4.0.0</modelVersion>
      5 
      6   <groupId>www.cnblogs.com.hongmoshu</groupId>
      7   <artifactId>test_actmq</artifactId>
      8   <version>0.0.1-SNAPSHOT</version>
      9   <packaging>war</packaging>
     10   <name>test_actmq Maven Webapp</name>
     11   <url>http://www.example.com</url>
     12   
     13    <!-- 版本管理 -->
     14   <properties>
     15     <springframework>4.1.8.RELEASE</springframework>
     16   </properties>
     17  
     18 
     19    <dependencies>
     20    
     21      <!-- junit单元测试 -->
     22     <dependency>
     23       <groupId>junit</groupId>
     24       <artifactId>junit</artifactId>
     25       <version>4.11</version>
     26       <scope>test</scope>
     27     </dependency>
     28     
     29     <!-- JSP相关 -->
     30   <dependency>
     31     <groupId>jstl</groupId>
     32     <artifactId>jstl</artifactId>
     33     <version>1.2</version>
     34   </dependency>
     35   <dependency>
     36     <groupId>javax.servlet</groupId>
     37     <artifactId>servlet-api</artifactId>
     38     <scope>provided</scope>
     39     <version>2.5</version>
     40   </dependency>
     41 
     42      <!-- spring -->
     43     <dependency>
     44       <groupId>org.springframework</groupId>
     45       <artifactId>spring-core</artifactId>
     46       <version>${springframework}</version>
     47     </dependency>
     48     <dependency>
     49       <groupId>org.springframework</groupId>
     50       <artifactId>spring-context</artifactId>
     51       <version>${springframework}</version>
     52     </dependency>
     53     <dependency>
     54       <groupId>org.springframework</groupId>
     55       <artifactId>spring-tx</artifactId>
     56       <version>${springframework}</version>
     57     </dependency>
     58     <dependency>
     59       <groupId>org.springframework</groupId>
     60       <artifactId>spring-webmvc</artifactId>
     61       <version>${springframework}</version>
     62     </dependency>
     63     <dependency>
     64       <groupId>org.springframework</groupId>
     65       <artifactId>spring-jms</artifactId>
     66       <version>${springframework}</version>
     67     </dependency>
     68     
     69     <!-- xbean 如<amq:connectionFactory /> -->
     70     <dependency>
     71       <groupId>org.apache.xbean</groupId>
     72       <artifactId>xbean-spring</artifactId>
     73       <version>3.16</version>
     74     </dependency>
     75     
     76     <!-- activemq -->
     77     <dependency>
     78       <groupId>org.apache.activemq</groupId>
     79       <artifactId>activemq-core</artifactId>
     80       <version>5.7.0</version>
     81     </dependency>
     82     <dependency>
     83       <groupId>org.apache.activemq</groupId>
     84       <artifactId>activemq-pool</artifactId>
     85       <version>5.12.1</version>
     86     </dependency>
     87     
     88     <!-- gson -->
     89     <dependency>
     90       <groupId>com.google.code.gson</groupId>
     91       <artifactId>gson</artifactId>
     92       <version>1.7.1</version>
     93     </dependency>
     94     
     95       <!-- JSON -->
     96     <dependency>
     97         <groupId>net.sf.json-lib</groupId>
     98         <artifactId>json-lib</artifactId>
     99         <version>2.4</version>
    100         <classifier>jdk15</classifier>
    101     </dependency>
    102     
    103   </dependencies>
    104 
    105   <build>
    106     <finalName>test_actmq</finalName>
    107   
    108   </build>
    109 </project>
    
    2,3). ActiveMQ的配置文件ActiveMQ.xml
     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xmlns:amq="http://activemq.apache.org/schema/core"
     5        xmlns:jms="http://www.springframework.org/schema/jms"
     6        xmlns:context="http://www.springframework.org/schema/context"
     7        xmlns:mvc="http://www.springframework.org/schema/mvc"
     8        xsi:schemaLocation="
     9         http://www.springframework.org/schema/beans
    10         http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
    11         http://www.springframework.org/schema/context
    12         http://www.springframework.org/schema/context/spring-context-4.1.xsd
    13         http://www.springframework.org/schema/mvc
    14         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
    15         http://www.springframework.org/schema/jms
    16         http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
    17         http://activemq.apache.org/schema/core
    18         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
    19 >
    20 
    21     <context:component-scan base-package="com.svse.service" />
    22     <mvc:annotation-driven />
    23 
    24     <!-- jms.useAsyncSend=true 允许异步接收消息 -->
    25     <amq:connectionFactory id="amqConnectionFactory"
    26                            brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true"
    27                            userName="admin"
    28                            password="admin" />
    29 
    30     <!-- 配置JMS连接工 厂 -->
    31     <bean id="connectionFactory"
    32           class="org.springframework.jms.connection.CachingConnectionFactory">
    33         <constructor-arg ref="amqConnectionFactory" />
    34         <property name="sessionCacheSize" value="100" />
    35     </bean>
    36 
    37     <!-- 定义消息队列名称(Queue) -->
    38     <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    39         <!-- 设置消息队列的名字 -->
    40         <constructor-arg>
    41             <value>Jaycekon</value>
    42         </constructor-arg>
    43     </bean>
    44 
    45     <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    46     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    47         <property name="connectionFactory" ref="connectionFactory" />
    48         <property name="defaultDestination" ref="demoQueueDestination" />
    49         <property name="receiveTimeout" value="10000" />
    50         <!-- true是topic,false是queue,默认是false,此处显示写出false -->
    51         <property name="pubSubDomain" value="false" />
    52         <!-- 消息转换器 -->    
    53         <property name="messageConverter" ref="userMessageConverter"/>  
    54     </bean>
    55 
    56      <!-- 类型转换器 -->    
    57     <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/>    
    58 
    59 
    60     <!-- 配置消息队列监听者(Queue) -->
    61      <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" /> 
    62 
    63     <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
    64     <bean id="queueListenerContainer"
    65           class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    66         <property name="connectionFactory" ref="connectionFactory" />
    67         <property name="destination" ref="demoQueueDestination" />
    68         <property name="messageListener" ref="queueMessageListener" />
    69     </bean> 
    70     
    71 </beans>
    
    2,4). Spring的配置文件 spring-mvc.xml
     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4     xmlns:context="http://www.springframework.org/schema/context"
     5     xmlns:mvc="http://www.springframework.org/schema/mvc"
     6     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     7         http://www.springframework.org/schema/beans/spring-beans.xsd
     8         http://www.springframework.org/schema/context
     9         http://www.springframework.org/schema/context/spring-context-4.1.xsd
    10         http://www.springframework.org/schema/mvc 
    11         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
    12          
    13     <context:component-scan base-package="com.svse.controller" />
    14     <mvc:annotation-driven />
    15     <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">
    16         <property name="viewClass"
    17             value="org.springframework.web.servlet.view.JstlView" />
    18         <property name="prefix" value="/WEB-INF/views/" />
    19         <property name="suffix" value=".jsp" />
    20     </bean>
    21      
    22 </beans>
    
    2,5). web.xml
     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     3          xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
     4          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" 
     5          id="WebApp_ID" version="3.1">
     6   <display-name>mydemo</display-name>
     7   
     8   <welcome-file-list>
     9     <welcome-file>index.jsp</welcome-file>
    10   </welcome-file-list>
    11   
    12   <!-- 加载spring及active的配置文件,classpath为项目src下的路径 -->
    13   <context-param>
    14     <param-name>contextConfigLocation</param-name>
    15     <param-value>
    16           classpath:spring-mvc.xml;
    17           classpath:ActiveMQ.xml;
    18     </param-value>
    19     </context-param>
    20 
    21 
    22  <listener>
    23     <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    24   </listener>
    25 
    26   <servlet>
    27     <servlet-name>springMVC</servlet-name>
    28     <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    29     <init-param>
    30       <param-name>contextConfigLocation</param-name>
    31       <param-value>classpath:spring-mvc.xml</param-value>
    32     </init-param>
    33     <load-on-startup>1</load-on-startup>
    34   </servlet>
    35   <servlet-mapping>
    36     <servlet-name>springMVC</servlet-name>
    37     <url-pattern>/</url-pattern>
    38   </servlet-mapping>
    39 
    40   <!-- 处理编码格式 -->
    41   <filter>
    42     <filter-name>characterEncodingFilter</filter-name>
    43     <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    44     <init-param>
    45       <param-name>encoding</param-name>
    46       <param-value>UTF-8</param-value>
    47     </init-param>
    48     <init-param>
    49       <param-name>forceEncoding</param-name>
    50       <param-value>true</param-value>
    51     </init-param>
    52   </filter>
    53   <filter-mapping>
    54     <filter-name>characterEncodingFilter</filter-name>
    55     <url-pattern>/*</url-pattern>
    56   </filter-mapping>
    57   
    58 </web-app>
    
    2,6). 实体类Users对象
     1 package com.svse.entity;
     2 import java.io.Serializable;
     3 
     4 public class Users implements Serializable{
     5 
     6     private String userId;
     7     private String userName;
     8     private String sex;
     9     private String age;
    10     private String type;
    11     
    12     
    13     public Users() {
    14         super();
    15     }
    16     public Users(String userId, String userName, String sex, String age,
    17             String type) {
    18         super();
    19         this.userId = userId;
    20         this.userName = userName;
    21         this.sex = sex;
    22         this.age = age;
    23         this.type = type;
    24     }
    25     public String getUserId() {
    26         return userId;
    27     }
    28     public void setUserId(String userId) {
    29         this.userId = userId;
    30     }
    31     public String getUserName() {
    32         return userName;
    33     }
    34     public void setUserName(String userName) {
    35         this.userName = userName;
    36     }
    37     public String getSex() {
    38         return sex;
    39     }
    40     public void setSex(String sex) {
    41         this.sex = sex;
    42     }
    43     public String getAge() {
    44         return age;
    45     }
    46     public void setAge(String age) {
    47         this.age = age;
    48     }
    49     public String getType() {
    50         return type;
    51     }
    52     public void setType(String type) {
    53         this.type = type;
    54     }
    55     @Override
    56     public String toString() {
    57         return "Users [userId=" + userId + ", userName=" + userName + ", sex="
    58                 + sex + ", age=" + age + ", type=" + type + "]";
    59     }
    60     
    61     
    62 }
    
    2,7). 核心代码(生产者ProducerService)
     1 package com.svse.service;
     2 
     3 import javax.annotation.Resource;
     4 import javax.jms.Destination;
     5 import javax.jms.JMSException;
     6 import javax.jms.Message;
     7 import javax.jms.Session;
     8 
     9 import org.springframework.jms.core.JmsTemplate;
    10 import org.springframework.jms.core.MessageCreator;
    11 import org.springframework.stereotype.Service;
    12 
    13 import com.svse.entity.Users;
    14 
    15 @Service
    16 public class ProducerService {
    17 
    18     @Resource(name="jmsTemplate")
    19     private JmsTemplate jmsTemplate;
    20     
    21     
    22     /**
    23      * 向指定队列发送消息 (发送文本消息)
    24      */
    25     public void sendMessage(Destination destination,final String msg){
    26         
    27         jmsTemplate.setDeliveryPersistent(true);
    28         
    29         System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
    30         jmsTemplate.send(destination, new MessageCreator() {
    31             public Message createMessage(Session session) throws JMSException {
    32                 return session.createTextMessage(msg);
    33             }
    34         });
    35     }
    36     
    37     /**
    38      * 向指定队列发送消息以对象的方式 (发送对象消息)
    39      */
    40     public void sendMessageNew(Destination destination,Users user){
    41         System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+user);
    42         jmsTemplate.convertAndSend(user);
    43     }
    44 
    45     /**
    46      * 向默认队列发送消息
    47      */
    48     public void sendMessage(final String msg){
    49         String destination = jmsTemplate.getDefaultDestinationName();
    50         System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
    51         jmsTemplate.send(new MessageCreator() {
    52             public Message createMessage(Session session) throws JMSException {
    53                 return session.createTextMessage(msg);
    54             }
    55         });
    56     }
    57 }
    
    2,8). 核心代码(消费产者ConsumerService)
     1 package com.svse.service;
     2 
     3 import javax.annotation.Resource;
     4 import javax.jms.Destination;
     5 import javax.jms.JMSException;
     6 import javax.jms.ObjectMessage;
     7 import javax.jms.TextMessage;
     8 
     9 import net.sf.json.JSONObject;
    10 
    11 import org.springframework.jms.core.JmsTemplate;
    12 import org.springframework.stereotype.Service;
    13 
    14 import com.svse.entity.Users;
    15 
    16 @Service
    17 public class ConsumerService {
    18 
    19      @Resource(name="jmsTemplate")
    20      private JmsTemplate jmsTemplate;
    21      //接收文本消息
    22      public TextMessage receive(Destination destination){
    23             TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
    24             try{
    25                 JSONObject json=JSONObject.fromObject(textMessage.getText());
    26                 System.out.println("name:"+json.getString("userName"));
    27                 System.out.println("从队列" + destination.toString() + "收到了消息:\t"
    28                         + textMessage.getText());
    29             } catch (JMSException e) {
    30                 e.printStackTrace();
    31             }
    32             return textMessage;
    33         }
    34      //接收对象消息
    35      public ObjectMessage receiveNew(Destination destination){
    36              ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);
    38              try{
    39                 Users users= (Users) objMsg.getObject();
    44                 System.out.println("name:"+users.getUserName());
    47                 System.out.println("从队列" + destination.toString() + "收到了消息:\t"
    48                         + users);
    49             } catch (JMSException e) {
    50                 e.printStackTrace();
    51             }
    52             return objMsg;
    53         }
    54 }
    
    2,9). 核心代码(控制器ConsumerService)
      1 package com.svse.controller.mq;
      2 
      3 import java.io.IOException;
      4 import java.text.SimpleDateFormat;
      5 import java.util.Date;
      7 import javax.annotation.Resource;
      8 import javax.jms.DeliveryMode;
      9 import javax.jms.Destination;
     10 import javax.jms.JMSException;
     11 import javax.jms.ObjectMessage;
     12 import javax.jms.TextMessage;
     13 import javax.management.MBeanServerConnection;
     14 import javax.management.remote.JMXConnector;
     15 import javax.management.remote.JMXConnectorFactory;
     16 import javax.management.remote.JMXServiceURL;
     18 import org.springframework.stereotype.Controller;
     19 import org.springframework.web.bind.annotation.RequestMapping;
     20 import org.springframework.web.bind.annotation.RequestMethod;
     21 import org.springframework.web.bind.annotation.RequestParam;
     22 import org.springframework.web.servlet.ModelAndView;
     24 import com.google.gson.Gson;
     25 import com.svse.entity.Users;
     26 import com.svse.service.ConsumerService;
     27 import com.svse.service.ProducerService;
     28 
     29 @Controller
     30 public class DemoController {
     35     
     36      //队列名Jaycekon (ActiveMQ中设置的队列的名称)
     37     @Resource(name="demoQueueDestination")
     38     private Destination demoQueueDestination;
     39 
     40     //队列消息生产者
     41     @Resource(name="producerService")
     42     private ProducerService producer;
     43     
     44    //队列消息消费者
     45     @Resource(name="consumerService")
     46     private ConsumerService consumer;
     47     
     48     /*
     49      * 准备发消息
     50      */
     51     @RequestMapping(value="/producer",method=RequestMethod.GET)
     52     public ModelAndView producer(){
     53         System.out.println("------------go producer");
     54         
     55         Date now = new Date(); 
     56         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     57         String time = dateFormat.format( now ); 
     58         System.out.println(time);
     59         
     60         ModelAndView mv = new ModelAndView();
     61         mv.addObject("time", time);
     62         mv.setViewName("producer");        
     63         return mv;
     64     }
     65     
     66     /*
     67      * 发消息
     68      */
     69     @RequestMapping(value="/onsend",method=RequestMethod.POST)
     70     public ModelAndView producer(@RequestParam("message") String message) {
     71         System.out.println("------------send to jms");
     72         ModelAndView mv = new ModelAndView();
     73         for(int i=0;i<5;i++){
     74             try {
     75                 Users users=new Users("10"+(i+1),"赵媛媛"+(i+1),"女","27","影视演员");
     76                 Gson gson=new Gson();
     77                 String sendMessage=gson.toJson(users);
     78                 System.out.println("发送的消息sendMessage:"+sendMessage.toString());
     79              // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式
     80               producer.sendMessageNew(demoQueueDestination, users);//以对象的方式
     81              
     82             } catch (Exception e) {
     83                 e.printStackTrace();
     84             }
     85         }
     86         mv.setViewName("index");
     87         return mv;
     88     }
     89     /*
     90      * 手动接收消息
     91      */
     92     @RequestMapping(value="/receive",method=RequestMethod.GET)
     93     public ModelAndView queue_receive() throws JMSException {
     94         System.out.println("------------receive message");
     95         ModelAndView mv = new ModelAndView();
     96         
     97       //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息
     98         
     99         ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收对象消息
    100         Users users= (Users) objMsg.getObject();
    101         //mv.addObject("textMessage", tm.getText());
    102         mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());
    103         mv.setViewName("receive");
    104         return mv;
    105     }
    106     
    107     /*
    108      * ActiveMQ Manager Test
    109      */
    110     @RequestMapping(value="/jms",method=RequestMethod.GET)
    111     public ModelAndView jmsManager() throws IOException {
    112         System.out.println("------------jms manager");
    113         ModelAndView mv = new ModelAndView();
    114         mv.setViewName("index");
    115         
    116         JMXServiceURL url = new JMXServiceURL("");
    117         JMXConnector connector = JMXConnectorFactory.connect(url);
    118         connector.connect();
    119         MBeanServerConnection connection = connector.getMBeanServerConnection();
    120         
    121         return mv;
    122     }
    123     
    124 }
    

    三、.对象转换器MessageConverter和消息监听器MessageListener

    在上边的ProducerService和ConsumerService中,不论是发送消息还是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是简单的文本消息可以以TextMessage,但是如果需要发送的内容比较多,结构比较复杂,这时候就建议用对象文本ObjectMessage的方式向队列queue中发送消息了.但是这时候就需要用到对象消息转换器MessageConverter.

    3,1). 消息转换器MessageageConverte

    MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象
    转换成对应的目标对象,这主要是用在接收消息的时候。

     1 package com.svse.util;
     2 
     3 import java.io.Serializable;
     4 
     5 import javax.jms.JMSException;
     6 import javax.jms.Message;
     7 import javax.jms.ObjectMessage;
     8 import javax.jms.Session;
     9 
    10 import org.springframework.jms.support.converter.MessageConversionException;
    11 import org.springframework.jms.support.converter.MessageConverter;
    12 
    13 /**
    14  *功能说明:通用的消息对象转换类
    15  *@author:zsq
    16  *create date:2019年7月12日 上午9:28:31
    17  *修改人   修改时间  修改描述
    18  *Copyright (c)2019北京智华天成科技有限公司-版权所有
    19  */
    20 public class ObjectMessageConverter implements MessageConverter {
    21 
    22     
    23     //把一个Java对象转换成对应的JMS Message (生产消息的时候)
    24     public Message toMessage(Object object, Session session)
    25             throws JMSException, MessageConversionException {
    26         
    27         return session.createObjectMessage((Serializable) object);  
    28     }
    29 
    30     //把一个JMS Message转换成对应的Java对象 (消费消息的时候)
    31     public Object fromMessage(Message message) throws JMSException,
    32             MessageConversionException {
    33         ObjectMessage objMessage = (ObjectMessage) message;  
    34         return objMessage.getObject();  
    35     }
    36 
    37 }
    

    注意:写了消息转化器之后还需要的ActiveMQ.xml中进行配置

    3,2). 消息监听器MessageageListe

    MessageageListe作用就是动态的自行监听消息队列的生产者发送的消息,不需要人工手动接收!

     1 package com.svse.util;
     2 import javax.jms.JMSException;
     3 import javax.jms.Message;
     4 import javax.jms.MessageListener;
     5 import javax.jms.ObjectMessage;
     6 import javax.jms.TextMessage;
     7 
     8 import com.svse.entity.Users;
     9 
    10 
    11 public class QueueMessageListener implements MessageListener {
    12 
    13    //添加了监听器,只要生产者发布了消息,监听器监听到有消息消费者就会自动消费(获取消息)
    14     public void onMessage(Message message) {
    15          //(第1种方式)没加转换器之前接收到的是文本消息
    16         //TextMessage tm = (TextMessage) message;
    17         
    18         //(第2种方式)加了转换器之后接收到的ObjectMessage对象消息
    19         ObjectMessage objMsg=(ObjectMessage) message;
    20          Users users;
    21         try {
    22             users = (Users) objMsg.getObject();
    23             //System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText());
    24             System.out.println("QueueMessageListener监听到了文本消息:\t" + users);      
    25             //do something ...
    26         } catch (JMSException e1) {
    27             // TODO Auto-generated catch block
    28             e1.printStackTrace();
    29         }
    30     }
    31     
    32 }
    

    同样写好监听器后也是需在ActiveMQ.xml中进行配置注册的

    总结

    (1)注册JmsTemplate时,pubSubDomain这个属性的值要特别注意。默认值是false,也就是说默认只是支持queue模式,不支持topic模式。但是,如果将它改为true,则不支持queue模式。因此如果项目需要同时支持queue和topic模式,那么需要注册2个JmsTemplate,同时监听容器也需要注册2个

    (2)使用Queue时,生产者只要将Message发送到MQ服务器端,消费者就可以进行消费,而无需生产者程序一直运行;

    (3)消息是按照先入先出的顺序,一旦有消费者将Message消费,该Message就会从MQ服务器队列中删去;

    (4)有文章说,“生产者”<-->"消费者"是一对一的关系,其实并不准确,从应用中可以看出,一个生产者产生的消息,可以被多个消费者进行消费,只不过多个消费者在消费消息时是竞争的关系,先得到的先消费,一旦消费完成,该消息就会出队列,
    就不能被其他消费者再消费了,即“一次性消费”。就是我们熟悉的“点对点”通信了;

    相关文章

      网友评论

        本文标题:实战Spring4+ActiveMQ整合实现消息队列(生产者+消

        本文链接:https://www.haomeiwen.com/subject/weugkctx.html