美文网首页
ActiveMQ SpringJMS

ActiveMQ SpringJMS

作者: dwwl | 来源:发表于2019-04-20 09:59 被阅读0次

    消息中间件的应用场景:

    解耦,把耗时的操作(不需要返回结果的操作)移除运行主线程,提高响应速度

    场景:商品审核时需要的服务

    商家商品服务:需要返回结果,实时性高
    广告内容服务:需要返回结果,实时性高
    
    搜索服务:把SKU数据写入到solr,不需要返回结果,没必要必须在主线程完成,实时性低,用到消息中间件
    页面静态化服务:根据SKU数据生成静态页面,不需要返回结果,没必要必须在主线程完成,实时性低,用到消息中间件
    

    JMS消息传递类型:

    点对点(PTP):生产者 消费者一一对应,即使消费者有多个,每一个生产者产生的消息只能被一个消费者消费,发生竞争
    特点:如果没有消费者,消息对会存在destination
    应用场景实例:
    商品审核通过,把sku数据放到solr上,在分布式服务器上时,多个service即多个主机,都引用同一个solr,这时,用queue即可
    
    发送/订阅(pub/sub):有多个消费者链接到消息队列时,生产者放在队列的消息,能被多个消费者接受。
    特点:如果没有消费者,消息会丢失
    应用场景实例:
    商品审核通过时,将根据sku数据利用页面静态化技术生成页面,此时分布式时(每一台主机都有自己页面库),此时用topic,每台主机都会执行
    
    ![1553308107057.png](https://img.haomeiwen.com/i8781623/16b8b9adb8ade447.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    入门小Demo:

    依赖:

    
    

    QueryProducer.java:

    public class QueueProducer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new             ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎来到申请的品优购世界");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
    

    QueueConsumer.java

    public class QueueConsumer {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7.设置监听
            consumer.setMessageListener(new MessageListener() {
                
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("提取的消息:"+ textMessage.getText() );
                    } catch (JMSException e) {                  
                        e.printStackTrace();
                    }
                    
                }
            });
            //8.等待键盘输入
            System.in.read();
            
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    SpringJMS

    依赖

      <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
         </dependency>
      </dependencies>  
    

    applicationContext-jms-producer.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
            
            
        <context:component-scan base-package="cn.itcast.demo"></context:component-scan>     
        
           
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
        </bean>
           
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
               
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>      
        <!--这个是队列目的地,点对点的  文本信息-->  
        <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="queue_text"/>  
        </bean>    
        
        <!--这个是订阅模式  文本信息-->  
        <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
            <constructor-arg value="topic_text"/>  
        </bean>  
        
    </beans>
    

    QueueProducer.java

    注意Destination的注入方式 ,属性名称和xml相同:

    @Component
    public class QueueProducer {
    
        @Autowired
        private JmsTemplate jmsTemplate;
        
        @Autowired
        private Destination queueTextDestination;
        
        /**
         * 发送文本消息
         * @param text
         */
        public void sendTextMessage(final String text){
            jmsTemplate.send(queueTextDestination, new MessageCreator() {
                
                public Message createMessage(Session session) throws JMSException {
                    
                    return session.createTextMessage(text);
                }
            });
            
        }
        
    }
    
    

    消费模块

    applicationContext-jms-consumer-queue.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans   
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context   
            http://www.springframework.org/schema/context/spring-context.xsd">
        
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
        </bean>
           
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
        
        <!--这个是队列目的地,点对点的  文本信息-->  
        <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
            <constructor-arg value="queue_text"/>  
        </bean>    
        
        <!-- 我的监听类 -->
        <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>
        
        
        <!-- 消息监听容器 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueTextDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>
        
    </beans>
    

    MyMessageListener.java

    
    public class MyMessageListener implements MessageListener {
    
        public void onMessage(Message message) {
            
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
    
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:ActiveMQ SpringJMS

          本文链接:https://www.haomeiwen.com/subject/clrkgqtx.html