美文网首页PHP开发PHP经验分享
【RabbitMq】快速入门之work queue模式、fano

【RabbitMq】快速入门之work queue模式、fano

作者: Bryanz | 来源:发表于2022-02-24 08:18 被阅读0次

消息队列(MQ),很多场景都有它的身影,MQ的主要功能包括应用解耦流量削峰异步处理。本文主要讲解RabbitMq的原理及应用实例,将参考官网文档重点介绍RabbitMq基本概念work queue模式fanout模式direct模式topic模式RPC实现publisher confirms机制,从而达到快速入门的目的。

0.RabbitMq基本概念

  • vhost,虚拟主机,提供了完全隔离独立的环境,包括exchange、queue等,可通过插件web管理后台或者rabbitmqctl命令设置user的vhost权限。
  • connection,要使用rabbitmq必然要与服务器建立连接了,AMQP协议是基于TCP连接的应用层协议
  • channel,信道用于复用connection,减少TCP连接带来的资源开销,当访问量大的时候则需要开辟多个connection,并分摊到chennel。
  • routing_key,路由键在pub/sub模式下作为exchange匹配binding到queue的条件;在work queue模式下,可视为队列名称发送消息。
  • exchange,交换机在信道内,负责接受并转发消息。根据交换机的类型,有不同的匹配方式。
  • binding_key,绑定值可视为exchange与queue之间的映射关系值,绑定值与queue之间的关系是n:n,当一个queue对应exchange的多个binding_key时,exchange只会发送一次到该queue。
  • queue,消息队列
  • message,消息是要传递及处理的数据,通过RabbitMq指定的类来构造,可配置消息的参数属性,如correlation_id(请求标识),delivery_mode(投递模式)等。
  • producer/publisher,消息的生产者/发布者,携带routing_key和msg。
  • consumer/subscriber,消息的消费者/订阅者,按照不同的模式处理队列中的消息。

1.work queues模式

1.work queues
常规的消息队列模式,不涉及交换机exchange和队列绑定queue_binding,执行过程:生产者发送消息至队列,消费者从队列中取数据消费。

producer代码示例(PHP)

//1.建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中声明队列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投递模式设置为消息持久化
//5.发布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher  Sent '{$message}!'\n";
$channel->close();
$connection->close();

consumer代码示例

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo "consumer received : " . $msg->body . PHP_EOL;
    sleep(1);
    echo "Done" . PHP_EOL;
    //确认消息
    $msg->ack();
};
//公平调度, 设置预加载个数
$channel->basic_qos(null, 1, null);
//持续监听,回调处理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
    $channel->wait();
}

下面介绍publish/subscribe模式,并引入exchangequeue_binding。该模式根据exchange的不同类型有不同的转发规则,exchange的类型主要有fanout、direct、topic

2.fanout模式

2.fanout

该模式引入exchange、queue_binding,但不涉及routing_key和binding_key,因为publisher把消息投递给exchange后,所有绑定在该交换机上的队列都能接收到消息。

publisher代码

...
//通用连接部分参考上面,后面代码同理,只展示核心变更部分;完整代码可看官网
//该模式不用声明队列,只需声明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交换机
..
//消息投递到交换机
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式

subscriber代码

...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.声明交换机
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.队列绑定交换机
...

比起work queue,该模式更灵活,利用exchange可将消息转发到多个queue中。

3.direct模式

3.direct

如果在pub/sub模式下,只想将交换机的消息转发给指定的队列,fanout模式显然无法满足。此时可以利用direct模式,该模式将exchange和queue通过binding_key绑定在一起;exchange在接收publisher消息时依据routing_key和binding_key是否完全匹配,决定是否转发到对应queue。

publisher代码

$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交换机
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//队列绑定交换机,声明binding_key
...

4.topic模式

4.topic

topic模式在direct模式基础上升级,routing_key和binding_key非完全匹配,支持更灵活的匹配规则;routing_key/binding_key可以通过word1.word2.wordn方式进行灵活扩展。【符号*代表1个word,符号#可代表0或n个words】

publisher代码

$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相当于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//队列绑定交换机,声明binding_key

bindingKey的举🌰
成功:black.#,自动匹配2个words、'black.tall.*'匹配1个word,占位匹配时必须要有点号.
失败:black.short.*
失败-错误使用符号:black#

5.RPC模式

