美文网首页
[PHP] Rabbit MQ 直连交换机

[PHP] Rabbit MQ 直连交换机

作者: 不留余白 | 来源:发表于2021-05-20 16:59 被阅读0次

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

  1. 直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
  2. 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
    当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
直连型交换机图例
  • 假设一个商城的场景,每支付一个订单,我们要通知业务A系统

A.send.php 消费者

<?php
//生产者-业务A
require_once __DIR__ . '/../../../../vendor/autoload.php';

$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//试探性的声明一个队列
$channel->queue_declare($queue, false, false, false, false);
//试探性的声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, false, false);
//将队列与交换机绑定
$channel->queue_bind($queue, $exchange);
$msg = new \PhpAmqpLib\Message\AMQPMessage(json_encode([
    'id' => 1,
    'msg' => '卖出A产品',
    'time' => date('Ymd H:i:s')
],JSON_UNESCAPED_UNICODE));
$channel->basic_publish($msg, $exchange);
$channel->close();
$conn->close();

A.receive.1.php 消费者demo

//消费者-业务A
require_once __DIR__ . '/../../../../vendor/autoload.php';

$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//试探性的声明一个队列
$channel->queue_declare($queue, false, false, false, false);
$channel->basic_consume($queue,'',false,false,false,false,function ($msg){
    echo $msg->body.PHP_EOL;
});
while ($channel->is_consuming()){
    $channel->wait();
}
$channel->close();
$conn->close();

运行消费者文件

$ php ./A.receive.1.php
{"id":1,"msg":"卖出A产品","time":"20210520 14:52:00"}

再来看一下交换机循环分发任务给多个消费者的demo,也就是负载均衡
把 A.send.php的代码改成如下,让他发10条消息

<?php
//生产者-业务A
require_once __DIR__ . '/../../../../vendor/autoload.php';

$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//试探性的声明一个队列
$channel->queue_declare($queue, false, false, false, false);
//试探性的声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, false, false);
//将队列与交换机绑定
$channel->queue_bind($queue, $exchange);
for ($i=1;$i<11;$i++){
    $msg=json_encode([
        'id' => $i,
        'msg' => '卖出A产品',
        'time' => date('Ymd H:i:s')
    ],JSON_UNESCAPED_UNICODE);
    echo 'send message:'.$msg.PHP_EOL;
    $msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
    $channel->basic_publish($msg, $exchange);
}
$channel->close();
$conn->close();

再将消费者文件复制两份,现在有了A.receive.2.php、A.receive.3.php加上之前的A.receive.1.php 一共有3个消费者


执行生产者文件发送消息 3个消费者轮流接收到消息

这就是多个消费者的工作模式,接下来再看下用routing key路由的例子
如果A业务和B业务都有队列绑定了卖出C产品的路由键(sell_product_c),那么除了A队列本就可以收到这个消息之外,B队列也可以通过这个路由键收到消息。

A.rk.send.php

<?php
//生产者-业务A
require_once __DIR__ . '/../../../../vendor/autoload.php';

$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
$routingKey = 'sell_product_c';

//连接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//试探性的声明一个队列
$channel->queue_declare($queue, false, false, false, false);
//试探性的声明一个交换机
$channel->exchange_declare($exchange, 'direct', false, false, false);
//将队列与交换机绑定
$channel->queue_bind($queue, $exchange, $routingKey);//交换机与routing key 的队列绑定
for ($i = 1; $i < 11; $i++) {
    $msg = json_encode([
        'id' => $i,
        'msg' => '卖出C产品',
        'time' => date('Ymd H:i:s')
    ], JSON_UNESCAPED_UNICODE);
    echo 'send message:' . $msg . PHP_EOL;
    $msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
    $channel->basic_publish($msg, $exchange,$routingKey);//将消息发送到绑定routing key 的队列
}
$channel->close();
$conn->close();

B.rk.receive.1.php

<?php
//消费者-业务B
require_once __DIR__ . '/../../../../vendor/autoload.php';

$queue = 'biz_B_queue';//业务B的队列名称
$exchange = 'pay_order_exchange';
$routingKey = 'sell_product_c';

$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue, false, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);//通过routing key绑定交换器和队列
$channel->basic_consume($queue, false, false, true, false, false, function ($msg) {
    echo '【队列 biz_B_queue】 ' . $msg->body . PHP_EOL;
});
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$conn->close();

运行B.rk.receive.1.phpA.receive.1.phpA.receive.2.php 三个消费者进程 和 A.rk.send.php

A.rk.send.php 消费者执行结果

可以看出 B.rk.receive.1完整收到了10个消息, A.receive.1 和A.receive.2 各自收到5个消息。也是上面说的将消息发送到绑定了routing key的队列上,除了A队列本就可以收到这个消息之外,B队列也可以通过这个路由键收到消息。

适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

相关文章

  • [PHP] Rabbit MQ 直连交换机

    直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列...

  • [PHP] Rabbit MQ 扇形交换机

    扇形交换机会把消息发送到交换机上的所有队列中。 生产者 消费者

  • spring 使用 rabbit mq

    rabbit mq 安装 brew 安装 rabbit mq后台rabbit mq后台 用户名:guest 密码...

  • Rabbit MQ & NodeJS & Protobuf

    Rabbit MQ & NodeJS & Protobuf 一、NodeJS Rabbit MQ 客户端封装 ...

  • 【轻知识】phper的rabbit mq 初看

    初看 Rabbit MQ vmware 虚拟机centos 7 环境搭建 erlang跟rabbit mq 我都是...

  • Rabbit MQ 与 NodeJS

    amqplib包提供了js访问Rabbit MQ 的接口 安装rabbit MQ 此时,通过 http://loc...

  • RabbitMQ学习1--安装

    首先Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlan...

  • rabbit mq

    2019-10-14 zhanghang

  • Rabbit MQ

    为啥要用MQ 1. 消费方不需要实时等待依赖上一个任务的执行结果,只要生产者随时发送消息,消费者随时可接受消息调用...

  • Rabbit MQ

    简介 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件) 高级消息队列...

网友评论

      本文标题:[PHP] Rabbit MQ 直连交换机

      本文链接:https://www.haomeiwen.com/subject/gpzvjltx.html