美文网首页
ubuntu16.04 消息队列 TP5 - PHP Resqu

ubuntu16.04 消息队列 TP5 - PHP Resqu

作者: MelodyOf流沙 | 来源:发表于2019-01-18 11:22 被阅读0次

    TP5 - PHP Resque Worker

    安装指南

    命令安装

    • composer require chrisboulton/php-resque

    运行环境

    • PHP 5.2+
    • Redis 2.2+
    • TP 5.0

    应用实例指南

    添加入口文件

    <?php
    // +----------------------------------------------------------------------
    // | ResQue执行入口文件
    // +----------------------------------------------------------------------
    // | 作者: Melody
    // +----------------------------------------------------------------------
    
    // [ PHP版本检查 ]
    if (version_compare(PHP_VERSION, '5.5', '<')) {
        die('PHP版本过低,最少需要PHP5.5,请升级PHP版本!');
    }
            fwrite(STDOUT, "Start init");
    set_time_limit(0);
    // [ 应用入口文件 ]
    
    // 定义应用目录
    define('APP_PATH', __DIR__ . '/application/');
    define('MODE_NAME', 'cli');     // 自定义cli模式
    // 处理自定义参数
    $act = isset($argv[2]) ? $argv[2] : 'start';
    putenv("Q_ACTION={$act}");
    putenv("Q_ARGV=" . json_encode($argv));
    // 加载框架引导文件
    require './thinkphp/start.php';
    

    创建Worker类

    <?php
    /**
     * Created by PhpStorm.
     * User: melody
     * Date: 2019-01-10
     * Time: 11:36
     */
    
    namespace app\cisdi\home;
    use app\common\queue;
    use think\Controller;
    
    class MsgQueue extends Controller
    {
        protected $args = [];
        protected $keys = [];
        protected $queues = '*';
        public function __construct()
        {
            queue\Config::init();
            if (!IS_CLI)  die('The file can only be run in cli mode!');
            fwrite(STDOUT, getenv('Q_ARGV'));
            $argv = json_decode(getenv('Q_ARGV'));
            foreach ($argv as $item) {
                if (strpos($item, '=')) {
                    list($key, $val) = explode('=', $item);
                } else {
                    $key = $val = $item;
                }
                $this->keys[] = $key;
                $this->args[$key] = $val;
            }
    
            $this->init();
        }
    
        /**
         * 执行队列
         * 环境变量参数值:
         * --queue|QUEUE: 需要执行的队列的名字
         * --interval|INTERVAL:在队列中循环的间隔时间,即完成一个任务后的等待时间,默认是5秒
         * --app|APP_INCLUDE:需要自动载入PHP文件路径,Worker需要知道你的Job的位置并载入Job
         * --count|COUNT:需要创建的Worker的数量。所有的Worker都具有相同的属性。默认是创建1个Worker
         * --debug|VVERBOSE:设置“1”启用更啰嗦模式,会输出详细的调试信息
         * --pid|PIDFILE:手动指定PID文件的位置,适用于单Worker运行方式
         */
        private function init()
        {
            $is_sington = false; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID
    
            // 根据参数设置QUEUE环境变量
            $QUEUE = in_array('--queue', $this->keys) ? $this->args['--queue'] : '*';
            if (empty($QUEUE)) {
                die("Set QUEUE env var containing the list of queues to work.\n");
            }
            $this->queues = explode(',', $QUEUE);
    
            // 根据参数设置INTERVAL环境变量
            $interval = in_array('--interval', $this->keys) ? $this->args['--interval'] : 5;
            putenv("INTERVAL={$interval}");
    
            // 根据参数设置COUNT环境变量
            $count = in_array('--count', $this->keys) ? $this->args['--count'] : 1;
            putenv("COUNT={$count}");
    
            // 根据参数设置APP_INCLUDE环境变量
            $app = in_array('--app', $this->keys) ? $this->args['--app'] : '';
            putenv("APP_INCLUDE={$app}");
    
            // 根据参数设置PIDFILE环境变量
            $pid = in_array('--pid', $this->keys) ? $this->args['--pid'] : '';
            putenv("PIDFILE={$pid}");
    
            // 根据参数设置VVERBOSE环境变量
            $debug = in_array('--debug', $this->keys) ? $this->args['--debug'] : '';
            putenv("VVERBOSE={$debug}");
        }
    
        public function index()
        {
            fwrite(STDOUT, "Start index ");
            $act = getenv('Q_ACTION');
            switch ($act) {
                case 'stop':
                    $this->stop();
                    break;
                case 'status':
                    $this->status();
                    break;
                default:
                    $this->start();
            }
        }
    
        /**
         * 开始队列
         */
        public function start()
        {
            fwrite(STDOUT, "开始worker" . "\n");
            // 载入任务类
            $path = JOB_PATH;
            fwrite(STDOUT, "地址". $path . "\n");
            $flag = \FilesystemIterator::KEY_AS_FILENAME;
            $glob = new \FilesystemIterator($path, $flag);
            foreach ($glob as $file) {
                if('php' === pathinfo($file, PATHINFO_EXTENSION))
                    require realpath($file);
            }
    
            $logLevel = 0;
            $LOGGING = getenv('LOGGING');
            $VERBOSE = getenv('VERBOSE');
            $VVERBOSE = getenv('VVERBOSE');
            if (!empty($LOGGING) || !empty($VERBOSE)) {
                $logLevel = \Resque_Worker::LOG_NORMAL;
            } else {
                if (!empty($VVERBOSE)) {
                    $logLevel = \Resque_Worker::LOG_VERBOSE;
                }
            }
    
            $APP_INCLUDE = getenv('APP_INCLUDE');
            if ($APP_INCLUDE) {
                if (!file_exists($APP_INCLUDE)) {
                    die('APP_INCLUDE (' . $APP_INCLUDE . ") does not exist.\n");
                }
                require_once $APP_INCLUDE;
            }
    
            $interval = 5;
            $INTERVAL = getenv('INTERVAL');
            if (!empty($INTERVAL)) {
                $interval = $INTERVAL;
            }
    
            $count = 1;
            $COUNT = getenv('COUNT');
            if (!empty($COUNT) && $COUNT > 1) {
                $count = $COUNT;
            }
    
            if ($count > 1) {
                for ($i = 0; $i < $count; ++$i) {
                    $pid = pcntl_fork();
                    if ($pid == -1) {
                        die("Could not fork worker " . $i . "\n");
                    } // Child, start the worker
                    else {
                        if (!$pid) {
                            $worker = new \Resque_Worker($this->queues);
                            $worker->logLevel = $logLevel;
                            fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
                            $worker->work($interval);
                            break;
                        }
                    }
                }
            } // Start a single worker
            else {
                $worker = new \Resque_Worker($this->queues);
                $worker->logLevel = $logLevel;
    
                $PIDFILE = getenv('PIDFILE');
                if ($PIDFILE) {
                    file_put_contents($PIDFILE, getmypid()) or
                    die('Could not write PID information to ' . $PIDFILE);
                }
    
                fwrite(STDOUT, '*** Starting worker ' . $worker . "\n");
                $worker->work($interval);
            }
        }
    
        /**
         * 停止队列
         */
        public function stop()
        {
            $worker = new \Resque_Worker($this->queues);
            $worker->shutdown();
        }
    
        /**
         * 查看某个任务状态
         */
        public function status()
        {
            $id = in_array('--id', $this->keys) ? $this->args['--id'] : '';
            $status = new \Resque_Job_Status($id);
            if (!$status->isTracking()) {
                die("Resque is not tracking the status of this job.\n");
            }
    
            echo "Tracking status of " . $id . ". Press [break] to stop.\n\n";
            while (true) {
                fwrite(STDOUT, "Status of " . $id . " is: " . $status->get() . "\n");
                sleep(1);
            }
        }
    }
    

    创建Job处理类

    示例创建的类为job/ComJob

    <?php
    /**
     * Created by PhpStorm.
     * User: melody
         */
    
    namespace app\cisdi\job;
    
    
    class ComJob
    {
        /**
         * 任务执行函数
         */
        public function perform()
        {
            $args = $this->args;
            fwrite(STDOUT, json_encode($args) . '已处理完此信息' . PHP_EOL);
        }
    
        /**
         * perform方法之前调用
         */
        public function setUp()
        {
            // ... Set up environment for this job
        }
    
        /**
         * perform方法之后调用
         */
        public function tearDown()
        {
            // ... Remove environment for this job
        }
    }
    

    添加任务示例

        /**
         * 测试添加消息队列
         */
        public function testEnQueue(){
            $job = '\\app\\cisdi\\job\\ComJob'; // 定义任务类
            $args = array(
                'time' => time(),
                'array' => array(
                    'test' => 'test',
                ),
            );
            $jobId = \Resque::enqueue('default', $job, $args, true);
            echo "Queued job ".$jobId."\n\n";
        }
    

    应用案例

    • 创建任务
    执行testEnQueue方法(可通过cli或者web端执行)
    得到的返回信息:
    Queued job fc7632470b1d7e69aede1675d79bdfe8
    
    • 开启任务处理器
    root@iZwz980sm1dapjhje7e6icZ:/home/wwwroot/default/wx_test# php resque.php cisdi/msg_queue/index start
    Start init应用初始化开始
    ["resque.php","cisdi\/msg_queue\/index","start"]Start index 开始worker
    地址/home/wwwroot/default/wx_test//application/cisdi/job/
    *** Starting worker iZwz980sm1dapjhje7e6icZ:32471:*
    {"time":1547713088,"array":{"test":"test"}}已处理完此信息
    

    让应用长驻内存

    • 安装supervisor
    apt-get install supervisor
    
    • 添加应用到supervisor
    [program:resque]
    command = php resque cisdi/msg_queue/index start &
    numprocs=1
    directory= 你的当前目录
    stderr_logfile_maxbytes=10MB
    stdout_logfile=/var/log/supervisor/%(program_name)s-stdout.log
    stderr_logfile=/var/log/supervisor/%(program_name)s-stderr.log
    redirect_stderr=true
    autostart=true
    autorestart=true
    
    • 重启supervisor让配置文件生效
    supervisorctl reload
    
    • 启动应用进程
    supervisorctl start
    

    相关文章

      网友评论

          本文标题:ubuntu16.04 消息队列 TP5 - PHP Resqu

          本文链接:https://www.haomeiwen.com/subject/xmagdqtx.html