- Round-robin dispatching(循环调度)
文档:By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin.
翻译:默认情况下,RabbitMQ 会将每条消息按顺序地发送至下一个消费者,平均下来每个消费者将会获得相同数量的消息,这种消息的分发方式叫做“循环”。
实测:
(1)启动一个生产者,往一个队列中发送4条消息,处理这4条消息的所需的时间分别是1s, 4s, 1s, 4s。
(2)启动一个消费者1,这个消费者1会立马读取并处理上述4条消息。
(3)在第一个消费者处理上述4条消息时,启动消费者2,这时消费者2没有任何消息可以读取和处理,处于等待消息的状态。
(4)等待消费者1处理完上述4条消息,此时消费者1和消费者2都进入等待消息的状态。
(5)像步骤(1)一样,继续往相同的队列中放入相同的4条消息。
(6)此时体现“循环调度”的时候到了:消费者1被分配到第一条消息,消费者2被分配到第二条消息,消费者1被分到第三条消息,消费者2被分到第四条消息。
(7)消费者都进入消息处理状态。当消费者1处理完分配给它的两条消息(共需要2s完成)时,消费者2连第一条都还未处理完成。
结论:
a:默认情况下,RabbitMQ执行循环调度策略,只关心消息的数量,不关心消息需要的处理时间(从RabbitMQ充当的角色来看,它主要负责接收生产者的消息并将其放入队列;将消息从队列中取出并调度给消费者,不关心处理消息所需的时间(事实上也根本不知道自己将需能不能处理完该消息、需要多久才能处理完)。处理消息是消费者的任务,完成消息的处理,更是取决于每个消费者的具体环境和实现)。
b:RabbitMQ不断地将队列中的消息分配给现有的消费者,直到队列中的消息被调度完时进入等待状态,所以常见的一种现象是:现有的消费者可能已经囤积了大量的消息,并不断地处理这些消息;但新加入的消费者却无事可做,因为当新的消费者加入时,队列中的消息已经全部被分配到先前的消费者。
(You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.)
- Message acknowledgment(消息确认)
文档:Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
翻译:完成一项任务可能需要一些时间。 您可能想知道,如果其中一个消费者开始一项漫长的任务并仅部分完成就死掉,会发生什么情况。 使用我们当前的代码(指相关官方教程中的代码),RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死消费者,我们将丢失正在处理的消息。 我们还将丢失所有发送给该特定消费者但尚未处理的消息。
实测:
(1)向一个队列中发送两条需要5s才能处理完成的消息。
(2)检查控制台中未读消息,确认是2条。
(3)启动一个消费者,读取完所有2条消息。当处理第一条消息时,强制关闭该消费者。
(4)控制台中的未读消息为0,上述消费者的正在处理的消息和已读取待处理的消息已经丢失。
static public function receiving()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'mqadmin', 'mqadmin939878');
$channel = $connection->channel();
$channel->queue_declare('channel0', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo "I am handling a message.\n";
$seconds = $msg->body;
sleep($seconds);
echo "I toke $seconds seconds to finish the job.\n";
// 如果basic_consume开启了确认,则必须发送确认,否则RabbitMQ不会释放队列中的消息
// 如果basic_consume没有开启确认,则不能发送确认,否则会抛出一个 channel-level protocol exception
$msg->ack();
};
//第四个参数为是否不需要消费者确认:true/false
$channel->basic_consume('channel0', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
}
(5)如果启动了一个消费者,该消费者在basic_consume设置参数中设置需要确认,但是在其回调中却忘记确认,导致RabbitMQ中相关的Unacknowledged消息的条数只增不减,此时可以关闭该消费者,重新启动一个需要确认并在回调函数中发送确认的消费者,新的消费者会正确地处理并发送确认。前提是必须关闭错误的消费者,否则RabbitMQ不会将消息从从Unacknowledged状态标记为Ready状态,即可以调度的状态,因为RabbitMQ会认为原先的消费者还在处理着该消息,只不过处理该消息需要的时间很久而已。
结论:
a:通过文档的描述和实测观察发现:如果一个消费者开启了消息确认,当它结束工作时,所有调度给该消费者的未确认消息将会被RabbitMQ从Unacknowledged状态标记为Ready状态,即可以调度的状态。新的消费者可以重新处理之前被其它消费者读取过的待处理的消息和处理了但没有发送确认的消息。
- Message durability(消息持久性)
文档:When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.
翻译:RabbitMQ退出或崩溃时,除非你设置它,否则它将丢失队列和消息。 确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。
操作:
(1)持久化队列(队列持久化后,如果想要删除掉它可以在官方的管理插件中删除)
将生产者和消费者queue_declare函数的第三个参数都设置为true。
$channel->queue_declare('task_queue', false, true, false, false);
(2)持久化消息
在new AMQPMessage中,设置构造函数参数delivery_mode = 2(消费者不需要构造消息,在生产者中设置即可)
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
备注:将消息标记为持久性并不能完全保证不会丢失消息。如果需要更强的保证,请使用publisher confirms.
- Fair dispatch(公平调度)
文档:RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
翻译:当消息进入队列时,RabbitMQ只是调度消息。 它不会查看消费者的未确认消息数。 它只是盲目地将每第n条消息发送给第n个使用者。
操作:在消费者中设置每个消费者任意时刻只能容纳1条消息,但也引发另一个问题:由于消费者只能容纳一个消息,所以堆积在队列中的消息数量到底队列容纳消息,可以考虑增加更多的消费者。
static public function receiving()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'mqadmin', 'mqadmin939878');
$channel = $connection->channel();
$channel->queue_declare('channel0', false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
$seconds = $msg->body;
echo "I am handling a message. It probably take me $seconds seconds to finish.\n";
sleep($seconds);
echo "I toke $seconds seconds to finish the job.\n";
// 如果basic_consume开启了确认,则必须发送确认,否则RabbitMQ不会释放队列中的消息
// 如果basic_consume没有开启确认,则不能发送确认,否则会抛出一个 channel-level protocol exception
$msg->ack();
};
//This tells RabbitMQ not to give more than one message to a worker at a time
$channel->basic_qos(null, 1, null);
//第四个参数为是否不需要消费者确认:
// true不需要确认
// false需要确认
$channel->basic_consume('channel0', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
}
实测:
(1)当在消费者中设置了channel->basic_qos(null, 1, null);不起作用)
结论:
a:文档的描述和实测的结果透露出一个信息:RabbitMQ会记录每个消费者容纳的消息数量,并记录其中的消息的确认状态,根据这些信息决定是否将消息从队列中调度到该消费者。
b:RabbitMQ根据每个消费者容纳的消息数量和其中的消息的确认状态来确定要不要将消息放入该消费者。所以必须开启消息确认才能让$channel->basic_qos(null, 1, null);生效。
The messages will be lost if no queue is bound to the exchange
如果没有队列绑定到交换机,那么发往该交换机的消息将会丢失!
The meaning of a binding key depends on the exchange type. The fanout exchanges simply ignored its value.
网友评论