- setp 1:添加依赖
<!--集中定义依赖版本-->
<properties>
<activemq.version>5.11.2</activemq.version>
<spring.version>4.2.4.RELEASE</spring.version>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<!--activemq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<!--jms-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!--整合支持-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
- step 2:创建
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd">
<!--开启注解扫描-->
<context:annotation-config/>
<!--JMS服务厂商提供的连接工厂-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>
<!--spring对厂商提供的连接工厂的包装,方便更换不同的JMS实现-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!--引用了实际的连接工厂-->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--配置生产者,使用spring提供的jms模板-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--将spring包装后的连接工厂注入-->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--点对点消息队列-->
<bean class="org.apache.activemq.command.ActiveMQQueue" id="activeMQQueue">
<constructor-arg>
<!--指定队列名称-->
<value>spring_queue</value>
</constructor-arg>
</bean>
<!--发布/订阅消息队列-->
<bean class="org.apache.activemq.command.ActiveMQTopic" id="activeMQTopic">
<constructor-arg>
<value>spring_topic</value>
</constructor-arg>
</bean>
spring整合后的连接工厂是包装后的连接工厂,今后若是项目想要替换消息中间件技术如RabbitMq
,RocketMq
,只需要在配置文件中更改连接工厂的实现即可(还有注解信息...)。
- step 3:创建队列消息生产者
/**
* 使用spring整合Junit4方式测试代码
* 避免了测试方法前使用@Before注解手动初始化spring容器
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class Queue_Producer {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "activeMQQueue")
private Queue queue;
@Test
public void sendQueueMessage() {
jmsTemplate.send(queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("text five!");
}
});
}
}
- step 4:创建消息监听类
/**
* 消息监听器
*/
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- step 5:在
applicationContext.xml
中配置监听器bean
和监听容器
-------------------在applicationContext.xml中追加以下内容---------------------
<!--消息监听器-->
<bean id="myMessageListener" class="listener.MyMessageListener"/>
<!--消息监听容器-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="messageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListener" ref="myMessageListener"/>
<property name="destination" ref="activeMQQueue"/>
</bean>
运行生产者类,消息即可被消费。
- topic 消息生产与消费
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class Topic_Producer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("activeMQTopic")
private Topic topic;
@Test
public void sendMessage() {
jmsTemplate.send(topic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("its a topic message");
}
});
}
}
在applicationContext.xml
中创建监听容器,监听类为上文的 listener
<!--广播消息监听容器-->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="topicMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListener" ref="myMessageListener"/>
<property name="destination" ref="activeMQTopic"/>
</bean>
运行生产者类,消息即可被消费。
spring 整合了 ActiveMq 之后,当 spring 容器初始化时,监听器就会监听连接工厂中定义的 ip 和 端口,一旦生产者将消息发送至服务器,监听器就会获取消息进行消费。
注意:本案例的MyMessageListener
类不能定义在Test
目录下,否则无法解析该类。
若有疑惑,可能阅读 http://www.jianshu.com/p/1439340d2388 有所帮助。
本文完。
网友评论