美文网首页
RabbitMQ学习之(五)_一个基于PHP的RabbitMQ操作类

RabbitMQ学习之(五)_一个基于PHP的RabbitMQ操作类

作者: minner_01 | 来源:发表于2019-05-07 15:38 被阅读0次

    Amqp.class.php

    <?php
    
    /**
     * Small convenience wrapper around the php-amqp extension.
     *
     * On construction it connects to the broker, opens a channel, declares a
     * direct exchange, declares a queue and binds the queue to the exchange
     * with the given routing key. send() and get() are one-shot operations:
     * both close the connection before returning, so a new instance is needed
     * for every further call.
     *
     * NOTE(review): recent php-amqp releases expose declare() as
     * declareExchange() / declareQueue() - confirm against the installed
     * extension version.
     */
    class Amqp
    {
        /** @var string Exchange name. */
        public $e_name;
        /** @var string Queue name. */
        public $q_name;
        /** @var string Routing key used for both binding and publishing. */
        public $k_route;
        /** @var AMQPChannel Channel opened on $conn. */
        public $channel;
        /** @var AMQPConnection Broker connection (was an undeclared dynamic property). */
        public $conn;
        /** @var AMQPExchange Declared exchange (was an undeclared dynamic property). */
        public $ex;

        /**
         * Connect and set up the exchange/queue topology.
         *
         * @param array  $config  Credentials for AMQPConnection. Either a flat array
         *                        or a list whose first element is that array; a
         *                        'user' key is accepted as an alias for 'login'.
         * @param string $e_name  Exchange name.
         * @param string $q_name  Queue name.
         * @param string $k_route Routing key.
         *
         * @throws AMQPConnectionException When the broker cannot be reached.
         */
        public function __construct($config,$e_name,$q_name,$k_route)
        {
            $this->e_name = $e_name;
            $this->q_name = $q_name;
            $this->k_route = $k_route;

            // Create the connection and the channel.
            $this->conn = new AMQPConnection(self::normalizeConfig($config));
            if (!$this->conn->connect()) {
                // A value returned from a constructor is discarded by PHP, so the
                // failure has to be reported with an exception instead.
                throw new AMQPConnectionException('Cannot connect to the broker!');
            }
            $this->channel = new AMQPChannel($this->conn);
            $this->CreateExchange();
            $this->CreateQueue();
        }

        /**
         * Bring the configuration into the shape AMQPConnection expects.
         *
         * @param mixed $config Raw configuration as passed to the constructor.
         *
         * @return mixed Flat credentials array; non-array input is returned untouched.
         */
        private static function normalizeConfig($config)
        {
            if (!is_array($config)) {
                return $config;
            }
            // Unwrap array(array('host' => ...)) to its first entry.
            if (!isset($config['host']) && isset($config[0]) && is_array($config[0])) {
                $config = $config[0];
            }
            // The extension reads the user name from 'login'.
            if (isset($config['user']) && !isset($config['login'])) {
                $config['login'] = $config['user'];
            }
            return $config;
        }

        /**
         * Build an AMQPQueue object configured with this instance's name and flags.
         *
         * @return AMQPQueue Queue object that has not been declared yet.
         */
        private function newQueue()
        {
            $q = new AMQPQueue($this->channel);
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
            return $q;
        }

        /**
         * Declare the direct exchange and keep it in $this->ex for publishing.
         */
        public function CreateExchange()
        {
            $ex = new AMQPExchange($this->channel);
            $ex->setName($this->e_name);
            $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
            $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); // durable + auto-delete
            $ex->declare();
            $this->ex = $ex;
        }

        /**
         * Declare the queue and bind it to the exchange with the routing key.
         */
        public function CreateQueue()
        {
            $q = $this->newQueue();
            // The queue must be declared before it is bound; binding a queue
            // that does not exist yet fails.
            $q->declare();
            $q->bind($this->e_name, $this->k_route);
        }

        /**
         * Publish one message (JSON-encoded) inside a transaction, then disconnect.
         *
         * @param mixed $msg Any JSON-encodable value.
         *
         * @return array array('status' => <result of publish()>).
         *
         * @throws InvalidArgumentException When $msg cannot be JSON-encoded.
         */
        public function send($msg)
        {
            try {
                $message = json_encode($msg);
                if ($message === false) {
                    throw new InvalidArgumentException(
                        'Message cannot be JSON-encoded: ' . json_last_error_msg()
                    );
                }
                $this->channel->startTransaction();
                // Publish with the configured routing key.
                $status = $this->ex->publish($message, $this->k_route);
                $this->channel->commitTransaction();
            } finally {
                // Always release the connection, even when publishing fails.
                $this->conn->disconnect();
            }
            return array('status' => $status);
        }

        /**
         * Drain the queue: fetch messages with auto-ack until none is returned,
         * then disconnect.
         *
         * @return array List of JSON-decoded message bodies (associative arrays
         *               for JSON objects).
         */
        public function get()
        {
            $return = array();
            try {
                $q = $this->newQueue();
                $q->declare();
                // get() yields no envelope once the queue is empty; checking the
                // type avoids calling getBody() on a non-object.
                while (($envelope = $q->get(AMQP_AUTOACK)) instanceof AMQPEnvelope) {
                    $return[] = json_decode($envelope->getBody(), true);
                }
            } finally {
                $this->conn->disconnect();
            }
            return $return;
        }

    }
    

    config.php配置文件

    // Application configuration. 'amqp' holds a list of broker definitions;
    // each entry carries the connection credentials for one broker.
    return array(
        'amqp'=>array(    
                    array(    
                            'host' => '127.0.0.1',
                            'port' => '5672',
                            'vhost' => '/',
                            // AMQPConnection reads the user name from 'login';
                            // 'user' is kept for any code that still reads it.
                            'login' => 'admin',
                            'user' => 'admin',
                            'password' => 'admin'
                     )
                ),
    );
    

    send.php(加入队列文件|生产者)

    // Producer: publishes one test message to the exchange.
    require_once('Amqp.class.php');
    // Load the settings from config.php, the same way get.php does; the
    // config() helper used previously is not defined by any file shown here.
    $config  = require('config.php');

    $e_name  = 'e_guest';            // exchange name
    $k_route = 'k_route_feedpush';   // routing key
    $q_name  = 'q_guest_feedpush';   // queue name

    $amqp = new Amqp($config['amqp'],$e_name,$q_name,$k_route);

    $msg = array('test','123');

    // $re is array('status' => ...) as returned by Amqp::send().
    $re = $amqp->send($msg);
    

    get.php(接收并处理文件|消费者)

    // Consumer: drains the queue and returns the decoded messages.
    require_once('Amqp.class.php');
    $config = require('config.php');
    $config_amqp = $config['amqp'];

    // Exchange, routing key and queue must match the producer (send.php),
    // otherwise this script reads a queue the producer never publishes to.
    $e_name  = 'e_guest';            // exchange name
    $k_route = 'k_route_feedpush';   // routing key
    $q_name  = 'q_guest_feedpush';   // queue name
    $amqp    = new Amqp($config_amqp,$e_name,$q_name,$k_route);
    // $re is the list of JSON-decoded message bodies returned by Amqp::get().
    $re      = $amqp->get();
    

    相关文章

      网友评论

          本文标题:RabbitMQ学习之(五)_一个基于PHP的RabbitMQ操作类

          本文链接:https://www.haomeiwen.com/subject/cdzeoqtx.html