美文网首页
hyperf3.0--amqp

hyperf3.0--amqp

作者: farlamp | 来源:发表于2024-04-15 17:27 被阅读0次

    安装amqp

     composer require hyperf/amqp
    

    创建一个消费者
    [ 消费消息 ]

    php bin/hyperf.php gen:amqp-consumer DemoConsumer
    
    <?php
    
    declare(strict_types=1);
    
    namespace App\Amqp\Consumer;
    
    use Hyperf\Amqp\Result;
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * Demo AMQP consumer.
     *
     * The attribute below sets exchange, routing key and queue to "hyperf",
     * names the consumer "DemoConsumer" and sets nums to 1.
     */
    #[Consumer(
        exchange: 'hyperf',
        routingKey: 'hyperf',
        queue: 'hyperf',
        name: 'DemoConsumer',
        nums: 1,
    )]
    class DemoConsumer extends ConsumerMessage
    {
        /**
         * Dump the received payload and report Result::ACK.
         *
         * @param mixed $data payload handed to this consumer
         * @param AMQPMessage $message message object; not read by this method
         * @return string always Result::ACK
         */
        public function consumeMessage($data, AMQPMessage $message): string
        {
            // Debug output only: print whatever payload was received.
            print_r($data);

            return Result::ACK;
        }
    }
    
    

    创建一个生产者
    [ 投递消息 ]

     php bin/hyperf.php gen:amqp-producer DemoProducer
    
    <?php
    
    declare(strict_types=1);
    
    namespace App\Amqp\Producer;
    
    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Message\ProducerMessage;
    
    /**
     * Demo AMQP producer message.
     *
     * The attribute below sets both exchange and routing key to "hyperf".
     */
    #[Producer(
        exchange: 'hyperf',
        routingKey: 'hyperf',
    )]
    class DemoProducer extends ProducerMessage
    {
        /**
         * Print the given data and keep it as the message payload.
         *
         * @param mixed $data value stored in $this->payload
         */
        public function __construct($data)
        {
            // Debug output of the payload before it is stored.
            print_r($data);

            $this->payload = $data;
        }
    }
    

    创建生产者脚本（生成并按下文修改后，执行 php bin/hyperf.php demo:queue 投递消息）

    php bin/hyperf.php gen:command DelayCommand 
    
    <?php
    
    declare(strict_types=1);
    
    namespace App\Command;
    
    use App\Amqp\Producer\DelayDirectProducer;
    use App\Amqp\Producer\DemoProducer;
    use Hyperf\Amqp\Producer;
    use Hyperf\Command\Command as HyperfCommand;
    use Hyperf\Command\Annotation\Command;
    use Hyperf\Context\ApplicationContext as ContextApplicationContext;
    use Hyperf\Utils\ApplicationContext;
    use Psr\Container\ContainerInterface;
    
    /**
     * Console command registered as "demo:queue".
     *
     * Publishes a fixed number of DemoProducer messages through the AMQP
     * Producer service and reports how many publishes returned a falsy result.
     */
    #[Command]
    class DelayCommand extends HyperfCommand
    {
        /**
         * Number of demo messages published per run.
         */
        private const MESSAGE_COUNT = 100;

        /**
         * Container used to resolve the AMQP producer service.
         *
         * @var ContainerInterface
         */
        protected $container;

        /**
         * @param ContainerInterface $container container kept for service lookups in handle()
         */
        public function __construct(ContainerInterface $container)
        {
            $this->container = $container;

            parent::__construct('demo:queue');
        }

        /**
         * Set the command description on top of the parent configuration.
         */
        public function configure()
        {
            parent::configure();
            $this->setDescription('生产消息发布 ');
        }

        /**
         * Publish MESSAGE_COUNT demo messages, print the last produce() result,
         * report failed publishes (if any) and terminate.
         */
        public function handle()
        {
            // The producer lookup is loop-invariant, so resolve it once from
            // the container injected through the constructor.
            $producer = $this->container->get(Producer::class);

            $failed = 0;
            $ret = null;
            for ($i = 0; $i < self::MESSAGE_COUNT; ++$i) {
                $data = [
                    'ss' => 122,
                    'num' => $i,
                    'number' => mt_rand(1111, 9999),
                    'time' => time(),
                ];

                $ret = $producer->produce(new DemoProducer($data));
                if (! $ret) {
                    // Track every failed publish instead of silently
                    // overwriting the result on the next iteration.
                    ++$failed;
                }
            }

            // Result of the last publish only, as in the original output.
            print_r($ret);

            if ($failed > 0) {
                $this->line(
                    sprintf('%d of %d messages failed to publish.', $failed, self::MESSAGE_COUNT),
                    'error',
                );
            }

            $this->line('Hello Hyperf!', 'info');

            // NOTE(review): explicit exit() kept from the original; confirm
            // whether the command still needs it to terminate.
            exit();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:hyperf3.0--amqp

          本文链接:https://www.haomeiwen.com/subject/bsbdxjtx.html