美文网首页rocketmq我爱编程
如何确保rabbitMQ消息的可靠性

如何确保rabbitMQ消息的可靠性

作者: 横渡 | 来源:发表于2018-04-02 18:24 被阅读30次

如何确保rabbitMQ消息的可靠性

要确保rabbitMQ消息的可靠性要做到三点:

  1. publisher confirms(发布方确认):

确保producer到broker节点消息的可靠性。如可能发生消息到了broker但是还没有投递到queue,broker突然宕机这种情况;

  1. message持久化:

将message存储到硬盘,如果是未持久化的消息会存在内存中,broker宕机后重启内存中的消息会丢失

  1. acknowledgement(consumer确认):

该机制能够保证,只有consumer成功消费了消息,才将其从queue中移除。

RabbitMQ的Publihser Confirms

rabbitMQ部署在分布式环境,为了确保消息的可靠性,在发布方和消费方都需要消息发送和处理的

确认信息。RabbitMQ的几种协议都有这种特性。这里我们基于AMQP 0-9-1 来讨论消息的确认机制。

消费者确认在AMQP 0-9-1中叫做 acknowlegements 机制;broker确认则叫做 publisher confirms。

上述两个功能基于相同的想法并受TCP的启发。它们对于从producer到RabbitMQ节点以及从RabbitMQ节点到consumer的可靠性传送都是至关重要的。

acknowledgement(consumer确认)


boolean autoAck = false;

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

消费消息完成后


channel.basicAck(envelope.getDeliveryTag(), false);

acknowledgement要考虑到consumer预取消息数量的问题,即 Fair Dispatch

When will messages be confirmed?

什么时候消息会被确认?

对于不能路由到queue的消息,如果发布消息时mandatory设置为true,那么basic.Return

会在basic.ack之前发送给客户端。basic.nack也是类似情况。

对于可路由的消息,当消息被所有队列接受时发送basic.ack。对于路由到持久队列的持久消息,这

意味着消息持久化到磁盘,才会发送basic.ack。对于镜像队列,这意味着所有队列都接收了该消息,才会发送basic.ack

message 持久化

  1. 交换机要是durable的

  2. queue也要是durable的


boolean durable = true;

channel.queueDeclare("task_queue", durable, false, false, null);

  1. 消息也要打上持久化的标记,具体的 MessageProperties 属性设置为PERSISTENT_TEXT_PLAIN

channel.basicPublish("", "task_queue",

            MessageProperties.PERSISTENT_TEXT_PLAIN,

            message.getBytes());

Fair dispatch(公平分发)

实际上是简单的负载均衡技术。现在我们假设有这样两个workers,分别记作1号、2号。

;奇数号的message处理起来耗费时间长,偶数号message处理起来更为轻量。那么其中有一个worker就 会一直处于繁忙状态,另一个worker则比较空闲。默认情况下broker并不清楚worker工作载荷状态,还会一种均匀的向1号和2号worker分发消息。

出现这种情况是因为,rabbitMQ只是将queue中的message分发出去,它并不能感知每个consumer有多少个message还没有ack。它只是平均的将n条message分给n个consumer。

通过设置consumer的预取数量可以解决该问题。


int prefetchCount = 1;

channel.basicQos(prefetchCount);

这样只有consumer处理并且ack了前一个消息后,才能得到rabbitMQ分发的新的消息。

1的预取值是最保守的。 这将显着降低吞吐量,特别是在消费者连接延迟较高的环境中。 对于许多应用来说,更高的价值是合适和最佳的。

Delivery Identifiers: Delivery Tags

basic.qos

basic.qos方法可以限制 channel上未确认消息的数量。实际上以channel为单元进行限制并不是理想的范围。因为单个

channel上面可能会有多个consumer消费多个队列。这样channel和queue就要为每个发送的消息相互协调,以确保它们不会超出限制,这在单台机器上很慢,而在整个集群中使用时非常慢。此外,在

许多场景下,指定每个consumer的预取计数更简单一些。

Example - single consumer


Channel channel = ...;

Consumer consumer = ...;

channel.basicQos(10); // Per consumer limit

channel.basicConsume("my-queue", false, consumer);

Example - multiple independent consumers


Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10); // Per consumer limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

Example - multiple consumers sharing the limit


Channel channel = ...;

Consumer consumer1 = ...;

Consumer consumer2 = ...;

channel.basicQos(10, false); // Per consumer limit

channel.basicQos(15, true);  // Per channel limit

channel.basicConsume("my-queue1", false, consumer1);

channel.basicConsume("my-queue2", false, consumer2);

消费顺序性

rabbitMQ如何保证consumer消费消息的顺序性?既前一条message处理完成后,

才能对后一条数据进行消费处理。

exclusive

rabbitMQ利用心跳检测tcp连接

默认的心跳间隔 60s

Enabling Heartbeats with Java Client


ConnectionFactory cf = new ConnectionFactory();

// set the heartbeat timeout to 60 seconds

cf.setRequestedHeartbeat(60);

RabbitMQ 3.6.x之后,默认心跳间隔是60s,客户端只有设置小于60s的心跳间隔才会生效。

心跳间隔设置的太低,会导致误报(有可能网络延迟严重)。5~20s对大多数环境是最佳的。

confirm 代码


channel.addConfirmListener(new ConfirmListener() {

                @Override

                public void handleAck(long l, boolean b) throws IOException {

                    System.err.println("ack "+l+ " "+ b);

                }

                @Override

                public void handleNack(long l, boolean b) throws IOException {

                    System.err.println("nack "+l+ " "+ b);

                }

            });

使用spring


public void runWithSpring(){



        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            @Override

            public void confirm(CorrelationData correlationData, boolean b) {

                System.err.println("confirmedMessage");

            }

        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

            @Override

            public void returnedMessage(Message message, int i, String s, String s1, String s2) {

                System.err.println("Returned Message");



            }

        });

        rabbitTemplate.convertAndSend("myqueue", "foriffo");

    }

相关文章

网友评论

    本文标题:如何确保rabbitMQ消息的可靠性

    本文链接:https://www.haomeiwen.com/subject/ysbxhftx.html