ActiveMQ的安装下载见:ActiveMQ的下载安装,ActiveMQ的点对点模型也在其中讲过。
1 订阅者Subscriber
public class Subscriber1 {
// 默认的连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
// 默认的连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
// 默认的连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory;
Connection connection = null;//连接
Session session;//会话
Destination destination;//消息目的地
MessageConsumer consumer;//消息消费者
connectionFactory =
new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
try {
//通过工厂获取连接
connection = connectionFactory.createConnection();
connection.start();//启动连接
//创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//这里的名字要和ActiveMQ中的队列名子一致
destination = session.createTopic("短信发送");
consumer = session.createConsumer(destination);//创建消息消费者
// 写MQ的监听器
consumer.setMessageListener(new MyMessageListener1());
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消息监听器
*/
class MyMessageListener1 implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("Consumer1从MQ队列中接收消息:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Subscriber2和Subscriber1一样,这里略去。
2 发布者
public class JMSPublisher {
// 默认的连接用户名
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;
// 默认的连接密码
private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
// 默认的连接地址
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory;
Connection connection = null;//连接
Session session;//会话
Destination destination;//消息目的地
MessageProducer messageProducer;//消息生产者
connectionFactory =
new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
//通过工厂获取连接
connection = connectionFactory.createConnection();
connection.start();//启动连接
//创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
destination = session.createTopic("短信发送");
//创建消息生产者
messageProducer = session.createProducer(destination);
//向对列中发送10条消息
for(int i = 0;i <= 9;i++){
String message = i+"123456789";
TextMessage msg = session.createTextMessage(message);
messageProducer.send(destination,msg);
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
// 关闭连接
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
3 测试
先启动两个订阅者,然后在开启发布者,每次发布者发布消息时,两个订阅者的窗口都会收到发布者发布的消息。
相对于点对点模型,发布订阅模式下,订阅者是被动接收MQ推送的消息,而点对点模型需要消费者主动去队列中取消息。
网友评论