美文网首页
RabbitMQ与Laravel项目中结合

RabbitMQ与Laravel项目中结合

作者: 起个名字好难_98 | 来源:发表于2019-11-21 17:35 被阅读0次

    搭建RabbitMQ环境不在此文范围内,后面会单独出搭建的教程资料

    与Laravel的结合使用

    1.composer引入官方php-amqplib/php-amqplib包

    2.封装消息生产者

    /**
     * 入消息队列
     *
     * @param $queue string 队列名
     * @param $data mixed 数据
     */
    public static function pushMessageQueue($queue, $data = null)
    {
        $connection  = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
        $channel = $connection->channel();
    
        $table = new AMQPTable([
            'x-queue-type' => 'classic'
        ]);
    
        $channel->queue_declare($queue, false, true, false, false, false, $table);
    
        $message = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE));
        $channel->basic_publish($message, '', $queue);
    
        $channel->close();
        try {
            $connection->close();
        } catch (\Exception $e) {
        }
    }
    

    3.控制器调用封装好的入队方法

    IndexController.php
    SystemService::pushMessageQueue('other', ['date' => date('Y-m-d H:i:s')]);
    

    4.封装消费者基类

    <?php
    
    namespace App\Console\Commands\Queue;
    
    use PhpAmqpLib\Wire\AMQPTable;
    use Illuminate\Console\Command;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    class Base extends Command
    {
        /**
         * rabbitMQ队列名称
         *
         * @var string
         */
        protected $queue = '';
    
        /**
         * rabbitMQ连接
         *
         * @var AMQPStreamConnection|null
         */
        protected $connection = null;
    
        /**
         * 连接频道
         *
         * @var \PhpAmqpLib\Channel\AMQPChannel|null
         */
        protected $channel = null;
    
        public function __construct()
        {
            parent::__construct();
    
            if (!empty($this->queue)) {
                $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
                $this->channel = $this->connection->channel();
                $this->channel->queue_declare($this->queue, false, true, false, false, false, new AMQPTable([
                    'x-queue-type' => 'classic'
                ]));
    
                $this->channel->basic_consume($this->queue, '', false, false, false, false, $this->handle());
    
                while (count($this->channel->callbacks)) {
                    try {
                        $this->channel->wait();
                    } catch (\ErrorException $exception) {
                    }
                }
            }
        }
    
        public function handle()
        {
            return function ($message) {
    
            };
        }
    
        /**
         * 确认消息
         *
         * @param $message AMQPMessage 当前消息
         */
        protected function ack($message)
        {
            $this->channel->basic_ack($message->delivery_info['delivery_tag']);
        }
    
        /**
         * 拒收消息
         *
         * @param $message AMQPMessage 当前消息
         * @param $multiple bool 是否应用于多消息
         * @param $requeue bool 是否requeue
         */
        protected function nack($message, $multiple = false, $requeue = false)
        {
            $this->channel->basic_nack($message->delivery_info['delivery_tag'], $multiple, $requeue);
        }
    
        /**
         * 拒绝消息并选择是否重新入队
         *
         * @param $message AMQPMessage 当前消息
         * @param $requeue bool 是否requeue true则重新入队列(该消费者还是会消费到该条被reject的消息),否则丢弃或者进入死信队列。
         */
        protected function reject($message, $requeue = false)
        {
            $this->channel->basic_reject($message->delivery_info['delivery_tag'], $requeue);
        }
    
        /**
         * 是否恢复消息到队列
         *
         * @param $requeue bool true则重新入队列并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费,false则消息会重新被投递给自己
         */
        protected function recover($requeue = false)
        {
            $this->channel->basic_recover($requeue);
        }
    }
    
    

    5.other队列消费者

    <?php
    
    namespace App\Console\Commands\Queue;
    
    class Other extends Base
    {
        protected $queue = 'other';
    
        protected $signature = 'command:other';
    
        protected $description = '队列测试';
    
        public function handle()
        {
            return function ($message) {
                echo '收到消息:'.$message->body.PHP_EOL;
                // 业务....
                sleep(2);
                $this->ack($message);
            };
        }
    }
    
    

    6.Kernel文件进行注册artison命令

    <?php
    
    namespace App\Console;
    
    use App\Console\Commands\Queue\Other;
    use Illuminate\Console\Scheduling\Schedule;
    use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
    
    class Kernel extends ConsoleKernel
    {
        /**
         * The Artisan commands provided by your application.
         *
         * @var array
         */
        protected $commands = [
            Other::class
        ];
    
        /**
         * Define the application's command schedule.
         *
         * @param \Illuminate\Console\Scheduling\Schedule $schedule
         * @return void
         */
        protected function schedule(Schedule $schedule)
        {
            // $schedule->command('inspire')
            //          ->hourly();
        }
    
        /**
         * Register the commands for the application.
         *
         * @return void
         */
        protected function commands()
        {
            $this->load(__DIR__.'/Commands');
    
            require base_path('routes/console.php');
        }
    }
    
    

    7.代码写完了,运行一下看看效果

    7.1模拟请求入队,直接请求对应控制器
    7.2消费者输出
    image.png
    7.3RabbitMQ控制台监控
    image.png
    Tips:laravel需要注意下这里,去除composer执行完毕的自动发现包(php artsion package:discover)否则composer install/update会一直阻塞在消费队列监听。修改后如下图:
    image.png

    相关文章

      网友评论

          本文标题:RabbitMQ与Laravel项目中结合

          本文链接:https://www.haomeiwen.com/subject/gyitwctx.html