1.书写生产者消息生产的代码:
package com.shuai;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 向mq推送消息步奏
*/
public class JmsActivemq {
public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
public static final String QUEUE1 = "queue1";
public static void main(String[] args) throws JMSException {
// 1.创建连接工厂
ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
// 2.生产连接,并开启连接
Connection connection = activeMqConnectionFactory.createConnection();
connection.start();
// 3.创建session会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地
Queue queue = session.createQueue(QUEUE1);
// 5.创建生产者
MessageProducer producer = session.createProducer(queue);
// 6.创建三个消息
for (int i = 1; i <= 3; i++) {
// 7.生产者发送消息到创建好的队列里面
TextMessage textMessage = session.createTextMessage("刘德华" + i);
producer.send(textMessage);
}
// 8.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("over");
}
}
mq效果:
image.png
第一框是发送的消息个数,第二个框是等待的消息个数
2.书写消费者接收消息的代码:
(1)正常的消费者消费消息:
package com.shuai;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsActiveCustemer {
public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
public static final String QUEUE1 = "queue1";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
// 2.创建连接
Connection connection = mqConnectionFactory.createConnection();
connection.start();
// 3.创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.创建目的地
Queue queue = session.createQueue(QUEUE1);
// 5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
// 6.接受消息打印
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println(message.getText());
} else {
break;
}
}
// 7.关闭资源
consumer.close();
session.close();
connection.close();
}
}
image.png
(2)使用监听器消费消息,生产者生产一条消息,这边就会监听一条消息, 使用监听器需要添加System.in.read();这条语句保证控制台不关闭。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null!=message && message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
System.out.println(textMessage);
}
}
});
// 保证控制台不关闭
System.in.read();
当你开启两个消费者的时候,生产6个消息,每一个消费者消费3个消息
目的地一共有两种,这一种是队列还有一种是主题,当生产者生产6个消息的时候对于每一个主题都会接收到6个消息。这里要注意先启动消费者在启动生产,如果先启动生产没有订阅的话,生产的消息就是费消息。
3.MQ的可靠性:
(1).持久化内容:
// 持久化操作如果MQ挂了之前生产的消息还是不会丢失,如果设置为非持久化时服务挂之前的消息就会丢失。默认的是持久化操作。主题持久化操作需要在创建主题之后再将连接开启。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
(2)创建session所带参数事务与签收:
// 3.创建session会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一个参数是是否使用事务,设置为true如果出现错误的话会回滚。设置之后需要在关闭连接之前进行提交。签收的话可以根据自己的需求设置为手动或者自动签收。
网友评论