5.RPC

RPC, 全称remote procedure call即远程程序调用,比起常规的远程调用,基于RabbitMq的RPC优点有:1.异步调用2.方便扩展提升服务端性能(开启多个server)

5.1.实现原理

  • 服务端和客户端,通过两个队列进行通信,RPC队列rpc_queue和回调队列reply_to_queue
  • 客户端携带请求标识correlation_idreply_to_queue回调队列信息,发送请求至rpc_queue,服务端监听rpc_queue,消费消息并发送消息至指定回调队列reply_to_queue。
  • 客户端监听回调队列reply_to_queue并通过correlation_id获取请求处理结果

下面以计算斐波那契数为作为RPC示例。

client端代码

class FibonacciRpcClient
{
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    //构造函数,监听回调队列,处理
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'root',
            'root'
        );
        $this->channel = $this->connection->channel();
        //1.生成回调队列
        $this->callback_queue = 'reply_to';
        $this->channel->queue_declare($this->callback_queue, false, true, false, false);

        //2.1.轮训消费
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            true,
            false,
            false,
            array(
                $this,
                'onResponse'
            )
        );


    }

    //2.1.2监听队列的回调函数
    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    //远程调用,发送消息至rpc队列
    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();//3.生成请求的唯一标识

        //4.1.创建消息,携带请求标识、回调队列名称
        $msg = new AMQPMessage(
            (string)$n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to'       => $this->callback_queue
            )
        );
        //4.2.发送消息至rpc队列,等待服务端消费
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        //5.循环判断结果
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$fibonacci_rpc = new FibonacciRpcClient();//构造函数,监听回调队列reply_to
$response = $fibonacci_rpc->call(35);//发送消息至prc队列,并循环判断回调队列的处理结果。
echo ' [.] Got ', $response, "\n";//回调队列的处理结果

server端代码

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//声明队列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
    //1.1监听rpc队列,处理client发送的消息
    $n = intval($req->body);
    echo ' [.] fib(', $n, ")\n";

    //1.2.返回处理结果,并携带请求标识
    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );
    //2.发送消息至同一信道的 回调队列, 由client监听消费。
    $req->delivery_info['channel']->basic_publish(
        $msg,
        '',
        $req->get('reply_to')
    );
    //3.消息接受确认
    $req->ack();
};

//设置预加载数量,服务端worker公平调度
$channel->basic_qos(null, 1, null);
//轮训消费,监听rpc队列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

调用结果

client:
 [.] Got 9227465
server:
 [x] Awaiting RPC requests
 [.] fib(35)

6.publisher confirms模式
publisher confirms是RabbitMq实现可靠传输的扩展,用来判断publisher是否成功把消息发送到RabbitMq的broker。RabbitMq实现可靠传输的方式有两种:事务(不推荐)、publisher confirms,这两种方式互斥。publisher confirms的实现方式又可分为:同步异步

  • 6.1. 同步实现
    该模式是基于信道的,所以只要增加两个步骤即可:
    6.1.1. 信道声明为confirm模式
    6.1.2. 声明同步等待的超时时间
    代码如下:
...
$channel->confirm_select();//1.声明信道为confirm模式
...
try {
    $channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut时间
}catch (Exception $exception){
    echo "exception:" . $exception->getMessage() . PHP_EOL;
}

..

  • 6.2. 异步实现
    异步实现通过注册回调的两个方法set_ack_handler和set_nack_handler。
    代码如下
$channel->confirm_select();//1.声明信道为confirm模式

//2.消息被ack后的回调
$channel->set_ack_handler(function (AMQPMessage $msg) {
    echo "ack msg" . PHP_EOL;
    file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});

//3.消息被nack'ed后的回调
$channel->set_nack_handler(function (AMQPMessage $msg) {
    echo "nack msg" . PHP_EOL;
    file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});

$channel->wait_for_pending_acks();

以上只是RabbitMq各种模式的基本使用,其他很多特性(持久化、网络分区、集群等)并未涉及,若要使用更多的特性请查阅官网文档,然后手动跑一下代码才能理解得更好。希望本文能帮助大家对RabbitMq的使用有个大致了解。

相关文章

网友评论

    本文标题:【RabbitMq】快速入门之work queue模式、fano

    本文链接:https://www.haomeiwen.com/subject/toyglrtx.html