Rabbitmq

作者: 无聊之园 | 来源:发表于2019-05-27 10:42 被阅读0次

    一、rabbitmq简介

          基于amqp协议的,既然是协议,那么就定义了数据交换的格式,不是和jms一样是api规范,amqp是一个协议,所以可以跨平台,amqp有exchange和bind角色,这个跟jms不一样。jms有两种模式,点对点的queue和发布订阅的topic,amqp的模式主要在于exchange的类型,exchange有多种类型,主要使用的有,direct、topic、fanout exchange。 还有一个不同是,jms可以发送多种消息格式,amqp只能发送二进制。
          Rabbitmq是Erlang语言开发的,所以性能方面比较好,消息发布到消费响应时间比较低,但是因为是erlang语言,所以看不懂源码,更不能定制化。
          rabbitmq的管理界面功能比较丰富,可以看到很多状态

    rabbitmq的大概流程:生产者发送消息,消息不会直接到达queue,而是发送给exchange,生产者发送了消息内容之外,还指定了routingKey,然后exchange根据这个routingkey,找到binding到这个exchange的队列,然后发送给这个队列。

    这样的好处是:假设本来B服务监听A服务的消息,未来C服务也想监听A服务的消息,那么C服务可以重新创建一个queue,然后把这个queue绑定到A服务消息发送的那个exchange中,就能收到消息了。(这个跟activemq的topic机制没有什么不同)

    但是,引入了exchange的机制的另一个好处是,如果A服务发送给C服务的消息,C服务处理不过来,产生大量积压,则可以再开启消费者,从queue里消费消息,而这个机制,是activemq的topic实现不了的

    image.png

    二、exchage和queue的定义

          RabbitMQ不允许使用不同的参数重新定义一个队列,所以已经存在的队列,我们无法修改其属性, 也就是说,exchange和queue一旦创建好了,就不能修改了,除非删掉重来。
    比如,一定定义了一个非持久化的,要想改成持久化的,只能删掉重来。(可以通过管理界面删除)

    channel.exchangeDeclare("testExchange", "direct", false);
    

    autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。

       Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                           Map<String, Object> arguments) throws IOException;
    

    exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
    autodelete:当没有任何消费者使用时,自动删除该队列。

     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    

    三、消息确认、持久化、消息拒绝

    rabbitmq的消息确认、消息持久化的思想和activemq大体是一样的,只不过rabbitmq的持久化,有:
    exchange的持久化:不持久化,则重启服务,exchange就消失了。

    channel.exchangeDeclare("testExchange", "direct", true);
    

    队列持久化:

    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    

    消息的持久化:

    channel.basicPublish("testExchange", "routingkey1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    

    消息确认和activemq一样:

    // 每次从rabbitmq拉几条消息,也可以设置其他更多,反正如果没有应答,消费者重启后,还会发送
    // 但是,如果一下拉取太多了,如果有多条消费者都从这个队列消费消息,平衡这个队列的压力的时候,某一个消费者拉去太多,会导致压力不均衡
    channel.basicQos(1);
    // 配置手动应答
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    //应答需要指定Delivery Tag, 用来标识信道中投递的消息。RabbitMQ 推送消息
    // 给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认
    // 时告诉 RabbitMQ 到底是哪条消息被确认了。
    //RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
    //第二个参数:multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;
    // 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认,也
    // 就是批量确认之间所有的消息。
    //对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。
    channel.basicAck(envelope.getDeliveryTag(), false);
    

    消息拒绝:

    // deliveryTag:该消息的index
    // requeue:被拒绝的是否重新入队列
    // channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
    

    四、消息发布:

    消息发布,为了保证消息发布到rabbitmq成功以否,rabbitmq提供了回调确认机制,保证数据不丢失,当然事务也可以做到,但是性能不好,不常用。

    
    // routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
    // mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个
    // 符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:
    // 出现上述情形broker会直接将消息扔掉
    // immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue
    // 上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有
    // queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    //BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 
    // 1:持久化 这里指的是消息的持久化,配合
    // exchange(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然 保留
    // 单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将
    // 消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消
    // 费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返
    // 还给生产者,不用将消息入队列等待消费者了。
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
                throws IOException;
    

    四、exchange类型

    type:主要有direct、fanout、topic三种,还有一些不常用的。
    direct:直接找到routingkey对应的queue,生产者发送指定的routingkey要和queue绑定到exchange上的的routingkey完全相同,不能有通配符。
    topic:和direct的区别是,routingkey可以有通配符。'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词
    fanout: 不需要routingkey,直接把消息发送到所有绑定到exchange的queue中。

    rabbitmq,queue绑定到bind之后,就不能改了。除非删掉,重新来过。

    channel.exchangeDeclare("testExchange", "direct", false);
    

    五、消费者线程
    这部分,很容易导致,消费者的队列堵塞,导致消息消费不到。

    可以指定消费者线程数。默认是单线程的。
    每个消费者线程,轮询查看是否有消息,默认每1秒轮询一次。有消息,则调用我们定义的消费者。

    关于消费者线程:
    假设,有两个消费者,然后消费者线程定义为5,则总共有10个消费者线程。

    服务一启动的时候,消费者的一个线程,就会一下拉取最大250个消息,所以一下把rabbitmq服务器的消息拉取完了。(其实不是拉取,是有一个线程往这个队列里循环放最大250个消息)
    所以,这个消费者线程堵塞了,其他消息者线程也拿不到消息,所以也就没有并发执行。
    而,服务启动后,生产者再发送一条消息,则会随机取一个线程执行,也就达到并发的效果。
    spring-boot-starter-amqp部分源码分析

    还有一种方式,手动把消息在消费端转到jdk的线程池。

    spring:
      rabbitmq:
        listener:
          simple:
            concurrency: 1        #最小消息监听线程数
            max-concurrency: 1    #最大消息监听线程数
            acknowledge-mode: manual  # 应答机制
            retry:
              ####开启消费者重试
              enabled: true
              ####最大重试次数(默认无数次)
              max-attempts: 5
    

    六、消费者重试
    默认,重试3次,间隔为1秒。
    这个错误,只有在,重试3次之后才会抛出来。
    如果自动应答,则抛出错误后消息就丢失了。
    如果有应答机制,则抛出错误后,消息就不再往这个消费者发了,除非另起一个一个消费者,后者消费者重启。

    @RabbitListener(queues = "hello.queue1", containerFactory = "customContainerFactory")
    public void processMessage1(String msg) {
            System.out.println("thread name:" + Thread.currentThread().getName());
    
            System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
    
            throw new RuntimeException("错我");
    //        return msg.toUpperCase();
    //        return;
        }
    

    七、pull还是push模式
    rabbitmq一般我们使用的是推模式,只有调用basicGet才会使用拉模式。

    八、同步消息
    rabbitmq,还可以同步消息,得到消费者的响应
    rabbitTemplate.convertSendAndReceive

    九、rabbitmq的流控机制

    当发布者发布消息远大于消费者时,或者,rabbitmq本身的资源、性能处理不过来这些消息的时候,如果没有任何控制,则rabbitmq就会被压死。所以rabbitmq有流控机制。当达到流控阈值的时候,会堵塞发布者发布,直到消费者消费了一些消息,资源降下来了为止。

    先看三个参数:
    otal_memory_available_override_value:rabbitmq可以使用的内存值。默认,rabbitmq会自动推算出这个值,但是如果部署在docke上,则可能推算错误,推算到物理机的内存。
    vm_memory_high_watermark:触发流控的阈值比例。默认时0.4。为什么时0.4这么小,是因为erlang语言垃圾回收最坏会占用已用内存的2倍。
    vm_memory_high_watermark.relative = 0.6 相对
    vm_memory_high_watermark.absolute = 2GB 绝对

    当达到这个阈值之后,堵塞发布,等到,rabbitmq把内存中的数据,一部分持久化到硬盘,或者消费者消费了一部分,才放开堵塞。

    vm_memory_high_watermark_paging_ratio:触发内存持久化到磁盘的阈值。
    默认时0.5。比如,可用内存1g,内存阈值0.4,内存持久化磁盘0.5,则 1 * 0.4 * 0.5 = 0.2就会持久化磁盘。

    除了宏观上流控之外,各个进程还有流控控制。
    rabbitmq的各个进程之间还有credit flow流控的控制。消息会在各个进程之间流转,各个进程相互独立,如果某一个进程压力太大,将会导致整个rabbitmq压力大。所以进程会有credit flow控制,比如,进程来了一个消息,则credit - 1,如果credit为0了,则堵塞。

    参考:https://mathspanda.github.io/2018/06/24/rabbitmq-flow-control/

    防止消费者消息堵塞:
    消息堵在消费者堵塞队列中,得不到执行。
    解答办法:一定要basicAck或basicNack应答。basicQos不要设置太大,也就是堵塞队列,不要太长。

    代码
    https://gitee.com/wuliaozhiyuan/private.git

    相关文章

      网友评论

          本文标题:Rabbitmq

          本文链接:https://www.haomeiwen.com/subject/qjectctx.html