beanstalkd的优缺点:
优点:
1.beanstalkd是基于内存的任务队列,性能较高。每个job有多种状态,状态之间可以相互转换。这些状态为job的使用者提供了使用的方便。在启动的时候, 带上 -b 参数也可将任务保存在文件中,防止服务器宕机后任务丢失。
2.在网络事件驱动方面,使用异步,高效的epoll作为事件驱动框架,但使用的是单线程模式。
缺点:
1.保存数据格式比较单一,只有List 链表一种数据结构。可以考虑加上 hash数据类型,可供查询功能。
2.不适合做任务量超高的大型系统,也不建议在分布式系统中使用。大系统可考虑使用RabbitMq
3.只能单独删除一个个的任务,不能一次删除整个管道(队列) 。 当管道中的任务全部删除后,beanstalk会自动将该管道删除{严重吐槽这个设计}。
我的踩坑:1当生产者未产生消息时; 2管道中无可用任务(删除的不算)时; 3消费者未启动的情况下。无法使用$p->statsTube(tube); 检查项目中的管道状态 。 因为这三个条件全部满足的时候,该管道已经被beanstalk自动删除了。。连管道都不存在了,肯定无法查看管道的状态
4.beanstalkd 在消费端守护进程使用的时候,不能像redis那样 通过 redis.setOption(Redis::OPT_READ_TIMEOUT,-1);和 redis.brPop(keys, timeout ); 两个方法实现 没有任务进入时而永久阻塞[不报超时错误]的方法。
当时我在使用的时候踩过坑,不过也可在代码上规避这个问题。特此做下记录
消费端使用
环境说明: php7.2 , swoole 扩展; composer包 "pda/pheanstalk": "^4.0"
require "./vendor/autoload.php";
require "./sms.php";
use Pheanstalk\Pheanstalk;
use Swoole\Process;
use Swoole\Process\Pool;
class swoole
{
public $daemon=false;
private $events=[
"workerStart","workerStop"
];
/*
* 进程池入口方法
*/
public function main(){
// 使用swoole 的进程池 做消费端
$pool = new Pool(20,0,0,true);
foreach ($this->events as $event){
$pool->on($event,[$this,$event]);
}
$this->daemon && Process::daemon(); // 是否守护进程
$pool->start();
}
public function workerStart(Pool $pool,int $workerId){
set_error_handler(function ($errno,$errstr,$errfile,$errline) {
//....
});
swoole_set_process_name("swoole:{$workerId}");
ini_set('default_socket_timeout', -1);
$socketFactory =new SocketFactory("127.0.0.1", 11300);
$run = true;
try{
$socketFactory->create();
$b = Pheanstalk::createWithFactory($socketFactory);
}catch(Exception $e){
// 连接失败
echo "connect falied.\n";
$run = false;
sleep(12);// 连接失败,退出当前子进程,重新启动新的进程,重新连接
return false;
}
$logPath="/youPath/log.txt";
while($run){
/*echo "wait task\n";*/
try{
// 在此一定必须设置阻塞时间,不能设置<=0的无限制阻塞,切记。
/*
踩坑:
我当时在此踩过坑 我设置-1让其无限制阻塞等待任务进入导致消费端bug。
因为当生产者长时间未生产消息时,消费端一直阻塞导致假死的情况[出现僵死进程]。
这应该是beanstalk本身的问题。 一旦产生假死,有新任务进入的时候。消费端无法消费。
解决方案:
在 reserveWithTimeout(int); 方法中设置一个阻塞时间;当没有任务进入时,消费者达到阻塞时间也会向下执行而返回 null值。
我只需要在下面代码判断一下,而跳出当次循环重新阻塞int 秒钟即可。用以保证benstalk消费端的活性(类似线程检测),而不会出现僵死进程
*/
$job = $p->watch("tubeName")->ignore("default")->reserveWithTimeout(60);// 阻塞时间
if(!$job){
echo "超时跳出一次 \n";
continue;
}
}catch(Throwable $e){
if($e instanceof \Pheanstalk\Exception\ConnectionException){
// 与服务器连接中断异常,退出循环.退出了当前子进程,由父进程重新创建子进程进行检测beanstalk
$run = false; //
}
// 1. beanstalk 因为异常原因断开连接的时候会触发异常,跳出当前循环即可
// 2 . 当beanstalk 链接正常后, 该消费端会自动进行连接正常。 $p->watch() 会正常消费的
echo "超时跳出一次";
continue;
}
// 接收到客户端投递的任务
$std=json_decode($job->getData());
$std->coenv=true;
$sms = new sms();
$r= $sms->yunsmsSendSMS($std); // 执行业务,发送短信
if($r=="100"){
$p->delete($job); // 发送成功,删除任务
$log = "发送成功\n";
}else {
$p->bury($job); // 发送失败,将任务预留。以供维护者查看原因和在此将其变为 ready 状态进行消费
$log = "发送失败\n";
}
}
}
public function workerStop(Pool $pool,int $workerId){
echo "重新启动\{$workerId}\n";
}
}
$swoole = new swoole();
$swoole ->daemon = ($argv[1] == "daemon") ? true : false;
$swoole->main();
服务启动说明
$ php swoole.php #调试启动
$ nohup php swoole.php daemon & # 守护进程启动
// 启动之后建议前往运维端查看 消费者数量与 swoole进程池数量是否一致,
// 不一致则需要检查消费端启动的异常情况了
# 以下通过简单的运维代码,检查beanstalk 的运行状况和 该管道消费端的启动状况
require "./vendor/autoload.php";
$p=Pheanstalk::create("ip",11300);
try{
var_dump($p->statsTube("tubeName")->getArrayCopy()); // 成功返回该管道信息 是一个数组
// 如果执行成功,重点 查看以下2项,重点查看以下2项的值与 swoole进程池数量是否一致
//不一致则需要检查消费端启动的异常情况了
[
current-watching=> 20,//当前消费者数量,该值应与swoole进程池数量一致
current-waiting=> 20 // 空闲消费者在等待的数量 = 消费者数量 - 当前reserved(正在处理任务进程数)值。
// 当前没有任务在处理的时候,等于 current-watching 数量
]
}catch(Throwable $e){
// 当以下3种情况都出现时,该管道就被beanstalk系统自动删除了,会出现错误
1当生产者未产生消息时;
2管道中无可用任务(删除的不算)时;
3消费者未启动的情况下。
var_dump($e->getMessage()); // 错误信息提示管道不存在
}
生产者
require "./vendor/autoload.php";
$p=Pheanstalk::create("ip",11300);
$job=$p->useTube("tubeName")->put(json_encode([
"mobile"=>"13012345678",
"content"=>"XXXXXXXXXX",
],JSON_UNESCAPED_UNICODE));
var_dump($job);
网友评论