美文网首页activeMQ
Spring整合ActiveMQ

Spring整合ActiveMQ

作者: ouyangan | 来源:发表于2017-01-17 17:53 被阅读228次

    activemq安装

    • 进入activemq官网
    • 下载apache-activemq-5.9.1-bin.tar.gz
    • 配置jdk ,上传activemq至服务器
    • `tar zxvf apache-activemq-5.9.1-bin.tar.gz`
    • chmod 755 activemq
    • cd [activemq_install_dir]/bin
    • ./activemq start,更多详细命令参考http://activemq.apache.org/version-5-getting-started.html
    • 访问监控页面http://ipAddress:8161

    spring整合activemq

    • 依赖
               <!-- Keep all ActiveMQ artifacts on one version. activemq-pool 5.14.3 pulls in the
                    5.14.3 client transitively, so pairing it with activemq-core 5.7.0 puts two
                    versions of the same classes on the classpath. Since 5.8 the client classes
                    (ActiveMQConnectionFactory, ActiveMQTextMessage) ship in activemq-client. -->
               <dependency>
                   <groupId>org.apache.activemq</groupId>
                   <artifactId>activemq-client</artifactId>
                   <version>5.14.3</version>
               </dependency>
               <dependency>
                   <groupId>org.apache.activemq</groupId>
                   <artifactId>activemq-pool</artifactId>
                   <version>5.14.3</version>
               </dependency>
    
    • 配置
        <!-- ActiveMQ connection factory; broker URL and credentials come from property placeholders -->
        <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${activemq.brokerURL}"/>
            <property name="userName" value="${activemq.username}"/>
            <property name="password" value="${activemq.password}"/>
        </bean>
        <!-- Spring ConnectionFactory that wraps and manages the real (ActiveMQ) ConnectionFactory -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
            <property name="sessionCacheSize" value="100"/>
        </bean>
        <!-- JmsTemplate used for sending to queues -->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
            <constructor-arg ref="connectionFactory"/>
            <!-- Default queue name -->
            <property name="defaultDestinationName" value="queueDemo"/>
            <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
            <property name="pubSubDomain" value="false"/>
        </bean>
        <!-- JmsTemplate used for publishing to topics -->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
            <constructor-arg ref="connectionFactory"/>
            <property name="defaultDestinationName" value="topicDemo"/>
            <!-- pub/sub (publish/subscribe) model -->
            <property name="pubSubDomain" value="true"/>
        </bean>
    
        <!-- Listener beans -->
        <!-- NOTE(review): SubDemoC is wired here but its source is not listed next to SubDemoA/SubDemoB; confirm the class exists -->
        <bean id="queueDemoA" class="com.hunt.jms.demo.QueueDemoA"/>
        <bean id="subDemoA" class="com.hunt.jms.demo.SubDemoA"/>
        <bean id="subDemoB" class="com.hunt.jms.demo.SubDemoB"/>
        <bean id="subDemoC" class="com.hunt.jms.demo.SubDemoC"/>
    
        <!-- Queue listener container: queueDemoA consumes from the queueDemo queue -->
        <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="queueDemo" ref="queueDemoA"/>
        </jms:listener-container>
    
        <!-- Topic listener container: three subscribers registered on the topicDemo topic -->
        <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
            <jms:listener destination="topicDemo" ref="subDemoA"/>
            <jms:listener destination="topicDemo" ref="subDemoB"/>
            <jms:listener destination="topicDemo" ref="subDemoC"/>
        </jms:listener-container>
    
    • 实现监听器
    /**
     * Demo queue listener: prints a running message number and the text payload of
     * every message it receives, after a two-second pause that simulates slow work.
     */
    public class QueueDemoA implements MessageListener {
        /** Number to report for the next message; starts at 1 so the first message prints 1. */
        private final AtomicInteger count = new AtomicInteger(1);

        /**
         * Handles one delivered message.
         *
         * @param message the delivered message; only {@link ActiveMQTextMessage} payloads are printed
         */
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println();
                // sleep is a static method; call it on Thread rather than through an instance.
                Thread.sleep(2000L);
                // getAndIncrement reports the current number and then advances it, so numbering
                // starts at 1 (incrementAndGet on an initial value of 1 reported 2 first).
                System.out.println("队列接收消息-> " + count.getAndIncrement());
                if (message instanceof ActiveMQTextMessage) {
                    System.out.println(((ActiveMQTextMessage) message).getText());
                } else {
                    // Avoid a ClassCastException on non-text messages; report what arrived instead.
                    System.err.println("Unsupported message type: " + message);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Demo topic subscriber "A": prints the text payload of every message it receives,
     * after a two-second pause that simulates slow work.
     */
    public class SubDemoA implements MessageListener {
        /**
         * Handles one delivered message.
         *
         * @param message the delivered message; only {@link ActiveMQTextMessage} payloads are printed
         */
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println();
                // sleep is a static method; call it on Thread rather than through an instance.
                Thread.sleep(2000L);
                if (message instanceof ActiveMQTextMessage) {
                    System.out.println("A接收消息-> " + ((ActiveMQTextMessage) message).getText());
                } else {
                    // Avoid a ClassCastException on non-text messages; report what arrived instead.
                    System.err.println("Unsupported message type: " + message);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Demo topic subscriber "B": prints the text payload of every message it receives,
     * after a two-second pause that simulates slow work.
     */
    public class SubDemoB implements MessageListener {
        /**
         * Handles one delivered message.
         *
         * @param message the delivered message; only {@link ActiveMQTextMessage} payloads are printed
         */
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println();
                // sleep is a static method; call it on Thread rather than through an instance.
                Thread.sleep(2000L);
                if (message instanceof ActiveMQTextMessage) {
                    System.out.println("B接收消息-> " + ((ActiveMQTextMessage) message).getText());
                } else {
                    // Avoid a ClassCastException on non-text messages; report what arrived instead.
                    System.err.println("Unsupported message type: " + message);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 测试
    /**
     * Manual demo test: sends a batch of text messages through the queue and topic
     * {@link JmsTemplate}s loaded from {@code spring.xml}, then blocks so the JVM stays up.
     * Presumably the block is there to let the listeners in the same context consume the
     * messages; the tests never finish on their own and must be stopped by hand.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration({"classpath:spring.xml"})
    @Transactional
    public class DebugTest {

        /** Number of messages each test sends. */
        private static final int MESSAGE_COUNT = 1000;

        @Resource(name = "jmsQueueTemplate")
        private JmsTemplate jmsQueueTemplate;
        @Resource(name = "jmsTopicTemplate")
        private JmsTemplate jmsTopicTemplate;

        /**
         * Sends the messages "1".."1000" to the queue template's default destination, then blocks.
         *
         * @throws InterruptedException if the test thread is interrupted while waiting
         */
        @Test
        public void testQueue() throws InterruptedException {
            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                sendMessage(String.valueOf(i));
            }
            keepAlive();
        }

        /**
         * Publishes the messages "1".."1000" to the topic template's default destination, then blocks.
         *
         * @throws InterruptedException if the test thread is interrupted while waiting
         */
        @Test
        public void testTopic() throws InterruptedException {
            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                sendTopic(String.valueOf(i));
            }
            keepAlive();
        }

        /**
         * Blocks the calling thread until it is interrupted. Replaces an empty
         * {@code while (true)} loop, which kept the JVM alive by spinning a CPU core.
         */
        private static void keepAlive() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }

        /** Sends {@code msg} as a text message via the queue template. */
        private void sendMessage(String msg) {
            jmsQueueTemplate.send(session -> session.createTextMessage(msg));
        }

        /** Sends {@code msg} as a text message via the topic template. */
        private void sendTopic(String msg) {
            jmsTopicTemplate.send(session -> session.createTextMessage(msg));
        }
    }
    
    • 队列效果


      Paste_Image.png
    • 发布/订阅效果


      Paste_Image.png

    相关文章

      网友评论

      • Nathans:请问为什么设置成手动确认但不生效

      本文标题:Spring整合ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/dyjmbttx.html