如何确保rabbitMQ消息的可靠性
要确保rabbitMQ消息的可靠性要做到三点:
- publisher confirms(发布方确认):
确保producer到broker节点消息的可靠性。如可能发生消息到了broker但是还没有投递到queue,broker突然宕机这种情况;
- message持久化:
将message存储到硬盘,如果是未持久化的消息会存在内存中,broker宕机后重启内存中的消息会丢失
- acknowledgement(consumer确认):
该机制能够保证,只有consumer成功消费了消息,才将其从queue中移除。
RabbitMQ的Publihser Confirms
rabbitMQ部署在分布式环境,为了确保消息的可靠性,在发布方和消费方都需要消息发送和处理的
确认信息。RabbitMQ的几种协议都有这种特性。这里我们基于AMQP 0-9-1 来讨论消息的确认机制。
消费者确认在AMQP 0-9-1中叫做 acknowlegements 机制;broker确认则叫做 publisher confirms。
上述两个功能基于相同的想法并受TCP的启发。它们对于从producer到RabbitMQ节点以及从RabbitMQ节点到consumer的可靠性传送都是至关重要的。
acknowledgement(consumer确认)
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
消费消息完成后
channel.basicAck(envelope.getDeliveryTag(), false);
acknowledgement要考虑到consumer预取消息数量的问题,即 Fair Dispatch
When will messages be confirmed?
什么时候消息会被确认?
对于不能路由到queue的消息,如果发布消息时mandatory设置为true,那么basic.Return
会在basic.ack之前发送给客户端。basic.nack也是类似情况。
对于可路由的消息,当消息被所有队列接受时发送basic.ack。对于路由到持久队列的持久消息,这
意味着消息持久化到磁盘,才会发送basic.ack。对于镜像队列,这意味着所有队列都接收了该消息,才会发送basic.ack
message 持久化
-
交换机要是durable的
-
queue也要是durable的
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
- 消息也要打上持久化的标记,具体的 MessageProperties 属性设置为PERSISTENT_TEXT_PLAIN
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
Fair dispatch(公平分发)
实际上是简单的负载均衡技术。现在我们假设有这样两个workers,分别记作1号、2号。
;奇数号的message处理起来耗费时间长,偶数号message处理起来更为轻量。那么其中有一个worker就 会一直处于繁忙状态,另一个worker则比较空闲。默认情况下broker并不清楚worker工作载荷状态,还会一种均匀的向1号和2号worker分发消息。
出现这种情况是因为,rabbitMQ只是将queue中的message分发出去,它并不能感知每个consumer有多少个message还没有ack。它只是平均的将n条message分给n个consumer。
通过设置consumer的预取数量可以解决该问题。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
这样只有consumer处理并且ack了前一个消息后,才能得到rabbitMQ分发的新的消息。
1的预取值是最保守的。 这将显着降低吞吐量,特别是在消费者连接延迟较高的环境中。 对于许多应用来说,更高的价值是合适和最佳的。
Delivery Identifiers: Delivery Tags
basic.qos
basic.qos方法可以限制 channel上未确认消息的数量。实际上以channel为单元进行限制并不是理想的范围。因为单个
channel上面可能会有多个consumer消费多个队列。这样channel和queue就要为每个发送的消息相互协调,以确保它们不会超出限制,这在单台机器上很慢,而在整个集群中使用时非常慢。此外,在
许多场景下,指定每个consumer的预取计数更简单一些。
Example - single consumer
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);
Example - multiple independent consumers
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
Example - multiple consumers sharing the limit
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
消费顺序性
rabbitMQ如何保证consumer消费消息的顺序性?既前一条message处理完成后,
才能对后一条数据进行消费处理。
exclusive
rabbitMQ利用心跳检测tcp连接
默认的心跳间隔 60s
Enabling Heartbeats with Java Client
ConnectionFactory cf = new ConnectionFactory();
// set the heartbeat timeout to 60 seconds
cf.setRequestedHeartbeat(60);
RabbitMQ 3.6.x之后,默认心跳间隔是60s,客户端只有设置小于60s的心跳间隔才会生效。
心跳间隔设置的太低,会导致误报(有可能网络延迟严重)。5~20s对大多数环境是最佳的。
confirm 代码
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.err.println("ack "+l+ " "+ b);
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.err.println("nack "+l+ " "+ b);
}
});
使用spring
public void runWithSpring(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b) {
System.err.println("confirmedMessage");
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.err.println("Returned Message");
}
});
rabbitTemplate.convertAndSend("myqueue", "foriffo");
}
网友评论