ActiveMQ是Apache研发的消息中间件,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
消息的传递有两种形式:
①点对点 ,即生产者和消费者一一对应。
②发布/订阅形式(广播),即一生产者产生消息并发送后,可以由多个消费者接收。
JMS提供五种不同的消息正文格式。
StreamMessage - Java原始数据流
MapMessage - 键值对
TextMessage - 字符串
ObjectMessage - 序列化的JAVA对象
BytesMessage - 字节流
解决后台访问error:405问题:
机器名:etc/sysconfig/network文件中查看
NETWORKING=yes
HOSTNAME=localhost.localdomain
修改hosts文件127.0.0.1 域名映射。
127.0.0.1 localhost localhost.localdomain
Queue方式,消息默认持久化
Producer
/**
* 点到点形式发送消息
* <p>Title: testQueueProducer</p>
* <p>Description: </p>
* @throws Exception
*/
@Test
public void testQueueProducer() throws Exception {
//1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用Connection对象的start方法。
connection.start();
//4、创建一个Session对象。
//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue
Queue queue = session.createQueue("test-queue");
//6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
//7、创建一个Message对象,可以使用TextMessage。
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello Activemq");*/
TextMessage textMessage = session.createTextMessage("hello activemq");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
Consumer
@Test
public void testQueueConsumer() throws Exception {
//创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//创建一个连接对象
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个Destination对象。queue对象
Queue queue = session.createQueue("test-queue");
//使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(queue);
//接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待接收消息
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
Topic方式,消息默认不持久化
Producer
@Test
public void testTopicProducer() throws Exception {
//1、创建一个连接工厂对象,需要指定服务的ip及端口。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//2、使用工厂对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
//3、开启连接,调用Connection对象的start方法。
connection.start();
//4、创建一个Session对象。
//第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。
//第二个参数:应答模式。自动应答或者手动应答。一般自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic
Topic topic = session.createTopic("test-topic");
//6、使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
//7、创建一个Message对象,可以使用TextMessage。
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello Activemq");*/
TextMessage textMessage = session.createTextMessage("topic message");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
Consumer
@Test
public void testTopicConsumer() throws Exception {
//创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616");
//创建一个连接对象
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个Destination对象。topic对象
Topic topic = session.createTopic("test-topic");
//使用Session对象创建一个消费者对象。
MessageConsumer consumer = session.createConsumer(topic);
//接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic消费者3启动。。。。");
//等待接收消息
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
整合Spring
引入相关jar包:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
spring-activemq.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.161:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="itemAddTopic" />
</bean>
<bean id="myMessageListener" class="cn.e3mall.search.message.MyMessageListener"/>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<!-- 监听商品添加消息,同步索引库 -->
<bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="itemAddMessageListener" />
</bean>
</beans>
发送消息
public class ActiveMqSpring {
@Test
public void sendMessage() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
//从容器中获得JmsTemplate对象。
JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
//从容器中获得一个Destination对象。
Destination destination = (Destination) applicationContext.getBean("queueDestination");
//发送消息
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("send activemq message");
}
});
}
}
接收消息
ItemAddMessageListener.java
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//取消息内容
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
spring-activemq.xml配置接收器,随spring启动开启监听
<!-- 监听商品添加消息,同步索引库 -->
<bean id="itemAddMessageListener" class="cn.e3mall.search.message.ItemAddMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="itemAddMessageListener" />
</bean>
网友评论