先说一下安装rabbitmq
1,安装opt_win64_22.0.exe (Erlang )
2,安装rabbitmq
3,计算机--管理--服务 找到rabbitmq服务右键属性,更改为当前登录人并填写密码,然后重启服务
4,去安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin的路径的cmd窗口 执行:rabbitmq-plugins enable rabbitmq_management 打开管理窗口
5,管理地址:http://localhost:15672 登录名:guest,密码:guest(注意:本地登录才有效,远程登录不行)
概述:
rabbitmq是一个消息队列,主要用于系统之间的异步和解耦,同时也能起到消息缓存,消息分发的作用。
一般的消息队列有3个关键部分,生产者,队列,消费者;但是rabbitmq还有一个exchange(交换机);交换机的作用是:生产者把消息给交换机然后根据策略路由到队列上面去保存;交换机不能保存消息。
rabbitmq还有一个虚拟主机的概念:其作用就是权限隔离,A虚拟机的交换机和队列跟B虚拟机的交换机和队列是不能互相访问(针对用户),就是用户仅有A虚拟机的权限就不能去访问B虚拟机里的交换机和队列。
交换机四种模式(每个队列中的消息只能被消费一次):
交换机的作用就是把消息路由到绑定的队列中去。
direct:简单模式,一个发,另一个收
topic:主题模式,比direct灵活
fanout:广播模式,绑定的队列全部都能消费
headers:设置 header attribute 参数类型的交换机(使用的少,后期再琢磨)
特性:
ack:应答机制,用于很重要的消息,必须确保送到且消费。默认是自动应答;如需手动应答需要自己设置配置文件。
durable:持久化机制,把队列中的数据持久化到硬盘,避免rabbitmq服务器down机,而数据不会丢失。
贴代码(springboot整合rabbitmq):
properties配置:
#rabbitmq配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
direct模式(不用配置交换机,有默认的)
配置队列:
package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq的默认模式:direct,最简单的队列模式,一个生产者,一个消费者,交换机也是默认的不用创建,路由key默认是队列名
* 1个消息只能被消费一次,如果有多个消费者,rabbitmq服务器会自己做负载均衡,平均消费
*/
@Configuration
public class DirectConfig {
public static final String queue_direct_a = "queue.direct.a";
public static final String queue_direct_user = "queue.direct.user";
@Bean(name = queue_direct_a)
public Queue queue1(){
return new Queue(queue_direct_a);
}
@Bean(name = queue_direct_user)
public Queue queue2(){
return new Queue(queue_direct_user);
}
}
生产者:
package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* 生产者
*/
@Component
public class DirectProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String info){
String now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
String msg = "时间: " + now + ", info: " + info;
amqpTemplate.convertAndSend(DirectConfig.queue_direct_a, msg);
}
/**
* mq之间传递Java对象(Java对象要实现序列化接口)
* @param user
*/
public void sendMsg(User user){
amqpTemplate.convertAndSend(DirectConfig.queue_direct_user, user);
}
}
消费者:
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = DirectConfig.queue_direct_user)
public class DirectConsumer3 {
@RabbitHandler
public void receive(User user){
// 如果是String类型,接收类型就改为String
System.out.println("-----------------------------user--------------------------");
System.out.println(user);
}
}
topic模式
配置队列,交换机,互相绑定
package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* topic模式相比direct模式就是灵活一点(在同一个队列中也是1个消息实体只能被消费1次)
* 一个交换机可以绑定到多个队列上,也可以使用匹配模式来匹配符合规则的队列
* 匹配模式:*代表一个元素,#代表0个或多个元素
* 队列命名原则:aaa.bbbb.ccc 以.隔开
*/
@Configuration
public class TopicConfig {
public static final String queue_topic_a = "queue.topicA"; //不支持queue.topic.a 三级模式(这个坑,我刚踩完)
public static final String queue_topic_b = "queue.topicB";
public static final String exchange_topic = "exchange_topic";
@Bean(name = queue_topic_a)
public Queue queue1(){
return new Queue(queue_topic_a);
}
@Bean(name = queue_topic_b)
public Queue queue2(){
return new Queue(queue_topic_b);
}
@Bean(name = exchange_topic)
public TopicExchange exchange(){
return new TopicExchange(exchange_topic);
}
/**
* 把队列绑定到交换机上面
* with方法中的参数是路由key
* 在使用的时候会根据交换和路由队列名来推送消息,
* 如果队列名符合路由key的规则就会把消息推送到绑定的队列中去
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindingExchangeA(@Qualifier(queue_topic_a) Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with(queue_topic_a);
}
@Bean
public Binding bindingExchangeB(@Qualifier(queue_topic_b) Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("queue.*");
}
}
生产者:
package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMsg(String text){
amqpTemplate.convertAndSend(TopicConfig.exchange_topic, TopicConfig.queue_topic_b, text);
}
}
消费者:
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
@RabbitListener(queues = TopicConfig.queue_topic_a)
public void receiveA(String info){
System.out.println("-----------------------队列A:" + info);
}
@RabbitListener(queues = TopicConfig.queue_topic_b)
public void receiveB(String info){
System.out.println("-----------------------队列B:" + info);
}
}
fanout模式就是广播模式,和topic类似,把不同点说说
@Bean
public FanoutExchange exchange() {
// 是fanout的交换机
return new FanoutExchange(exchange_fanout);
}
@Bean
public Binding bindingFanoutA(@Qualifier(queue_fanout_a) Queue queue, FanoutExchange fanoutExchange){
// 没有路由key,(后面没有with)
return BindingBuilder.bind(queue).to(fanoutExchange);
}
// 生产者,注意队列名为null
public void sendMsg(String info){
// amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, info); 此用法无效
amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, null, info);
}
应答机制(properties记得开启配置):
package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.FanoutConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class FanoutConsumer {
@RabbitListener(queues = FanoutConfig.queue_fanout_a)
public void receive1(String info, Channel channel, Message message){
try {
// 应答这条消息,如果没有应答,下次连接消息依然会收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("----------------------------队列A: "+info);
}catch (Exception e){
e.printStackTrace();
}
}
@RabbitListener(queues = FanoutConfig.queue_fanout_b)
public void receive2(String info, Channel channel, Message message){
try {
// 销毁这条消息(在队列中销毁)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("----------------------------队列B: "+info);
}
}
持久化机制:默认是持久化
看源码的构造方法:队列和交换机
队列:
public class Queue extends AbstractDeclarable {
public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
private final String name;
private final boolean durable;
private final boolean exclusive;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private volatile String actualName;
public Queue(String name) {
this(name, true, false, false); //看第2个参数默认true
}
public Queue(String name, boolean durable) {
this(name, durable, false, false, (Map)null);
}
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, (Map)null);
}
}
交换机:
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
private final String name;
private final boolean durable;
private final boolean autoDelete;
private final Map<String, Object> arguments;
private volatile boolean delayed;
private boolean internal;
public AbstractExchange(String name) {
this(name, true, false); //看第2个参数默认为true
}
public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, (Map)null);
}
}
网友评论