RabbitMQ 于 2007 年发布,是使用 Erlang 编程语言编写的,最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一。
RabbitMQ是轻量级、迅捷,Messageing that just works ,开箱即用的消息队列。也就是说,RabbitMQ是一个相当轻量级的消息队列,非常容易部署和使用。
RabbitMQ支持灵活的路由配置,和其他消息队列不同的是,他在生产者(Producer)和队列(Queue)之间增加了一个Exchange模块,可以理解为交换机。
优点:
- 使用 Erlang 编程语言编写的,性能好,时效性高达到微秒级是目前所有消息队列中最低延迟。kafka和阿里的RocketMQ也只是毫秒级。
- 社区活跃。
缺点:
- 国内对Erlang 语言的研究较少,对RabbitMQ的维护和扩展主要依赖社区。
- 吞吐量相对较低,属于万级低于kafka和阿里的RocketMQ的十万级。
- 不支持集群动态扩容,扩展和二次开发难
生产者:
生产者顾名思义就是生产消息的一方即负责发送消息。RabbitMQ的生产者并不是直接将消息发送到队列,而是发送到Exchange(交换机),再由Exchange发送到队列,由上图可以看出生产者可以发送消息到多个Exchange,再由Exchange分发到绑定的队列,这里有点负载均衡味道。
代码演示配置队列:
@Bean
public Queue queue(){
return new Queue("队列的名字", true);
}
// topic模式 交换机Exchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("交换机名字");
}
@Bean
public Queue topicQueue(){
return new Queue("队列的名字", true);
}
// Binding将Exchange与Queue关联起来,
// 这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
@Bean
public Binding topicBinding(){
return BindingBuilder.bind(topicQueue())
.to(topicExchange()).with("关联的key");
}
生产者代码演示:
public void sendTopic(Object msg){
logger.info("topic 发送的消息:" + msg);
amqpTemplate.convertAndSend("交换机名称", "路由key", msg);
}
public void sendFanout(Object msg){
logger.info("fanout 发送的消息:" + msg);
amqpTemplate.convertAndSend("交换机名称", "", msg); // 广播不用绑定routingKey
}
Exchange有多种模式:headers、topic 、fanout、direct。其中常用的是topic和fanout模式。
topic模式(topic exchange)和订阅模式(fanout exchange)的区别在于 topic会使用到router-key,支持模糊匹配。
消费者:
消费者即读取队列中的消息的一方。监听某个队列,当队列存在消息时进行读取并从队列中取出消息。
代码演示消费者:
@RabbitListener(queues = "监听的队列名称")
public void receiverTopic(String msg){
// 将获取的消息用于发送邮件或者发送短信,此处省略
logger.info("Topic 接收到的消息:" + msg);
}
复杂业务的消费者:
/**
* 监听队列消息,当队列有消息时进行消费
*
* @Payload 注解将消息强制转换为Message对象
* @Headers 注解获取消息头信息
*
* @param msg
* @param channel
* @param headers
*/
@RabbitListener(queues = "队列名称")
public void receiver(@Payload Message msg, Channel channel,
@Headers Map<String, Object> headers){
@Payload 注解将消息强制转换为对应的对象
选择消息队列的基本标准:
- 消息的可靠传递:确保不丢消息;
- cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
- 性能:具备足够好的性能,能满足绝大多数场景的性能要求,如时效性、低延迟。
RabbitMQ消息提供confirm机制确保消息可靠。配置confirm机制
application.yml
spring:
rabbitmq:
host: 47.52.72.145
username: tale
password: tale_mq
port: 5670
publisher-confirms: true # 支持发布确认
publisher-returns: true # 支持发布返回
添加配置实现消息发送失败回调及消息确认机制。
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
/**
* 序列化
* @return
*/
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
setConfirmCallback确认机制,ack是否收到消息。
当消息从Exchange路由到Queue发送失败时,触发setReturnCallback回调。
一般采用这种机制实现消息100%投递,异步的模式,不会阻塞,吞吐量会比较高。
理解confirm消息确认机制
消息的确认,指生产者收到投递消息后,如果Broker收到消息就会给我们的生产者一个应答,生产者接受应答来确认broker是否收到消息。
如何实现confirm确认消息。
在Channel上开启确认模式:channel.confirmSelect()
在channel上添加监听:addConfirmListener,监听成功和失败的结果,具体结果对消息进行重新发送或者记录日志。
return消息机制:
Return消息机制处理一些不可路由的消息,生产者通过指定一个Exchange和Routinkey,将消息送到某个队列,消费者监听队列进行消费处理。在某些情况下,如果我们在发送消息的时候当Exchange不存在或者指定的路由key路由找不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener。
消息队列作用:
- 解耦
- 异步
- 削峰
参考:该如何选择消息队列?
网友评论