1、RabbitMQ 消息中间件
大榜:小汪你怎么了,我看你脸色不太好呀。
小汪:最近团队正在使用消息队列实现业务模块之间的解耦,昨天熬了一晚上,也是云里雾里。哎,都快熬成熊猫眼了。
大榜:哦哦。消息队列的使用,可以对多个模块之间进行解耦,并实现异步通信,降低系统整体的响应时间。你们团队的想法挺好的,那采用的是哪种消息队列呢?
小汪:昨天上网搜索了半天,有ActiveMQ、RabbitMQ、RocketMQ、Kafka这四种,它们之间的区别,我当时还找了一张图,我发过来给你看看,帮我参谋下
大榜:上面这四种消息队列的对比,我之前也看过,你们公司100多号人,应该属于中小型公司,使用RabbitMQ作为消息队列应该是很好的选择,而且RabbitMQ社区很活跃,我上次使用RabbitMQ作为消息中间件,发生了消息丢失的现象,最后依靠RabbitMQ社区的力量解决的。
小汪:可以啊,榜哥,教教我。我现在脑子里面只知道有几个概念:生产者、消费者、消息模型啥的
2、RabbitMQ核心概念(类比实际生活场景)
大榜:嗯嗯,你说的很对,RabbitMQ的核心概念就是生产者、消费者、消息模型,而消息模型是由交换机、路由、队列组成。
小汪:你这么一说,我想起来了,昨晚的书里面介绍了交换机、路由的概念,但我还是觉得太抽象了,不太好理解。
大榜:确实不好理解,我刚开始学习时,也是不理解,后来我想到了一个实际生活中寄快递、取快递的场景,正好可以解释交换机、路由、队列。我们就举个栗子,假设生活在武汉的小芹,想寄一个爱心包裹,发送给远在深圳的你(小汪)。
小汪:小芹给我寄爱心包裹,说得我心里暖暖的。哎,但实际情况是我寄包裹给小芹,呜呜呜。
大榜:小汪,面包会有的,牛奶也会有的,只要我们向阳成长。扯远了,我们假设小芹寄爱心包裹给你(小汪)。具体流程应该如下:小芹将包裹给快递员,然后快递员通过某种运输方式将包裹从武汉运到深圳,小汪开开心心地从深圳快递点取走爱心包裹。
类比下,可以得到如下的关系:
- 小芹:生产者
- 爱心包裹:消息
- 快递员:交换机
- 某种运输方式:路由
- 武汉到深圳:队列名称
- 小汪:消费者
总的来说,小芹将包裹给快递员,然后快递员通过某种运输方式将包裹从武汉运到深圳,小汪开开心心地从深圳快递点取走爱心包裹。我们就可以看成:生产者产生消息,然后消息到达交换机,再通过路由,将消息发送至指定的队列中,最后消费者监听该队列中的消息,进行消费处理。
小汪:你这一说,我好像有点懂了。寄快递、取快递这种实际生活场景,可以看成如下的消息传输过程,生产者将消息交给交换机,然后交换机通过路由,将消息发送武汉到深圳的队列,消费者取出该条消息进行处理。RabbitMQ中生产者、消息、交换机、路由、队列、消费者之间的关系,可以画成下面这张图:
大榜:嗯嗯,你这理解力可以啊!
小汪:过奖过奖,这都是昨晚熬成熊猫眼的成果,而且刚刚和你讨论后,感觉进一步理解了RabbitMQ的概念。我现在懂了RabbitMQ的生产者、消息模型、消费者的大致原理,但团队要搭建一个RabbitMQ与SpringBoot整合的一个框架,昨晚搭建了半宿,也没干完,我这一时半天也搭不出来了,下午开周例会,等待被老大捶了,呜呜呜。
3、RabbitMQ发送和接收消息实战
大榜:RabbitMQ与SpringBoot整合,不算很难,因为SpringBoot是集成了Spring的,比Spring框架更灵活。
3.1、pom文件中引入RabbitMQ的起步依赖
我们只需要在pom文件中引入RabbitMQ的起步依赖
<!-- RabbitMQ的起步依赖,和spring-boot整合成一个Jar包了 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.6.RELEASE</version>
</dependency>
3.2、配置RabbitMQ的host、端口等信息
在application.properties配置文件中,增加host主机地址、port端口号、用户名、密码
# RabbitMQ配置
spring.rabbitmq.virtual-host=/
# 配置host、端口号、用户名、密码
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
接着,在配置文件中声明RabbitMQ的队列、交换机、路由的信息:
# 设置交换机、路由、队列,使用directExchange消息模型
# 自定义变量,表示本地开发环境
mq.env=local
# 设置direct消息模型中队列、交换机、路由
mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue.demo1
mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange.demo1
mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key.demo1
小汪:榜哥,3.1中在pom文件引入RabbitMQ的依赖,3.2中在配置文件里面设置了RabbitMQ服务器的地址为localhost,用户名、密码都为guest。这个我懂,然后你设置了队列、交换机、路由的名称,再往下应该要把队列、交换机、路由注入到Spring容器中进行管理,是吧?
大榜:是的,思维转得很快嘛。RabbitsMQ在实际项目的应用过程中,如果配置和使用不当,则会出现各种令人头疼的问题,而且面试官经常考察的问题:如何防止消息丢失?如何保证消费不被重复消费?我当时就出现了消息丢失,熬出了好几个熊猫眼才解决消息丢失的问题。
小汪:榜哥,别卖关子了,你要是和盘托出,小弟请你喝冰可乐!
3.3、自定义注入Bean组件
大榜:我当时查阅和学习了RabbitMQ的相关教程和视频,为了保证消息的高可用和确认消费,RabbitMQ官方给我们提供了三条准则。也就是如果我们想要保证消息的高可用和确认消费,需要遵守这3条准则:生产者的发送确认机制;创建队列、交换机消息时设置持久化模式;消费者的确认消费ACK机制。
小汪:榜哥牛逼啊,堪称RabbitMQ实战经验的总结!
大榜:别给我带高帽子,我只是比你早学习RabbitMQ半年。上面3条准则对应的代码,我之前配置好了,可以直接拿来用。项目链接放在码云仓库: gitee.com/qinstudy/sp…
3.3.1、生产者的发送确认机制
RabbitMQ要求生产者在发送完消息之后进行”发送确认“,当生产者确认成功时即代表消息已经完成发送出去了!其中的代码如下:
/**
* 构建RabbitMQ发送消息的操作组件实例
* 生产者的发送确认机制
*/
@Bean(name = "rabbitMQTemplate")
public RabbitTemplate rabbitTemplate() {
// 生产者确认消息是否发送过去了
connectionFactory.setPublisherConfirms(true);
// 生产者发送消息后,返回反馈消息
connectionFactory.setPublisherReturns(true);
// 构建rabbitTemlate操作模板
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
// 生产者发送消息后,如果发送成功,则打印“消息发送成功”的日志信息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
// 生产者发送消息后,若发送失败,则输出“消息发送失败”的日志信息
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
3.3.2、创建队列、交换机、消息,设置持久化模式
第二条准则,就是如何保证RabbitMQ队列中的消息”不丢失“,RabbitMQ官方建议开发者在创建队列、交换机设置持久化参数为true,也就是durable参数的值为true。同时,官方强烈建议开发者在创建消息时设置消息的持久化模式为”持久化“,这样就可以保证RabbitMQ服务器崩溃并执行重启操作后,队列、交换机仍然存在,而且该消息不会丢失。
创建队列、交换机设置持久化参数为true,也就是durable参数的值为true,代码如下:
/**
* 创建direct消息模型:队列、交换机、路由
*/
// 1.1、创建队列
@Bean(name = "basicQueue")
public Queue basicQueue() {
return new Queue(env.getProperty("mq.basic.info.queue.name"), true);
}
// 1.2、创建交换机
@Bean
public DirectExchange basicExchange() {
return new DirectExchange(env.getProperty("mq.basic.info.exchange.name"), true, false);
}
// 1.3、创建绑定关系
@Bean
public Binding basicBinding() {
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("mq.basic.info.routing.key.name"));
}
在创建消息时设置消息的持久化模式为”持久化“,代码如下:
@Component
@Slf4j
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
Environment env;
// 发送字符串类型的消息
public void sendMsg(String messageStr) {
if (!Strings.isNullOrEmpty(messageStr)) {
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
// 2创建队列、交换机、消息 设置持久化模式
// 设置消息的持久化模式
Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8")).
setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend(message);
log.info("基本消息模型-生产者-发送消息:{}", messageStr);
} catch (UnsupportedEncodingException e) {
log.error("基本消息模型-生产者-发送消息发生异常:{}", messageStr, e.fillInStackTrace());
}
}
}
}
3.3.3、消费者的确认消费ACK机制
消费者的确认消费机制有三种:None、Auto、Manual
None:不进行确认消息,也就是消费者发送任何反馈信息给MQ服务端;
Auto:消费者自动确认消费。消费者处理该消息后,需要发送一个自动的ack反馈信息给MQ服务端,之后该消息从MQ的队列中移除掉。其底层的实现逻辑是由RabbitMQ内置的相关组件实现自动发送确认反馈信息。
Manual:人为手动确认消费机制。消费者处理该消息后,需要手动地“以代码的形式”发送给一个ack反馈信息给MQ服务端。
RabbitmqConfig#listenerContainer(),代码如下:
/**
* 设置单个消费者
* 消费者的Ack确认机制为AUTO
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
// 设置消费者的个数
containerFactory.setConcurrentConsumers(1);
// 设置消费者的最大值
containerFactory.setMaxConcurrentConsumers(1);
// 设置消费者每次拉取的消息数量,即消费者一次拉取几条消息
containerFactory.setPrefetchCount(1);
// 设置确认消息模型为自动确认消费AUTO,目的是防止消息丢失和消息被重复消费
containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return containerFactory;
}
小汪:哦哦,我懂了。为了保证消息的高可用和确认消费,需要遵守这3条准则。3.3.1是生产者的发送确认机制;3.3.2中的代码是告诉我们创建队列、交换机消息时,要设置持久化模式;3.3.3是消费者的确认消费ACK机制。那RabbitMQ的发送、接收代码该如何实现呢?
大榜:上面我们已经配置了RabbitMQ的队列、交换机、路由,而且为了保证消息的高可用和确认消费,我们遵守3条准则:生产者的发送确认机制;创建队列、交换机消息时设置持久化模式;消费者的确认消费ACK机制。我们解决了大难点,发送、接收代码实现就简单了。
3.4、RabbitMQ发送、接收实战
3.4.1、定义生产者
@Component
@Slf4j
public class BasicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
Environment env;
// 发送字符串类型的消息
public void sendMsg(String messageStr) {
if (!Strings.isNullOrEmpty(messageStr)) {
try {
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
// 2创建队列、交换机、消息 设置持久化模式
// 设置消息的持久化模式
Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8")).
setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend(message);
log.info("基本消息模型-生产者-发送消息:{}", messageStr);
} catch (UnsupportedEncodingException e) {
log.error("基本消息模型-生产者-发送消息发生异常:{}", messageStr, e.fillInStackTrace());
}
}
}
}
3.4.2、创建消费者
@Component
@Slf4j
public class BasicConsumer {
/**
* 监听并消费队列中的消息
*/
@RabbitListener(queues = "${mq.basic.info.queue.name}", containerFactory = "singleListenerContainer")
public void consumerMsg(@Payload byte[] msg) {
try {
String messageStr = new String(msg, "utf-8");
log.info("基本消息模型-消费者-监听并消费到的消息:{}", messageStr);
} catch (UnsupportedEncodingException e) {
log.error("基本消息模型-消费者-发生异常:", e.fillInStackTrace());
}
}
}
3.4.3、编写单元测试,将一串字符串发送到队列中,消费者监听并处理
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private BasicPublisher basicPublisher;
// 测试 基本消息模型,消息内容为 字符串
@Test
public void testBasicMessageModel() {
String msgStr = "~~~这是一串字符串消息~~~~";
basicPublisher.sendMsg(msgStr);
}
}
3.4.4、单元测试的结果分析
对编写的单元测试,结果如下图:
从图中可以看到,生产者发送了一条消息:"~~这是一串字符串消息~~",放到了消息队列中;消费者监听并处理了该条消息,然后消费者打印了该消息内容。
小汪:懂了,对于我这样的刚刚接触RabbitMQ的来说,都是干货啊。榜哥,我梳理下,看看理解的对不对。首先,我们从团队的业务需要实现解耦说起,引出了消息中间件;接着比较了4种消息队列的区别,根据公司的实际情况选择了RabbitMQ;然后将生产者、消息、交换机、路由、队列、消费者类比于寄快递、取快递这一实际场景(小芹给小汪的寄快递);最后,我们将RabbitMQ与流行的SpringBoot整合起来,遵守3条准则,保证RabbitMQ的高可用和确认消费,并实现了RabbitMQ的发送和接收消息实战。
大榜:是的,看来你已经入门了RabbitMQ,RabbitMQ发送和接收消息实战,该项目链接放在码云仓库,你为团队引入RabbitMQ时可以直接拿来用,链接地址: gitee.com/qinstudy/sp…
小汪:昨晚看书,RabbitMQ还有死信队列的概念,能解下惑吗?
大榜:到饭点了,要不咱们先吃个午饭,边吃边聊.....
作者:麻雀学习
链接:https://juejin.cn/post/6958421370623492132
来源:稀土掘金
网友评论