What is ActiveMQ?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- · StreamMessage -- Java原始值的数据流
- · MapMessage--一套名称-值对
- · TextMessage--一个字符串对象
- · ObjectMessage--一个序列化的 Java对象
- · BytesMessage--一个字节的数据流
使用测试(未整合Spring)
public void testQueueProducer() throws Exception {
// 1.创建一个连接工厂对象,需要指定服务的ip以及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
// 2.使用工厂对象来创建一个Connection对象
Connection connection = connectionFactory.createConnection();
// 3.开启连接,调用Connection对象的start方法
connection.start();
// 4.创建一个Session对象
// 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
// 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用Session对象创建一个Destination对象。两种形式queue、topic
Queue queue = session.createQueue("spring-queue");
// 6.使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
// 7.创建一个Message对象,可以使用TextMessage。
/*
* TextMessage textMessage=new ActiveMQTextMessage();
* textMessage.setText("hello Activemq");
*/
TextMessage textMessage = session.createTextMessage("hello Activemq");
// 8.发送消息
producer.send(textMessage);
// 9.关闭资源
producer.close();
session.close();
connection.close();
}
public void testQueueConsumer() throws Exception {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
// 创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destinatiion对象,queue对象
Queue queue = session.createQueue("spring-queue");
// 使用Session对象创建一个消费者对象
MessageConsumer consumer = session.createConsumer(queue);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
public void testTopicProducer() throws Exception {
// 1.创建一个连接工厂对象,需要指定服务的ip以及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
// 2.使用工厂对象来创建一个Connection对象
Connection connection = connectionFactory.createConnection();
// 3.开启连接,调用Connection对象的start方法
connection.start();
// 4.创建一个Session对象
// 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
// 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.使用Session对象创建一个Destination对象。两种形式queue、topic
Topic topic = session.createTopic("test-topic");
// 6.使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(topic);
// 7.创建一个Message对象,可以使用TextMessage。
/*
* TextMessage textMessage=new ActiveMQTextMessage();
* textMessage.setText("hello Activemq");
*/
TextMessage textMessage = session.createTextMessage("topicMessage");
// 8.发送消息
producer.send(textMessage);
// 9.关闭资源
producer.close();
session.close();
connection.close();
}
public void testTopicConsumer() throws Exception {
// 创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
// 创建一个连接对象
Connection connection = connectionFactory.createConnection();
// 开启连接
connection.start();
// 使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Destinatiion对象,queue对象
Topic topic = session.createTopic("test-topic");
// 使用Session对象创建一个消费者对象
MessageConsumer consumer = session.createConsumer(topic);
// 接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 打印结果
TextMessage textMessage = (TextMessage) message;
String text;
try {
text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic 消费者3已经启动");
// 等待接收消息
System.in.read();
// 关闭资源
consumer.close();
session.close();
connection.close();
}
作者:lhsjohn
网友评论