美文网首页
hyperf3.0--amqp

hyperf3.0--amqp

作者: geeooooz | 来源:发表于2023-06-26 15:20 被阅读0次

    官方文档 https://hyperf.wiki/3.0/#/zh-cn/amqp

    安装

    composer require hyperf/amqp

    连接配置

    <?php
    //config/autoload/amqp.php
    declare(strict_types=1);
    /**
     * This file is part of Hyperf.
     *
     * @link     https://www.hyperf.io
     * @document https://hyperf.wiki
     * @contact  group@hyperf.io
     * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
     */
    use Hyperf\Amqp\IO\IOFactory;
    
    return [
        'enable' => true,
        'default' => [
            'host' => '127.0.0.1',//env('AMQP_HOST', '192.168.0.44'),
            'port' => 5672,//(int) env('AMQP_PORT', 5672),
            'user' => 'admin',//env('AMQP_USER', 'root'),
            'password' => 'admin',//env('AMQP_PASSWORD', 'root'),
            'vhost' => env('AMQP_VHOST', '/test'),
            'open_ssl' => false,
            'concurrent' => [
                'limit' => 2,
            ],
            'pool' => [
                'connections' => 2,
            ],
            'io' => IOFactory::class,
            'params' => [
                'insist' => false,
                'login_method' => 'AMQPLAIN',
                'login_response' => null,
                'locale' => 'en_US',
                'connection_timeout' => 3,
                'read_write_timeout' => 6,
                'context' => null,
                'keepalive' => true,
                'heartbeat' => 3,
                'channel_rpc_timeout' => 0.0,
                'close_on_destruct' => false,
                'max_idle_channels' => 10,
            ],
        ],
    ];
    
    

    投递消息

    使用 gen:producer 命令创建一个 producer
    php bin/hyperf.php gen:amqp-producer DemoProducer

    <?php
    
    declare(strict_types=1);
    
    namespace App\Amqp\Producer;
    
    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Message\ProducerMessage;
    
    #[Producer(exchange: 'fanout', routingKey: 'user')]
    class DemoProducer extends ProducerMessage
    {
        public function __construct($data)
        {
             //这里可以拿$data做一些操作  最后返回的值 就是传到rabbitmq的
            $this->payload = $data;
        }
    }
    
    

    在Index控制器中调用 投递消息

    <?php
    
    declare(strict_types=1);
    /**
     * This file is part of Hyperf.
     *
     * @link     https://www.hyperf.io
     * @document https://hyperf.wiki
     * @contact  group@hyperf.io
     * @license  https://github.com/hyperf/hyperf/blob/master/LICENSE
     */
    namespace App\Controller;
    use Hyperf\Context\ApplicationContext;
    use App\Amqp\Producer\DemoProducer;
    use Hyperf\Amqp\Producer;
    
    class IndexController extends AbstractController
    {
        public function index()
        {
            $message = new DemoProducer(1);
            $producer = ApplicationContext::getContainer()->get(Producer::class);
            $result = $producer->produce($message);
            var_dump($result);
        }
    }
    
    
    #[Producer] 注解是用于声明一个生产者的,它用于将消息发送到指定的交换机和路由键上。在 #[Producer] 注解中,确实没有队列字段,因为生产者并不需要绑定队列,它只需要将消息发送到指定的交换机上即可。
    
    当消息发送到交换机后,交换机会根据绑定的规则将消息路由到相应的队列中。所以,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够被正确地路由到指定的队列中。
    
    在示例代码中的 #[Producer(exchange: 'fanout', routingKey: 'user')] 注解中,我们声明了一个名为 user 的路由键,该路由键被绑定到了 fanout 类型的交换机上。当消息发送到交换机时,交换机会将消息路由到所有绑定的队列中,因为 fanout 类型的交换机会将所有接收到的消息广播到所有绑定的队列中。
    
    因此,在使用 #[Producer] 注解时,需要根据实际需求配置好交换机和路由键,以便确保消息能够正确地被路由到指定的队列中。
    

    消费消息

    使用 gen:amqp-consumer 命令创建一个 consumer。
    php bin/hyperf.php gen:amqp-consumer DemoConsumer

    <?php
    
    declare(strict_types=1);
    
    namespace App\Amqp\Consumer;
    
    use Hyperf\Amqp\Message\Type;
    use Hyperf\Amqp\Result;
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use PhpAmqpLib\Message\AMQPMessage;
    
    #[Consumer(exchange: 'demo', routingKey: 'demo', queue: 'demo', name: "demo", nums: 1)]
    /**
    这是一个消费者的注解,用于声明一个消费者。注解中的各个参数含义如下:
    exchange:表示消费者要消费的交换机。
    routingKey:表示消费者要绑定的路由键。
    queue:表示消费者要消费的队列名称。
    name:表示消费者的名称,用于区分不同的消费者。
    nums:表示消费者的数量,这个参数指定消费者的数量后,框架会自动创建同样数量的消费者来同时消费队列中的消息。
    在示例代码中的 #[Consumer(exchange: 'fanout', routingKey: 'user', queue: 'user', name: "user", nums: 1)] 注解中,
    我们声明了一个名为 DemoConsumer 的消费者,它要消费 fanout 类型的交换机上的 user 路由键所绑定的名为 user 的队列中的消息,同时指定了消费者的数量为 1。
    注解中的参数还可以根据实际需求进行配置,例如可以将 nums 参数设为 2,让框架创建 2 个消费者来同时消费队列中的消息,提高消费效率。
     */
    
    
    class DemoConsumer extends ConsumerMessage
    {
        //设置交换机类型  我认为只写这个就行
        protected  string $type = Type::DIRECT; //Type::FANOUT;
    
        public function consumeMessage($data, AMQPMessage $message): string
        {
            var_dump($data);
            //业务处理...
    
            return Result::ACK;
        }
    }
    
    

    启动就行了

    禁止消费进程自启

    默认情况下,使用了 #[Consumer] 注解后,框架会自动创建子进程启动消费者,并且会在子进程异常退出后,重新拉起。 如果出于开发阶段,进行消费者调试时,可能会因为消费其他消息而导致调试不便。

    这种情况,只需要在 #[Consumer] 注解中配置 enable=false (默认为 true 跟随服务启动)或者在对应的消费者中重写类方法 isEnable() 返回 false 即可

    <?php
    
    declare(strict_types=1);
    
    namespace App\Amqp\Consumers;
    
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use Hyperf\Amqp\Result;
    use PhpAmqpLib\Message\AMQPMessage;
    
    #[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1, enable: false)]
    class DemoConsumer extends ConsumerMessage
    {
        public function consumeMessage($data, AMQPMessage $message): string
        {
            print_r($data);
            return Result::ACK;
        }
    
        public function isEnable(): bool
        {
            return parent::isEnable();
        }
    }
    

    设置最大消费数

    可以修改 #[Consumer] 注解中的 maxConsumption 属性,设置此消费者最大处理的消息数,达到指定消费数后,消费者进程会重启。

    消费结果

    框架会根据 Consumer 内的 consume 方法所返回的结果来决定该消息的响应行为,共有 4 中响应结果,分别为 \Hyperf\Amqp\Result::ACK\Hyperf\Amqp\Result::NACK\Hyperf\Amqp\Result::REQUEUE\Hyperf\Amqp\Result::DROP,每个返回值分别代表如下行为:

    返回值 行为
    \Hyperf\Amqp\Result::ACK 确认消息正确被消费掉了
    \Hyperf\Amqp\Result::NACK 消息没有被正确消费掉,以 basic_nack 方法来响应
    \Hyperf\Amqp\Result::REQUEUE 消息没有被正确消费掉,以 basic_reject 方法来响应,并使消息重新入列
    \Hyperf\Amqp\Result::DROP 消息没有被正确消费掉,以 basic_reject 方法来响应

    相关文章

      网友评论

          本文标题:hyperf3.0--amqp

          本文链接:https://www.haomeiwen.com/subject/matgydtx.html