Integrating Spring with ActiveMQ

Author: z七夜 | Published 2018-05-16 21:43

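    1. Installing ActiveMQ on Linux

    This example runs ActiveMQ from an image fetched with docker pull rather than installing it directly.
    Once the container is running, open port 8161 in a browser and enter the username and password (admin) to reach the ActiveMQ admin console.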


    image.png
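Before opening the console it can help to confirm that the container's ports are actually reachable. The following is a minimal sketch using only the JDK: the class name `BrokerCheck`, the default host `localhost`, and the 2-second timeout are assumptions for illustration, while ports 8161 (web console) and 61616 (broker) are the ones used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

class BrokerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Checks the host and port taken from a broker URL such as tcp://localhost:61616. */
    static boolean isBrokerReachable(String brokerUrl, int timeoutMs) {
        URI uri = URI.create(brokerUrl);
        return isReachable(uri.getHost(), uri.getPort(), timeoutMs);
    }

    public static void main(String[] args) {
        // Assumed host; pass the address of the machine running the container as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("web console 8161: " + isReachable(host, 8161, 2000));
        System.out.println("broker 61616: " + isBrokerReachable("tcp://" + host + ":61616", 2000));
    }
}
```

This only proves that something is listening on the port. It does not verify the login or that the listener is really ActiveMQ.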

    2. Creating a Maven project

    This is an SSM (Spring, Spring MVC, MyBatis) project. The pom.xml is as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>jk.zmn</groupId>
      <artifactId>spring-activemq</artifactId>
      <packaging>war</packaging>
      <version>0.0.1-SNAPSHOT</version>
      <name>spring-activemq Maven Webapp</name>
      <url>http://maven.apache.org</url>
      
        <properties>
        <spring.version>4.0.5.RELEASE</spring.version>
        <mybatis.version>3.2.1</mybatis.version>
        <slf4j.version>1.6.6</slf4j.version>
        <log4j.version>1.2.12</log4j.version>
        <mysql.version>5.1.35</mysql.version>
        <jackjson.version>2.8.8</jackjson.version>
        <activemq.version>5.11.2</activemq.version>
      </properties>
      <dependencies>
      <!-- Spring dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- Spring test support -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring.version}</version>
                <scope>test</scope>
            </dependency>
     
      <!-- Spring Web MVC (spring-web is already declared above) -->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-webmvc</artifactId>
                <version>${spring.version}</version>
            </dependency>
      
      <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        
        <!-- Alibaba Druid data source -->
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>druid</artifactId>
             <version>0.2.23</version>
         </dependency>
         
        
         <!-- logback start -->
      <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.logback-extensions</groupId>
            <artifactId>logback-ext-spring</artifactId>
            <version>0.1.1</version>
        </dependency>
    
        
         
        <!-- MyBatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>${mybatis.version}</version>
        </dependency>
    
        <!-- MyBatis-Spring integration -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.2.0</version>
        </dependency>
      <!-- Servlet 3.0 API -->
              <dependency>
                  <groupId>javax.servlet</groupId>
                  <artifactId>javax.servlet-api</artifactId>
                  <version>3.0.1</version>
              </dependency>
              <dependency>
                  <groupId>javax.servlet.jsp</groupId>
                  <artifactId>javax.servlet.jsp-api</artifactId>
                  <version>2.3.2-b01</version>
              </dependency>
              <!-- jstl -->
              <dependency>
                  <groupId>javax.servlet</groupId>
                  <artifactId>jstl</artifactId>
                  <version>1.2</version>
              </dependency>
        <!-- Unit tests: JUnit 4 is required for the @Test annotation used in the tests below -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
        </dependency>
        
        <dependency>
             <groupId>com.github.pagehelper</groupId>
             <artifactId>pagehelper</artifactId>
             <version>4.1.4</version>
         </dependency>
    
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-core</artifactId>
           <version>${jackjson.version}</version>
        </dependency>
        <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-annotations</artifactId>
           <version>${jackjson.version}</version>
        </dependency>
        <dependency>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
           <version>${jackjson.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
      </dependencies>
      
      
      
      <build>
        <finalName>spring-activemq</finalName>
      </build>
    </project>
    

    1. Standalone version without Spring

    1. Queue mode

    Producer

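        // Imports used by the standalone tests in this section
        import javax.jms.*;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import org.junit.Test;

        @Test
        public void testQueueMqProducer() throws Exception {
            // 1. Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
            // 2. Create the connection
            Connection connection = factory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; if true, the second argument is ignored.
            //    Second argument: the acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the producer
            MessageProducer producer = session.createProducer(testQueue);
            // 7. Send a message
            TextMessage message = session.createTextMessage("I am a queue message: time to reduce stock");
            producer.send(message);
            // 8. Release resources
            producer.close();
            session.close();
            connection.close();
        }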
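The comment on the first `createSession` argument is easier to follow with a picture of what "transacted" changes. The sketch below is a plain-JDK model, not the JMS API: `TransactedSendModel` and its methods are hypothetical names. It mimics the JMS rule that messages sent in a transacted session only become visible after `commit()` and are discarded by `rollback()`. With real JMS the corresponding calls would be `connection.createSession(true, Session.SESSION_TRANSACTED)` followed by `session.commit()` or `session.rollback()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-JDK model of transacted sending; it does not talk to a broker. */
class TransactedSendModel {
    private final boolean transacted;
    private final List<String> pending = new ArrayList<>();   // sent but not yet committed
    private final List<String> delivered = new ArrayList<>(); // visible to consumers

    TransactedSendModel(boolean transacted) {
        this.transacted = transacted;
    }

    /** Non-transacted sends are visible at once; transacted sends wait for commit(). */
    void send(String text) {
        if (transacted) {
            pending.add(text);
        } else {
            delivered.add(text);
        }
    }

    /** Makes every pending message visible. */
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Throws away every pending message. */
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        TransactedSendModel session = new TransactedSendModel(true);
        session.send("reduce stock");
        System.out.println(session.delivered()); // []
        session.commit();
        System.out.println(session.delivered()); // [reduce stock]
    }
}
```

The tests in this article pass `false`, so each `send` reaches the broker immediately and no `commit()` is needed.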
    

    Consumer

    
        @Test
        public void testQueueMqConsumer() throws Exception {
            // 1. Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
            // 2. Create the connection
            Connection connection = factory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; if true, the second argument is ignored.
            //    Second argument: the acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the consumer
            MessageConsumer consumer = session.createConsumer(testQueue);
            // 7. Receive a message (this call blocks until one arrives)
            TextMessage received = (TextMessage) consumer.receive();
            System.out.println(received.getText());
            // 8. Release resources
            consumer.close();
            session.close();
            connection.close();
        }
    

    The result is shown below:

    image.png

    2. Publish/subscribe mode

    Publisher

        @Test
        public void testTopicMqProducer() throws Exception {
            // 1. Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
            // 2. Create the connection
            Connection connection = factory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; if true, the second argument is ignored.
            //    Second argument: the acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Topic testTopic = session.createTopic("testTopic");
            // 6. Create the producer
            MessageProducer producer = session.createProducer(testTopic);
            // 7. Publish a message
            TextMessage message = session.createTextMessage("I am a topic message: Zhang San grabbed a phone");
            producer.send(message);
            // 8. Release resources
            producer.close();
            session.close();
            connection.close();
        }
    
    

    Subscribers

        @Test
        public void testTopicMqConsumer() throws Exception {
            // 1. Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
            // 2. Create the connection
            Connection connection = factory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; if true, the second argument is ignored.
            //    Second argument: the acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Topic testTopic = session.createTopic("testTopic");
            // 6. Create the consumer
            MessageConsumer consumer = session.createConsumer(testTopic);

            // 7. Register a listener that receives messages asynchronously
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("Consumer 1 started");
            // Keep the test alive until a key is pressed so the listener can receive messages
            System.in.read();
            // 8. Release resources
            consumer.close();
            session.close();
            connection.close();
        }
        
        @Test
        public void testTopicMqConsumer2() throws Exception {
            // 1. Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
            // 2. Create the connection
            Connection connection = factory.createConnection();
            // 3. Start the connection
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; if true, the second argument is ignored.
            //    Second argument: the acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination
            Topic testTopic = session.createTopic("testTopic");
            // 6. Create the consumer
            MessageConsumer consumer = session.createConsumer(testTopic);

            // 7. Register a listener that receives messages asynchronously
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("Consumer 2 started");
            // Keep the test alive until a key is pressed so the listener can receive messages
            System.in.read();
            // 8. Release resources
            consumer.close();
            session.close();
            connection.close();
        }
        
    

    The result is shown below:


    image.png
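The two modes differ in who receives a message, which also decides the order in which the tests above should be started. The sketch below is a plain-JDK model of the delivery rules, not ActiveMQ code: `DeliveryModel`, `QueueModel`, and `TopicModel` are hypothetical names. A queue keeps a message until exactly one consumer takes it, while a topic hands a copy to every subscriber that is registered at publish time. Non-durable subscribers that join later miss earlier messages, so the topic subscribers need to be running before the publisher.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-JDK model of queue versus topic delivery; it does not talk to a broker. */
class DeliveryModel {

    /** Point-to-point: each message is handed to exactly one receiver. */
    static class QueueModel {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String text) {
            messages.add(text);
        }

        /** Returns the next message, or null when the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish/subscribe: every subscriber registered at publish time gets a copy. */
    static class TopicModel {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns its inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String text) {
            for (List<String> inbox : inboxes) {
                inbox.add(text);
            }
        }
    }

    public static void main(String[] args) {
        QueueModel queue = new QueueModel();
        queue.send("order-1");
        System.out.println("consumer A: " + queue.receive()); // consumer A: order-1
        System.out.println("consumer B: " + queue.receive()); // consumer B: null

        TopicModel topic = new TopicModel();
        List<String> early = topic.subscribe();
        topic.publish("order-2");
        List<String> late = topic.subscribe();
        System.out.println("early: " + early); // early: [order-2]
        System.out.println("late: " + late);   // late: []
    }
}
```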

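    2. Integrating with Spring

    spring-activemq.xml (the tests below load it as applicationContext-activemq.xml, so the file name on the classpath must match)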

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
        
        <!-- The ConnectionFactory that actually creates connections, supplied by the JMS provider -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://www.itzmn.com:61616" />
        </bean>
        <!-- Spring's ConnectionFactory, which wraps and manages the real ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- targetConnectionFactory points to the real ConnectionFactory that creates JMS connections -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        
        <!-- Producer configuration -->
        <!-- JmsTemplate is Spring's JMS helper class for sending and receiving messages -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- This connectionFactory refers to the Spring-provided ConnectionFactory defined above -->
            <property name="connectionFactory" ref="connectionFactory" />
        </bean>
        
        
        <!-- Queue destination (point-to-point) -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!-- Topic destination (one-to-many) -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="springTopic" />
        </bean>
        <!-- Queue message listener -->
        <bean id="myMessageListener" class="jk.zmn.activemq.listener.MyMessageListener"/>
        <!-- Listener container for the queue -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>
        
        <!-- First topic message listener -->
        <bean id="myTopicMessageListener" class="jk.zmn.activemq.listener.MyTopicMessageListener"/>
        <!-- Listener container for the first topic subscriber -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicDestination" />
            <property name="messageListener" ref="myTopicMessageListener" />
        </bean>
        
        <!-- Second topic message listener -->
        <bean id="myTopicMessageListener2" class="jk.zmn.activemq.listener.MyTopicMessageListener2"/>
        <!-- Listener container for the second topic subscriber -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicDestination" />
            <property name="messageListener" ref="myTopicMessageListener2" />
        </bean>
        
        
        
    </beans>
    
    

    Queue mode

    Producer

        @Test
        public void testSpringActiveMqProducer() {
            // Load the Spring configuration; this also starts the listener containers
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            Destination destination = (Destination) context.getBean("queueDestination");
            // Send a text message to the queue
            jmsTemplate.send(destination, new MessageCreator() {

                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("Business is here: Zhang San is buying a product");
                }
            });
        }
    

    The consumer must implement the MessageListener interface:

    public class MyMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            // The producer sends text messages, so cast to TextMessage to read the body
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
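Because the listener runs on a thread owned by the listener container, a test that sends a message and returns at once may finish before the listener prints anything. One common way to make a test wait is a `CountDownLatch`. The sketch below uses only the JDK: `LatchingListener` and `onText` are hypothetical names, and in a real listener `onText` would be called from `onMessage(...)` with the text of the received message.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Collects message texts and lets a test wait until the expected number has arrived. */
class LatchingListener {
    private final CountDownLatch latch;
    private final List<String> received = new CopyOnWriteArrayList<>();

    LatchingListener(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** Intended to be called from onMessage(...) with the message text. */
    void onText(String text) {
        received.add(text);
        latch.countDown();
    }

    /** Blocks until the expected messages arrived; returns false if the timeout elapsed first. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchingListener listener = new LatchingListener(1);
        // A separate thread stands in for the container's delivery thread.
        new Thread(() -> listener.onText("hello")).start();
        boolean arrived = listener.await(5, TimeUnit.SECONDS);
        System.out.println(arrived + " " + listener.received()); // true [hello]
    }
}
```

A test would send its message, call `await` with a timeout, and assert on the returned flag instead of relying on console output.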
    
    

    Publish/subscribe mode

    Publisher

        @Test
        public void testSpringActiveMqTopicProducer() {
            // Load the Spring configuration; this also starts the listener containers
            ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
            JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
            Destination destination = (Destination) context.getBean("topicDestination");
            // Publish a text message to the topic
            jmsTemplate.send(destination, new MessageCreator() {

                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("More business: Li Si wants to buy a phone");
                }
            });
        }
    

    Subscribers

    public class MyTopicMessageListener implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
                // First subscriber: react by reducing stock
                System.out.println("Reducing stock");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
    
    public class MyTopicMessageListener2 implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
                // Second subscriber: react by creating the order
                System.out.println("Creating the order");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }
    
    

    The code above is rough. To experiment with it, download the project from Gitee (码云).

    Group number: 552113611
