美文网首页
Phalcon消息队列

Phalcon消息队列

作者: flycorn | 来源:发表于2016-12-13 16:34 被阅读914次

    参考:
    消息队列
    phalcon消息队列
    PhalconFcMvc

    CLI应用结构

     phalcon_cli/
     |----app  //应用目录
     |      |----config  //配置目录
     |      |      |----config.php
     |      |----tasks  //任务目录
     |      |      |----MainTask.php  //主任务
     |      |      |----ProductionTask.php  //生产任务
     |      |      |----ConsumeTask.php  //消费任务
     |      |----cli.php  //应用入口
    

    cli.php

    use Phalcon\DI\FactoryDefault\CLI as CliDI;
    use Phalcon\CLI\Console as ConsoleApp;
    $di = new CliDI();
    defined('APPLICATION_PATH')|| define('APPLICATION_PATH', realpath(dirname(__FILE__)));
    $loader = new \Phalcon\Loader();
    $loader->registerDirs(array(APPLICATION_PATH . '/tasks'));
    $loader->register();
    
    if(is_readable(APPLICATION_PATH . '/config/config.php')) {
        $config = include APPLICATION_PATH . '/config/config.php';
        $di->set('config', $config);
    }
    $console = new ConsoleApp();
    $console->setDI($di);
    
    /*** Process the console arguments*/
    $arguments = array();
    $params = array();
    
    foreach($argv as $k => $arg) {
      if($k == 1) {
           $arguments['task'] = $arg;
      } elseif($k == 2) {
           $arguments['action'] = $arg;
      } elseif($k >= 3) {
          $params[] = $arg;
      }
    }
    if(count($params) > 0) {
        $arguments['params'] = $params;
    }
    // define global constants for the current task and action
    define('CURRENT_TASK', (isset($argv[1]) ? $argv[1] : null));
    define('CURRENT_ACTION', (isset($argv[2]) ? $argv[2] : null));
    try {
        // handle incoming arguments
        $console->handle($arguments);
    }catch (\Phalcon\Exception $e) {
        echo $e->getMessage();
        exit(255);
    }
    

    生产者:ProductionTask.php

    class ProductionTask extends \Phalcon\CLI\Task{    
    
      public function mainAction()    {    
          //连接Beanstalk
          $queue = new Phalcon\Queue\Beanstalk([
              'host' => '127.0.0.1',
              'port' => 11300
          ]); 
          //choose方法指定tube
          $queue->choose("my_tube");
          //创建任务
          for($i=0;$i<10;$i++){
             $queueId = $queue->put(['msg' => 'hello phalcon('.$i.')']);   
              echo '任务Id:'.$queueId."\n";
          }
      }
    
    }
    

    消费者:ConsumeTask.php

    class ConsumeTask extends \Phalcon\CLI\Task{    
    
      public function mainAction()    {          
          //连接Beanstalk 
          $queue = new Phalcon\Queue\Beanstalk([
              'host' => '127.0.0.1', 
              'port' => 11300 
          ]);
          //监视指定tube
          $queue->watch("my_tube"); 
    
          while(true){
              echo 'Waiting for a job... STRG+C to abort.'."\n";
              //获取任务
              $job = $queue->reserve();
              if(!$job){
                  echo 'Invalid job found. Not processing.'."\n";
              }else{
                  $job_id = $job->getId();
                  echo 'Processing job '.$job_id."\n";
                  //获取任务详情
                  $jobInfo = $job->getBody();
                  echo 'Msg:'.$jobInfo['msg']."\n";
                  $job->delete();
                  echo 'Success Job '.$job_id.'. Deleting.'."\n";
              }
          }
      }
    
    }
    

    操作

    1、开启Beanstalkd
    # beanstalkd -l 127.0.0.1 -p 11301 &
    2、打开两个控制台
    3、控制台1:
    # php phalcon_cli/app/cli.php ConsumeTask main
    4、控制台2:
    # php phalcon_cli/app/cli.php Production main
    

    注意:

    watch 监控特定tube
    choose 指定特定tube
    reserve 获取任务,搭配watch使用
    peekReady 获取任务,搭配choose使用

    消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

    相关文章

      网友评论

          本文标题:Phalcon消息队列

          本文链接:https://www.haomeiwen.com/subject/zwqbmttx.html