美文网首页LNMP开发程序员IT在线课程
[LNMP]LNMP的“消息中间件”选型:RabbitMQ+PH

[LNMP]LNMP的“消息中间件”选型:RabbitMQ+PH

作者: tumg的LNMP_IOS小集 | 来源:发表于2016-03-18 22:20 被阅读3309次

    “消息中间件”并不是传统LNMP领域中的常见名词,但在一些复杂计算类、耗时类或高负载业务处理时候,通过会采用“队列”的方式进行异步化、计算分布或高峰削平处理,其实这里运用的是“消息中间件”的概念和应用。

    应用需求


    1. 耗时类:群发邮件、消息等批量主动推送型的功能;
    2. 复杂计算类:图片处理
    3. 高负载类:统计分析、文本分析

    几种方案简略对比


    1. 内建队列:通过数据库或redis/memcache实现队列消息的发送和接收,集成度较高,通常是在同一个系统内实现,但无法进行任务的负载均衡,可监控性和维护性差,适合单机类小型应用;
    2. gearman:是分布式任务分发系统,但本身是一个单点服务,可扩展性、可维护性较RabbitMQ差,通过“注册函数”的进行任务调度,灵活性较低,适合中小型应用,不太适合做 “消息队列中心”;
    3. RabbitMQ:是基于AMQP协议的消息队列系统,具备集群、路由分发和广播、全面持久化等特点,可扩展及维护性较好,适合作为“消息队列中心”。

    消息队列中的3个角色


    1. MQ:消息队列服务,负责队列消息的管理、分发和持久化等,是整个应用的核心,一般只有一个(集群只是多机,服务只有一个);
    2. clienter:负责推送队列信息,提出处理需求,可以有多个;
    3. worker:负责接收队列信息,进行实际的任务处理;

    消息队列的解耦


    1. 时间解耦:即异步处理,clienter 和 worker的工作可以不在一个时间轴内;
    2. 资源解耦:clienter 和 worker 可以部署在不同的机器、ip和网络环境中,实现资源的独立分配;
    3. 应用解耦:clienter 和 worker 通常是不同的应用,甚至是不同的编程语言的应用,实现模块之间的解耦。

    Linux下RabbitMQ的安装&启动


    <pre>
    yum -y install erlang rabbitmq-server
    service rabbitmq-server start
    chkconfig rabbitmq-server on
    </pre>

    增加rabbitmq_management(web的监控和管理端)

    <pre>
    /usr/lib/rabbitmq/lib/rabbitmq_server-3.1.5/sbin/rabbitmq-plugins enable rabbitmq_management
    </pre>

    rabbitMQ服务监听配置

    在/etc/rabbitmq 目录下新增文件:rabbitmq-env.conf
    <pre>
    RABBITMQ_NODE_IP_ADDRESS=192.168.100.101
    RABBITMQ_NODE_PORT=5672
    RABBITMQ_NODENAME=rabbit
    </pre>

    配置完成,查看端口是否正常监听

    <pre>
    service rabbitmq-server restart
    netstat -apn | grep 5672
    </pre>

    web的监控


    监控界面

    PHP调用rabbitMQ服务


    官方php并没有默认安装开启AMQP相应的扩展,需要单独增加扩展

    1. 推荐rabbitMQ官方推荐的php扩展包:https://github.com/php-amqplib/php-amqplib
    2. pecl包:http://pecl.php.net/package/amqp

    推荐php-amqplib进行开发,它说明和demo比较齐全,pecl-amqp只是基于amqp协议的扩展,参数和demo几乎没有,不推荐。

    worker的处理方式


    1. 通过计划任务触发worker处理,适用于可延时较高的任务;
    2. 常驻处理,如果有pcntl扩展,建议通过守护进程的方式触发worker处理,提高常驻处理的稳定性

    Yii 2.x下基于php-amqplib 的组件component


    题主比较懒,只花了2小时封装了基本的发布、拉取消息的功能,其他功能待封装

    <?php
    /**
     * User: tu
     * Yii Component RabbitMQ
     * version 0.1
     * base on package: php-amqplib | https://github.com/php-amqplib/php-amqplib
     */
    namespace frontend\components;
    
    use PhpAmqpLib\Channel\AMQPChannel;
    use PhpAmqpLib\Exception\AMQPConnectionException;
    use yii\base\Component;
    use PhpAmqpLib\Connection\AMQPConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use yii\base\ErrorException;
    use yii\base\Event;
    use yii\base\Exception;
    
    
    /**
     * Class RabbitMQ
     * @package frontend\components
     */
    class RabbitMQ extends Component{
    
        const EXCHANGE_TYPE_DIRECT = 'direct';
    
        const EXCHANGE_TYPE_FANOUT = 'fanout';
    
        const EXCHANGE_TYPE_TOPIC = 'topic';
    
        const EXCHANGE_TYPE_HEADER = 'header';
    
        const MESSAGE_DURABLE_YES = 2;
    
        const MESSAGE_DURABLE_NO = 1;
    
        private $_host = '127.0.0.1';
    
        private $_port = 5672;
    
        private $_user = '';
    
        private $_passwd = '';
    
        private $_vHost = '/';
    
        private $_connection = null;
    
        private $_queue = '';
    
        private $_exchange = '';
    
        /**
         * 组件初始化
         */
        public function init(){
            parent::init();
            //脚本退出前,关闭连接
            register_shutdown_function([$this,'close']);
        }
    
        /**
         * 连接
         */
        public function connect(){
            $this->getConnect();
        }
    
        /**
         * 关闭连接
         */
        public function close(){
            if($this->_isConnect()){
                $this->_connection->close();
            }
        }
    
        /**
         * 设置默认 queue
         * @param $queue
         */
        public function setDefaultQueue($queue){
            $this->_queue = $queue;
    
        }
    
        /**
         * 设置默认 exchange
         * @param $exchange
         */
        public function setDefaultExchange($exchange){
            $this->_exchange = $exchange;
        }
    
        /**
         * 发布消息
         * @param $message
         * @param $queue
         * @param $exchange
         * @param bool $passive
         * @param bool $durable
         * @param bool $exclusive
         * @param string $type
         * @param bool $auto_delete
         * @return bool
         */
        public function publishMessage($message,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
            $newChannel = $this->getChannel();
            $newQueue = isset($queue)?$queue:$this->_queue;
            $newExchange = isset($exchange)?$exchange:$this->_exchange;
    
            if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
                $delivery_mode = ($durable)?self::MESSAGE_DURABLE_YES:self::MESSAGE_DURABLE_NO;
                $msg = new AMQPMessage($message, array('content_type' => 'text/plain', 'delivery_mode' => $delivery_mode));
                $newChannel->basic_publish($msg,$exchange);
                $newChannel->close();
                return true;
            }
            $newChannel->close();
            return false;
        }
    
        /**
         * 拉取消息
         * @param $queue
         * @param $exchange
         * @param bool $passive
         * @param bool $durable
         * @param bool $exclusive
         * @param string $type
         * @param bool $auto_delete
         * @return bool
         */
        public function getMessage($queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
            $newChannel = $this->getChannel();
            $newQueue = isset($queue)?$queue:$this->_queue;
            $newExchange = isset($exchange)?$exchange:$this->_exchange;
            $mix = false;
    
            if($this->_prepare($newChannel,$newQueue,$newExchange,$passive,$durable,$exclusive,$type,$auto_delete)){
                $msg = $newChannel->basic_get($queue);
                if($msg){
                    $newChannel->basic_ack($msg->delivery_info['delivery_tag']);
                    $mix = $msg->body;
                }
            }
            $newChannel->close();
            return $mix;
        }
    
        /**
         * @return bool
         */
        private function _isConnect(){
            if($this->_connection && $this->_connection->isConnected()){
                return true;
            }
            return false;
        }
    
        /**
         * @param $channel
         * @param $queue
         * @param $exchange
         * @param bool $passive
         * @param bool $durable
         * @param bool $exclusive
         * @param string $type
         * @param bool $auto_delete
         * @return bool
         */
        private function _prepare($channel,$queue,$exchange,$passive=false,$durable=true,$exclusive=false,$type=self::EXCHANGE_TYPE_DIRECT,$auto_delete=false){
    
            if($channel && is_a($channel,'\PhpAmqpLib\Channel\AMQPChannel')){
                $channel->queue_declare($queue,$passive,$durable,$exclusive,$auto_delete);
                $channel->exchange_declare($exchange,$type,$passive,$durable,$auto_delete);
                $channel->queue_bind($queue, $exchange);
                return true;
            }
            return false;
        }
    
        /**
         * @param $host
         */
        public function setHost($host){
            $this->_host = $host;
        }
    
        /**
         * @param $port
         */
        public function setPort($port){
            $this->_port = $port;
        }
    
        /**
         * @param $user
         */
        public function setUser($user){
            $this->_user = $user;
        }
    
        /**
         * @param $passwd
         */
        public function setPasswd($passwd){
            $this->_passwd = $passwd;
        }
    
        /**
         * @param $vHost
         */
        public function setVHost($vHost){
            $this->_vHost = $vHost;
        }
    
        /**
         * @return AMQPChannel
         * @throws ErrorException
         */
        public function getChannel(){
            return $this->getConnect()->channel();
        }
    
        /**
         * @return null|AMQPConnection
         * @throws ErrorException
         * @throws \yii\base\ExitException
         */
        public function getConnect(){
            if(!$this->_isConnect()){
                try{
                    $this->_connection = new AMQPConnection($this->_host, $this->_port, $this->_user, $this->_passwd, $this->_vHost);
                } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e){
                    throw new ErrorException('rabbitMQ server connect error',500,1);
                }
            }
            return $this->_connection;
        }
    }
    
    

    相关文章

      网友评论

      • sucjun:为什么我这边会显示
        Class 'PhpAmqpLib\Connection\AMQPConnection' not found
        可这个文件的确存在,而且也能找到
        还有什么其他地方要配置吗?

      本文标题:[LNMP]LNMP的“消息中间件”选型:RabbitMQ+PH

      本文链接:https://www.haomeiwen.com/subject/mrzqlttx.html