美文网首页
rabbitmq -- 消息的可靠性

rabbitmq -- 消息的可靠性

作者: 爱码士吴小佳 | 来源:发表于2020-11-28 00:31 被阅读0次

    rabbitmq作为我们系统之间沟通的桥梁,消息的可靠性就显得格外重要。
    假如rabbitmq crash掉之后重新启动,原本的交换机、队列、消息都会消失,如果我们队列中存在一些很重要的消息的时候,我们并不愿意这样的事情发生,这就需要借助rabbitmq的持久化机制。

    rabbitmq的持久化

    交换机、队列设置可以为持久化,消息的投递模式也可以选择持久化(三个持久化)。
    持久化的消息在进入持久化队列之后会写入到rabbitmq的持久性日志文件中,消息被消费掉后,会把持久性日志文件中该消息标记为等待垃圾收集
    在broker重启之后,持久化的交换机、队列会重新初始化,持久化的消息会投递到原先的队列上

    怎么做到消息持久化

    三个持久化:
    交换机持久化、队列持久化、消息持久化

    代码示例:

    <?php
    
    try{
        $config = [
            "host" => "127.0.0.1",
            "port" => "5672",
            "login" => "wuj13",
            "password" => "Aaa294515505",
            "vhost" => "test_vhost"
        ];
        $conn = new AMQPConnection($config);
        //连接rabbitmq
        if(!$conn->connect()){
           throw new Exception("连接失败");
        }
        //创建信道
       $channel = new AMQPChannel($conn);
        //创建交换机
        $exchange = new AMQPExchange($channel);
        $exchangeName = "exchange_one";
        //设置交换机名称
        $exchange->setName($exchangeName);
        //设置交换机类型
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
        //设置为持久化交换机
        $exchange->setFlags(AMQP_DURABLE);
        //生成交换机
        $exchange->declareExchange();
    
    
        //创建队列
        $queueName ="queue_test";
        $queue = new AMQPQueue($channel);
        //设置队列名称
        $queue->setName($queueName);
        $queue->setFlags(AMQP_DURABLE);
        //生成队列
        $queue->declareQueue();
        //队列绑定交换机
        $queue->bind($exchangeName,$queueName);
    
    
        $message = "hello world";
        //向交换机发布消息
        $re = $exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2]);
        if($re){
            echo "成功发送消息".$message.PHP_EOL;
        }
        else{
            throw new Exception("发送消息失败");
        }
    
    }
    catch(Exception $exception){
        echo "操作异常 ".$exception->getMessage();
    }
    finally{
        $channel->close();
        $conn->disconnect();
    }
    
    
    
    

    其中
    $exchange->setFlags(AMQP_DURABLE)设置交换机为durable代表持久类型
    $queue->setFlags(AMQP_DURABLE)设置队列为持久类型
    $exchange->publish($message,$queueName,AMQP_NOPARAM,["delivery_mode" => 2])向交换机发送消息时设置消息的delivery_mode为2代表消息是持久类型的

    代码运行后,我们到管理界面去看看,发现有一条持久化的消息


    image.png

    我们把rabbitmq服务给重启一下,交换机、队列、消息,全部都还在,代表我们的消息持久化已经基本做好了,大家可以自己去试一试。
    但我们不妨想一想,如果我们的broker在把消息写入日志文件的时候crash掉了,那我们的消息不就丢了吗?对的,其实这是有很小的概率会出现这样的情况,这个时候可以借助mirror queue来解决这个问题,这个我们后面会专门讲这个东西,不然篇幅就太长了。

    那我们先认为很大程度上已经解决了rabbitmq这一端因为宕机而导致消息丢失的问题,但是我们还面临着两个问题:
    1 我们的程序怎么保证我们的消息能够成功到达队列
    2 怎么保证我们的消息消费成功的
    我们一个个来解决!

    我们先看一幅图


    image.png

    消息要到达队列中并处以稳定状态(已持久化),要经过三个流程
    1 消息到达交换机
    2 交换机把消息路由到队列中
    3 消息持久化写入日志
    第一个流程可能由于网络的波动、延迟等原因,消息没有准确达到交换机
    第二个流程可能routing key错误等原因,消息没有准确到达队列
    第三个流程可能在消息写入日志的过程中,broker crash掉了,重新后消息丢失。
    也就是说,在没有其他措施的情况下,客户端发送消息这个操作,消息如果想要在队列中处于稳定地状态,要经过三个独立的环节,并不是一个原子操作,三个环节中的任意一个环节出错,都有可能导致消息丢失,而我们的生产者对此一无所知,会影响我们业务的正常进行。

    看到原子化,我们首先想到用事务来解决这个问题,但是在rabiitmq中,事务会很大幅度地降低性能,我们来看一个对比数据:
    在同样开启持久化的情况下,没有开启事务,写入10万条数据仅需11秒

    image.png 微信截图_20201128222508.png

    而在同样的条件下,我们模拟10万次事务的独立开启提交,写入10万条数据需要208秒!

    image.png 微信截图_20201128222108.png

    这相差了接近20倍的性能,所以一般情况下我们不会选择事务来解决消息发送可能失败的问题,我们选择"发送者确认模式"来解决这个问题。

    发送者确认模式

    我们可以通过$channel->confirmSelect() 来开启信道的发送者确认模式,开启之后,信道会为每一条消息分配一个唯一的消息id,即delivery_tag,从0开始,依次是1,2,3这样,每个信道之间的delivery_tag是相互独立的。
    我们先来了解两个东西:ack 、nack
    ack :
    在发送者确认模式中,有两种情况服务端会给客户端发送ack确认信号
    1 消息到达交换机,交换机发现该消息可以路由到队列,便将消息路由到队列,并向客户端发送一个ack信号(注意:消息如果是持久化的,需要等持久化完成后,才会向客户端发送ack信号。如果是开启了镜像队列,需要所有镜像中的队列收到消息后,才会发送ack信号),即消息成功入列且稳定后会向客户端发送ack信号
    2 消息到达交换机,交换机发现根据消息的routing key,无法将该消息路由到任何队列,就会向客户端发送一个ack信号。如果发送消息时,我们指定了mandatory参数为true,指定该消息为强制到达,那么在向客户端发送ack信号之前,还有将该消息返回给客户端,顺序是 "消息" ->ack

    nack :
    消息到达交换机,交换机路由消息的过程中,由于系统内部错误而无法继续下去的时候,就会向客户端发送一个nack信号,告诉客户端,这个消息处理不了。

    经过以上的阐述,我们要怎么知道这条消息到底有没有成功到达队列并处于稳定呢?
    我们发现
    ack无法作为消息是否成功入列的判断标准!!!
    ack无法作为消息是否成功入列的判断标准!!!
    ack无法作为消息是否成功入列的判断标准!!!
    重要的事情说三遍
    消息成功入列会给客户端发ack信号,消息无法路由到队列也会给客户端发ack信号,但是这两者是有区别的。成功入列的话是单纯发送"ack",而如果消息无法路由到队列则是发送 "消息" + "ack",且"消息"比ack信号先到

    那我们的程序中拿什么来作为消息是否成功入列的判断标准呢?相信各位看客心中已经有了答案。
    没错,如果返回了"消息",那么自然是失败的。如果没有返回消息体,单纯返回ack ,那就是成功的。废话不多说,上代码

    <?php
    
    try{
        $config = [
            "host" => "127.0.0.1",
            "port" => "5672",
            "login" => "wuj13",
            "password" => "Aaa294515505",
            "vhost" => "test_vhost"
        ];
        $conn = new AMQPConnection($config);
        //连接rabbitmq
        if(!$conn->connect()){
           throw new Exception("连接失败");
        }
        //创建信道
        $channel = new AMQPChannel($conn);
    
        //开启消息确认模式
        $channel->confirmSelect();
        $isSuccess = true;
        //设置回调方法
            $channel->setConfirmCallback(function($delivery_tag, $multiple) use ($channel,$conn,&$isSuccess){
            echo "收到ack回执:\n";
            echo "delivery_tag: $delivery_tag \n";
            echo "multiple:$multiple\n";
            if($isSuccess){
                echo "消息成功入列\n";
                /**
                 * TODO
                 * 1 数据库事务提交
                 * 2 业务处理完成,返回成功
                 */
            }
            else{
                echo "消息入列失败\n";
                /**
                 * TODO
                 * 1 数据库事务回滚
                 * 2 失败消息记录日志
                 * 3 返回失败
                 */
            }
            $channel->close();
            $conn->disconnect();
            exit;
    
        },function($delivery_tag, $multiple,$requeue){
            echo "收到nack回执:delivery_tag: ".$delivery_tag. " multiple: $multiple"." requeue:".$requeue.PHP_EOL;
            /**
             * TODO
             * 重新发送消息
             */
    
        });
    
        //设置消息返还的回调
        $channel->setReturnCallback(function($reply_code, $reply_text, $exchange, $routing_key, $properties, $body) use(&$isSuccess){
            $isSuccess = false;
            //进行日志记录
            echo "收到return消息\n";
            echo "body: $body \n";
            echo "replay_code: $reply_code \n";
            echo "replay_text: $reply_text \n";
        });
    
        //创建交换机
        $exchange = new AMQPExchange($channel);
        $exchangeName = "exchange_one";
        //设置交换机名称
        $exchange->setName($exchangeName);
        //设置交换机类型
        $exchange->setType(AMQP_EX_TYPE_DIRECT);
        //设置为持久化交换机
        $exchange->setFlags(AMQP_DURABLE);
        //生成交换机
        $exchange->declareExchange();
    
    
    //    //创建队列
        $queueName ="queue_test";
        $queue = new AMQPQueue($channel);
        //设置队列名称
        $queue->setName($queueName);
        $queue->setFlags(AMQP_DURABLE);
        //生成队列
        $queue->declareQueue();
        //队列绑定交换机
        $queue->bind($exchangeName,$queueName);
    
        $message = "hello world";
        for($i = 1; $i<=1;$i++){
           $exchange->publish("message".$i,"123",AMQP_MANDATORY,["delivery_mode" => 2]);
        }
        $channel->waitForConfirm();
    
    }
    catch(Exception $exception){
        echo "操作异常 ".$exception->getMessage();
        $channel->close();
        $conn->disconnect();
    }
    
    

    代码解析:
    $channel->confirmSelect() 我们使用confirmSelect来开启信道的发送者确认模式(注意,信道一旦开启发送者确认模式,就无法使用事务,这两者不兼容)

    $channel->setConfirmCallback() 是设置回调,参数一是ack回调,服务端返回ack信号时触发。参数二是nack回调,服务端返回nack信号时触发

    $channel->setReturnCallback() 是设置消息返还的回调,服务端返回"消息"时触发

    $channel->waitForConfirm() 是进入阻塞模式等待服务端返回通知

    至于在各个回调中应该怎么去处理业务,请注意看我代码中的注释!

    先说说nack回调的处理
    nack回调是属于rabbitmq系统内部错误时返回的信号,这种情况属于很少见,这个时候我们应该把消息进行重新发送

    而ack回调跟return回调我们要结合起来,用来判断消息有没有成功入列
    如果触发了return回调,代表消息必然路由失败,这时我们就把$isSuccess变量设置为false,便于后续到达的ack信号触发ack回调的时候进行成功与否的判断
    ack回调中,如果发现$isSuccess为false,证明在ack信号达到前,已经触发过return回调了,证明消息是入列失败的,否则证明单纯只有ack信号达到,则是成功入列的。
    如果消息入列失败,我们需要回滚我们的数据库事务,并把失败的消息记录日志,给前端返回错误。此时不会重新发送该消息,因为即使我们重新发送消息,也是无法路由的。
    如果入列成功,那就提交数据库事务,然后给前端正常返回业务数据即可。
    然后就是断开信道、rabbitmq的连接,中止阻塞进程。

    相关文章

      网友评论

          本文标题:rabbitmq -- 消息的可靠性

          本文链接:https://www.haomeiwen.com/subject/njcbiktx.html