队列一对一
image.pngimage.png image.png
观察两者是不是有些似曾相识
都是一个套路
依赖
<!--activemq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
ActiveMQ下的哪个版本,就装哪个<version>Linux中下载的版本号</version>
不废话直接上代码(生产者)
package com.yd.telnet.modular.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author
* @Date 2020/3/19
*/
public class JmsProduce {
//为什么是tcp,看源码!
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 7、通过使用producer产生三条消息发送到队列里面
for(int i = 0;i<3;i++){
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("msg------------" + i);
// 通过producer发送给mq
producer.send(textMessage);
}
// 8、关闭资源(先进后出,同jdbc)
producer.close();
session.close();
connection.close();
}
}
看效果
再运行一次,且删除掉其中的三条记录看结果(可以将删除的三条当做已消费)
image.png
image.png
总结
当有一个消息进入这个队列是,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
再来一条消息时,等待消费的消息是1,进入队列的消息就是2(只增不减)
不废话直接上代码(消费者)
package com.yd.telnet.modular.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author
* @Date 2020/3/19
*/
public class JmsConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage textMessage = (TextMessage) consumer.receive();
if(null != textMessage){
System.out.println("消费者处理消息"+ textMessage.getText());
}else{
break;
}
}
// 7、关闭资源
consumer.close();
session.close();
connection.close();
}
}
运行代码后
image.png
由于consumer.receive()这个方法是一个不离不弃,啥时候activemq停止啥时候结束,所以产生receive的第二种写法
image.png
receive属于同步运行阻塞,所以又有了监听的方式
package com.yd.telnet.modular.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* @author 张思博
* @Date 2020/3/19
*/
public class JmsConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
// 7、通过监听的方式来消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听消费者消费消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 8、保持控制台不灭
System.in.read();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
该种方式:有消息就消费,没消息就等待
消息的消费者接收消息可以采用两种方式:
1.consumer.receive()或 consumer.receive(int timeout);(同步阻塞)
2.注册一个MessageListener.
采用第一种方式,消息的接收者会一直等待下去,直到有消息到达或者超时。后一种方式会注册一个监听器,当有消息到达的时候,会调用它的onMessage()方法。(异步非阻塞)
可以用监听的方式启动两个消费者,生产的消息会被平均分配(自己试)
image.png
网友评论