美文网首页程序员
RabbitMQ学习笔记

RabbitMQ学习笔记

作者: 创奇 | 来源:发表于2020-05-03 14:14 被阅读0次
RabbitMQ

RabbitMQ 于 2007 年发布,是使用 Erlang 编程语言编写的,最早是为电信行业系统之间的可靠通信设计的,也是少数几个支持 AMQP 协议的消息队列之一。
RabbitMQ是轻量级、迅捷,Messageing that just works ,开箱即用的消息队列。也就是说,RabbitMQ是一个相当轻量级的消息队列,非常容易部署和使用。
RabbitMQ支持灵活的路由配置,和其他消息队列不同的是,他在生产者(Producer)和队列(Queue)之间增加了一个Exchange模块,可以理解为交换机。

优点:

  1. 使用 Erlang 编程语言编写的,性能好,时效性高达到微秒级是目前所有消息队列中最低延迟。kafka和阿里的RocketMQ也只是毫秒级。
  2. 社区活跃。

缺点:

  1. 国内对Erlang 语言的研究较少,对RabbitMQ的维护和扩展主要依赖社区。
  2. 吞吐量相对较低,属于万级低于kafka和阿里的RocketMQ的十万级。
  3. 不支持集群动态扩容,扩展和二次开发难

生产者:

生产者顾名思义就是生产消息的一方即负责发送消息。RabbitMQ的生产者并不是直接将消息发送到队列,而是发送到Exchange(交换机),再由Exchange发送到队列,由上图可以看出生产者可以发送消息到多个Exchange,再由Exchange分发到绑定的队列,这里有点负载均衡味道。

代码演示配置队列:

@Bean
public Queue queue(){
    return new Queue("队列的名字", true); 
}

// topic模式 交换机Exchange
@Bean
public TopicExchange topicExchange(){
    return new TopicExchange("交换机名字");
}

@Bean
public Queue topicQueue(){
    return new Queue("队列的名字", true);
}

// Binding将Exchange与Queue关联起来,
// 这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
@Bean
public Binding topicBinding(){
    return BindingBuilder.bind(topicQueue())
    .to(topicExchange()).with("关联的key");
}

生产者代码演示:


 public void sendTopic(Object msg){
        logger.info("topic 发送的消息:" + msg);
        amqpTemplate.convertAndSend("交换机名称", "路由key", msg);
    }


    public void sendFanout(Object msg){
        logger.info("fanout 发送的消息:" + msg);
        amqpTemplate.convertAndSend("交换机名称",  "", msg); // 广播不用绑定routingKey
    }

Exchange有多种模式:headers、topic 、fanout、direct。其中常用的是topic和fanout模式。
topic模式(topic exchange)和订阅模式(fanout exchange)的区别在于 topic会使用到router-key,支持模糊匹配。

消费者:

消费者即读取队列中的消息的一方。监听某个队列,当队列存在消息时进行读取并从队列中取出消息。

代码演示消费者:

@RabbitListener(queues = "监听的队列名称")
 public void receiverTopic(String msg){

        // 将获取的消息用于发送邮件或者发送短信,此处省略
        logger.info("Topic 接收到的消息:" + msg);
  }

复杂业务的消费者:

/**
     * 监听队列消息,当队列有消息时进行消费
     *
     * @Payload 注解将消息强制转换为Message对象
     * @Headers 注解获取消息头信息
     *
     * @param msg
     * @param channel
     * @param headers
     */
    @RabbitListener(queues = "队列名称")
    public void receiver(@Payload Message msg, Channel channel,
                         @Headers Map<String, Object> headers){

@Payload 注解将消息强制转换为对应的对象

选择消息队列的基本标准:

  1. 消息的可靠传递:确保不丢消息;
  2. cluster:支持集群,确保不会因为某个节点宕机导致服务不可用,当然也不能丢消息;
  3. 性能:具备足够好的性能,能满足绝大多数场景的性能要求,如时效性、低延迟。

RabbitMQ消息提供confirm机制确保消息可靠。配置confirm机制

application.yml

spring:
   rabbitmq:
    host: 47.52.72.145
    username: tale
    password: tale_mq
    port: 5670
    publisher-confirms: true  #  支持发布确认
    publisher-returns: true    # 支持发布返回

添加配置实现消息发送失败回调及消息确认机制。

@Autowired
    private CachingConnectionFactory connectionFactory;


    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange");
            } else {
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });

        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

        return rabbitTemplate;
    }

    /**
     * 序列化
     * @return
     */
    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

setConfirmCallback确认机制,ack是否收到消息。
当消息从Exchange路由到Queue发送失败时,触发setReturnCallback回调。

一般采用这种机制实现消息100%投递,异步的模式,不会阻塞,吞吐量会比较高。

理解confirm消息确认机制

 消息的确认,指生产者收到投递消息后,如果Broker收到消息就会给我们的生产者一个应答,生产者接受应答来确认broker是否收到消息。

如何实现confirm确认消息。

在Channel上开启确认模式:channel.confirmSelect()
在channel上添加监听:addConfirmListener,监听成功和失败的结果,具体结果对消息进行重新发送或者记录日志。

return消息机制:

 Return消息机制处理一些不可路由的消息,生产者通过指定一个Exchange和Routinkey,将消息送到某个队列,消费者监听队列进行消费处理。在某些情况下,如果我们在发送消息的时候当Exchange不存在或者指定的路由key路由找不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener。

消息队列作用:

  1. 解耦
  2. 异步
  3. 削峰

参考:该如何选择消息队列?

相关文章

  • RabbitMQ 学习笔记:安装

    RabbitMQ 学习笔记系列 上一系列:MySQL 学习实践 RabbitMQ 学习笔记:安装 RabbitMQ...

  • RabbitMQ 问答式总结

    RabbitMQ的学习笔记 关于RabbitMQ的几个角色如下: 关于名词的通俗解析: 首先我们肯定知道Rabbi...

  • 统一配置

    spring cloud学习笔记 上文 学习使用Docker下文 RabbitMQ的基本使用 由于 spring ...

  • RabbitMQ学习笔记

    RabbitMQ 简介 MQ 消息队列,上承生产者,下接消费者。从生产者侧获取消息,然后将消息转发给消费者。由此可...

  • RabbitMQ 学习笔记

    RabbitMQ 是一个支持多种消息传递协议的消息代理, 支持 AMQP(一个具有强大路由功能的开放式连接协议),...

  • rabbitmq学习笔记

    在Window7下安装rabbitmq 下载 otp_win64_17.3.exe 和 rabbitmq-serv...

  • Rabbitmq学习笔记

    什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...

  • RabbitMQ学习笔记

    本文作者:陈进坚个人博客:https://jian1098.github.io[https://jian1098....

  • RabbitMQ学习笔记

    安装 追加内容export PATH=$PATH:/data/install/erlang/bin

  • RabbitMQ学习笔记

    RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQP(Advanved Mesaage Que...

网友评论

    本文标题:RabbitMQ学习笔记

    本文链接:https://www.haomeiwen.com/subject/clhdghtx.html