@[toc]
docker搭建rabbitm
"Hello World!":
官网教程
点对点,一个生产者,一个消费者,一个队列。
特点:
- 没有交换机概念,生产者和消费者直接通过队列进行交流
1. mq创建一个队列
- 安装完rabbitm直接访问 118.25.188.37:15672
- 进入登入界面:默认密码都guest
-
==这里不创建队列也行,java中绑定队列如果队列没有会创建==
在这里插入图片描述
在这里插入图片描述
完成后点击add queue
在这里插入图片描述
2. 创建生产者消费者
- 引入依赖
dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>5.73</version>
</dependency>
</dependencies>
- 获取mq连接工具类(类似jdbc连接)
- MQConnectionUtils
public class MQConnectionUtils {
private static final String IP = "118.25.188.37";
private static final Integer PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
public static Connection newConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(IP);
//设置端口号
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
//创建连接
Connection connection = factory.newConnection();
return connection;
}
}
- Producer 生产者
- Producer
public class Producer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.创建通道
Channel channel = newConnection.createChannel();
// 3.创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "直接模式消息发送";
System.out.println("生产者发送消息:" + msg);
// 4.发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.close();
newConnection.close();
}
}
发送消息后mq会出现待消费的消息
在这里插入图片描述
- Customer 消费者
- Customer
public class Customer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
// 2.获取通道
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
}
};
// 3.监听队列 true表示自动应答,false表示手动应答
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
消费完后mq消息就没有了
工作队列 Work queues
在这里插入图片描述与点对点不同的是,消费者由1个变成了两个,消费者集群了
我们这里启动两个消费者
在这里插入图片描述
在这里插入图片描述
然后发送10条消息
看看结果:
在这里插入图片描述 在这里插入图片描述
可以看到实现的是均摊消费
应答模式
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
这里第二个参数表示应答模式为true,表示自动签收
- 自动应答:不会在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息,如果消息获取失败的情况,实现自动补偿
- 手动应答:消费者处理完业务逻辑,手动返回一个ack(通知)告诉队列服务器是否删除该消息
这里我们将 应答模式设置为false
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
然后向消费者发送10个消息
可以看到消费者接收到了10个消息,但是我现在如果停止消费者
在这里插入图片描述
发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
这里我们需要加上这个
channel.basicAck(envelope.getDeliveryTag(), false);
public class Customer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws Exception {
System.out.println("消费者2启动");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
/* 2.获取通道 */
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//监听队列
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.监听队列 true表示自动应答,false表示手动应答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
这样就表示消费者接受消息成功了
实现:添加如下代码channel.basicQos(1);
public class Customer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws Exception {
System.out.println("消费者2启动");
// 1.获取连接
Connection newConnection = MQConnectionUtils.newConnection();
/* 2.获取通道 */
Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//表示一次只消费一个消息
channel.basicQos(1);
//监听队列
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msgString);
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 3.监听队列 true表示自动应答,false表示手动应答
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
公平队列
在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)
- 解决方案:消费者都采用应答模式实现公平队列,即谁消费快,消费的消息多
集成springboot
demo 路径结构:
在这里插入图片描述
2.代码测试
创建一个springboot项目,然后加入mq依赖
dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
配置 application.properties
# web端口
server.port=8089
# mq地址
spring.rabbitmq.host=118.25.188.37
测试生产者
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class ProductTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void senMes() {
rabbitTemplate.convertAndSend("mq","直接模式消息发送");
}
}
运行sendMes
无错误后访问mq web管理页面发现多了一条待消费的消息
在这里插入图片描述
编写消费者
@Component
@RabbitListener(queues = "mq") //指定消费队列
public class Customer1 {
@RabbitHandler
public void getMsg(String msg) {
System.out.println("直接模式消费消息" + msg);
}
}
然后直接运行main
可以看到效果
在这里插入图片描述
在这里插入图片描述
也可以启动多个消费者等待消息,具体idea启动多个实例请看这里
网友评论