美文网首页
swoole进程池中使用beanstalkd,异步处理任务

swoole进程池中使用beanstalkd,异步处理任务

作者: 骑蚂蚁上高速_jun | 来源:发表于2020-03-01 21:34 被阅读0次

    beanstalkd的优缺点:

    优点:

    1.beanstalkd是基于内存的任务队列,性能较高。每个job有多种状态,状态之间可以相互转换。这些状态为job的使用者提供了使用的方便。在启动的时候, 带上 -b 参数也可将任务保存在文件中,防止服务器宕机后任务丢失。
    2.在网络事件驱动方面,使用异步,高效的epoll作为事件驱动框架,但使用的是单线程模式。

    缺点:

    1.保存数据格式比较单一,只有List 链表一种数据结构。可以考虑加上 hash数据类型,可供查询功能。
    2.不适合做任务量超高的大型系统,也不建议在分布式系统中使用。大系统可考虑使用RabbitMq
    3.只能单独删除一个个的任务,不能一次删除整个管道(队列) 。 当管道中的任务全部删除后,beanstalk会自动将该管道删除{严重吐槽这个设计}
    我的踩坑:1当生产者未产生消息时; 2管道中无可用任务(删除的不算)时; 3消费者未启动的情况下。无法使用$p->statsTube(tube); 检查项目中的管道状态 。 因为这三个条件全部满足的时候,该管道已经被beanstalk自动删除了。。连管道都不存在了,肯定无法查看管道的状态
    4.beanstalkd 在消费端守护进程使用的时候,不能像redis那样 通过 redis.setOption(Redis::OPT_READ_TIMEOUT,-1);和 redis.brPop(keys, timeout ); 两个方法实现 没有任务进入时而永久阻塞[不报超时错误]的方法。
    当时我在使用的时候踩过坑,不过也可在代码上规避这个问题。特此做下记录

    消费端使用

    环境说明: php7.2 , swoole 扩展; composer包 "pda/pheanstalk": "^4.0"

    require "./vendor/autoload.php";
    require "./sms.php";
    use Pheanstalk\Pheanstalk;
    use Swoole\Process;
    use Swoole\Process\Pool;
    
    
    class swoole
    {
        public $daemon=false;
        private $events=[
            "workerStart","workerStop"
        ];
        /*
         * 进程池入口方法
         */
        public function main(){
            // 使用swoole 的进程池 做消费端
            $pool = new Pool(20,0,0,true);
            foreach ($this->events as $event){
                $pool->on($event,[$this,$event]);
            }
            $this->daemon &&  Process::daemon(); // 是否守护进程
            $pool->start();
        }
        public function workerStart(Pool $pool,int $workerId){
    
            set_error_handler(function ($errno,$errstr,$errfile,$errline)  {
                //....
            });
    
            swoole_set_process_name("swoole:{$workerId}");
            ini_set('default_socket_timeout', -1);
            $socketFactory =new SocketFactory("127.0.0.1", 11300);
            $run = true;
    
            try{
                $socketFactory->create();
                $b = Pheanstalk::createWithFactory($socketFactory);
            }catch(Exception $e){
                // 连接失败
                echo "connect falied.\n";
                $run = false;
                sleep(12);// 连接失败,退出当前子进程,重新启动新的进程,重新连接
                return false;
            }
            $logPath="/youPath/log.txt";
            while($run){
                /*echo "wait task\n";*/
                try{
                    // 在此一定必须设置阻塞时间,不能设置<=0的无限制阻塞,切记。
     /*
    踩坑:
    我当时在此踩过坑 我设置-1让其无限制阻塞等待任务进入导致消费端bug。
    因为当生产者长时间未生产消息时,消费端一直阻塞导致假死的情况[出现僵死进程]。
    这应该是beanstalk本身的问题。 一旦产生假死,有新任务进入的时候。消费端无法消费。
    解决方案:
    在 reserveWithTimeout(int); 方法中设置一个阻塞时间;当没有任务进入时,消费者达到阻塞时间也会向下执行而返回 null值。
    我只需要在下面代码判断一下,而跳出当次循环重新阻塞int 秒钟即可。用以保证benstalk消费端的活性(类似线程检测),而不会出现僵死进程
    */
                    $job = $p->watch("tubeName")->ignore("default")->reserveWithTimeout(60);// 阻塞时间
                    if(!$job){
                        echo "超时跳出一次 \n";
                        continue;
                    }
                }catch(Throwable $e){
                    if($e instanceof  \Pheanstalk\Exception\ConnectionException){
                        // 与服务器连接中断异常,退出循环.退出了当前子进程,由父进程重新创建子进程进行检测beanstalk
                        $run = false; // 
                    }
                    // 1. beanstalk 因为异常原因断开连接的时候会触发异常,跳出当前循环即可
                    // 2 . 当beanstalk 链接正常后, 该消费端会自动进行连接正常。 $p->watch() 会正常消费的
                    echo "超时跳出一次";
                    continue;
                }
                // 接收到客户端投递的任务
                $std=json_decode($job->getData());
                $std->coenv=true;
                $sms = new sms();
                $r= $sms->yunsmsSendSMS($std); // 执行业务,发送短信
                if($r=="100"){
                    $p->delete($job); // 发送成功,删除任务
                    $log = "发送成功\n";
                }else {
                    $p->bury($job); // 发送失败,将任务预留。以供维护者查看原因和在此将其变为 ready 状态进行消费
                    $log = "发送失败\n";
                }
            }
        }
    
        public function workerStop(Pool $pool,int $workerId){
            echo "重新启动\{$workerId}\n";
        }
    }
    $swoole = new swoole();
    $swoole ->daemon = ($argv[1] == "daemon") ? true : false;
    $swoole->main();
    
    服务启动说明
    $ php swoole.php #调试启动
    $ nohup php swoole.php daemon &   # 守护进程启动
    // 启动之后建议前往运维端查看 消费者数量与 swoole进程池数量是否一致,
    // 不一致则需要检查消费端启动的异常情况了
    # 以下通过简单的运维代码,检查beanstalk 的运行状况和 该管道消费端的启动状况
    require "./vendor/autoload.php";
    $p=Pheanstalk::create("ip",11300);
    try{
        var_dump($p->statsTube("tubeName")->getArrayCopy()); // 成功返回该管道信息 是一个数组
    // 如果执行成功,重点 查看以下2项,重点查看以下2项的值与  swoole进程池数量是否一致
    //不一致则需要检查消费端启动的异常情况了
        [
            current-watching=> 20,//当前消费者数量,该值应与swoole进程池数量一致
            current-waiting=> 20 // 空闲消费者在等待的数量 = 消费者数量 - 当前reserved(正在处理任务进程数)值。
                                          //  当前没有任务在处理的时候,等于  current-watching 数量                                    
        ]
    }catch(Throwable $e){
       // 当以下3种情况都出现时,该管道就被beanstalk系统自动删除了,会出现错误
        1当生产者未产生消息时; 
        2管道中无可用任务(删除的不算)时; 
        3消费者未启动的情况下。
        var_dump($e->getMessage()); // 错误信息提示管道不存在
    }
    
    
    生产者
    require "./vendor/autoload.php";
    $p=Pheanstalk::create("ip",11300);
    $job=$p->useTube("tubeName")->put(json_encode([
            "mobile"=>"13012345678",
            "content"=>"XXXXXXXXXX",
        ],JSON_UNESCAPED_UNICODE));
     var_dump($job);
    

    相关文章

      网友评论

          本文标题:swoole进程池中使用beanstalkd,异步处理任务

          本文链接:https://www.haomeiwen.com/subject/cfelkhtx.html