美文网首页
Phalcon消息队列

Phalcon消息队列

作者: flycorn | 来源:发表于2016-12-13 16:34 被阅读914次

参考:
消息队列
phalcon消息队列
PhalconFcMvc

CLI应用结构

 phalcon_cli/
 |----app  //应用目录
 |      |----config  //配置目录
 |      |      |----config.php
 |      |----tasks  //任务目录
 |      |      |----MainTask.php  //主任务
 |      |      |----ProductionTask.php  //生产任务
 |      |      |----ConsumeTask.php  //消费任务
 |      |----cli.php  //应用入口

cli.php

use Phalcon\DI\FactoryDefault\CLI as CliDI;
use Phalcon\CLI\Console as ConsoleApp;
$di = new CliDI();
defined('APPLICATION_PATH')|| define('APPLICATION_PATH', realpath(dirname(__FILE__)));
$loader = new \Phalcon\Loader();
$loader->registerDirs(array(APPLICATION_PATH . '/tasks'));
$loader->register();

if(is_readable(APPLICATION_PATH . '/config/config.php')) {
    $config = include APPLICATION_PATH . '/config/config.php';
    $di->set('config', $config);
}
$console = new ConsoleApp();
$console->setDI($di);

/*** Process the console arguments*/
$arguments = array();
$params = array();

foreach($argv as $k => $arg) {
  if($k == 1) {
       $arguments['task'] = $arg;
  } elseif($k == 2) {
       $arguments['action'] = $arg;
  } elseif($k >= 3) {
      $params[] = $arg;
  }
}
if(count($params) > 0) {
    $arguments['params'] = $params;
}
// define global constants for the current task and action
define('CURRENT_TASK', (isset($argv[1]) ? $argv[1] : null));
define('CURRENT_ACTION', (isset($argv[2]) ? $argv[2] : null));
try {
    // handle incoming arguments
    $console->handle($arguments);
}catch (\Phalcon\Exception $e) {
    echo $e->getMessage();
    exit(255);
}

生产者:ProductionTask.php

class ProductionTask extends \Phalcon\CLI\Task{    

  public function mainAction()    {    
      //连接Beanstalk
      $queue = new Phalcon\Queue\Beanstalk([
          'host' => '127.0.0.1',
          'port' => 11300
      ]); 
      //choose方法指定tube
      $queue->choose("my_tube");
      //创建任务
      for($i=0;$i<10;$i++){
         $queueId = $queue->put(['msg' => 'hello phalcon('.$i.')']);   
          echo '任务Id:'.$queueId."\n";
      }
  }

}

消费者:ConsumeTask.php

class ConsumeTask extends \Phalcon\CLI\Task{    

  public function mainAction()    {          
      //连接Beanstalk 
      $queue = new Phalcon\Queue\Beanstalk([
          'host' => '127.0.0.1', 
          'port' => 11300 
      ]);
      //监视指定tube
      $queue->watch("my_tube"); 

      while(true){
          echo 'Waiting for a job... STRG+C to abort.'."\n";
          //获取任务
          $job = $queue->reserve();
          if(!$job){
              echo 'Invalid job found. Not processing.'."\n";
          }else{
              $job_id = $job->getId();
              echo 'Processing job '.$job_id."\n";
              //获取任务详情
              $jobInfo = $job->getBody();
              echo 'Msg:'.$jobInfo['msg']."\n";
              $job->delete();
              echo 'Success Job '.$job_id.'. Deleting.'."\n";
          }
      }
  }

}

操作

1、开启Beanstalkd
# beanstalkd -l 127.0.0.1 -p 11301 &
2、打开两个控制台
3、控制台1:
# php phalcon_cli/app/cli.php ConsumeTask main
4、控制台2:
# php phalcon_cli/app/cli.php Production main

注意:

watch 监控特定tube
choose 指定特定tube
reserve 获取任务,搭配watch使用
peekReady 获取任务,搭配choose使用

消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

相关文章

  • Phalcon消息队列

    参考:消息队列phalcon消息队列PhalconFcMvc CLI应用结构 cli.php 生产者:Produc...

  • 消息队列:消息队列简介

    1. 什么是消息队列 消息队列(message queue),是一种应用程序的通信方法; 消息队列是 生产者-消费...

  • 消息中间件

    消息中间件,也可以叫做中央消息队列或者是消息队列(区别于本地消息队列,本地消息队列指的是 JVM 内实现的队列实现...

  • 消息队列对比

    引用: 常用消息队列对比消息队列及常见消息队列介绍 常用消息队列 1. RabbitMQ 用erlang语言开发的...

  • 消息队列应用-使用异步队列就解耦了吗

    消息队列作用一文介绍了为什么要使用消息队列。我们再来讨论下如何有效使用消息队列。 消息队列模式 目前主流消息队列主...

  • Kafka常见问题解决

    概要:我们使用消息队列,就需要知道为什么要用消息队列,什么场景需要用消息队列,使用消息队列能带来哪些好处和消息队列...

  • 进程间通信(下)

    消息队列 在UNP第二卷中详细介绍了两种消息队列:Posix消息队列和System V消息队列。这两种消息队列很相...

  • APUE读书笔记-15进程内部通信(6)

    7、消息队列 消息队列是存放在内核中的消息的链表,通过消息队列标识进行标记。我们把消息队列称为队列,把它的标识称为...

  • java消息机制

    1.什么是消息队列? 1.消息队列是一个队列,先进先出,你无法读取消息队列中间的消息,只能按照顺序,从消息队列的头...

  • springboot项目架构(4)activemq、rabbit

    消息队列实现 支持的消息队列 ActiveMq RabbitMq RocketMq Kafka 各个队列实现队列与...

网友评论

      本文标题:Phalcon消息队列

      本文链接:https://www.haomeiwen.com/subject/zwqbmttx.html