美文网首页IT@程序员猿媛技术干货
拒绝分散,试试任务引擎

拒绝分散,试试任务引擎

作者: 祥哥去哪里 | 来源:发表于2020-01-20 09:43 被阅读0次

    前言:

    大家做后端开发,一定都遇到过很多需求需要依靠定时任务去完成,可是不同的部门不同的开发会写在不同的项目中。定时任务也会越来越多越来越不好管理。既然大家的需求都是定时完成一些任务,只是任务内容不同,那我们就可以将管理任务执行任务这部分功能抽象出来做成一个服务。我称这样的服务为任务引擎

    流程

    流程大致如上图,我们可以来拆解一下。

    1. 首先需要做任务,那么就必须得有任务。写入任务其实也就是写表。这个应该需要封装成接口对外提供
    2. 根据任务状态获取出100条数据
    3. 将取出的数据遍历循环
    4. 对应单条数据的type值实例化对应的类
    5. 调用对应类上面的处理任务方法
    6. 修改任务状态
    7. 打印日志

    在这整个流程中,各个需要使用任务引擎的业务方其实只需要关注,写入任务执行任务这两个事件。其余的任务引擎会自动完成

    表设计、架构设计

    数据表结构如下:

    字段 类型 注释
    id int(11) 唯一主键
    bus_type int(11) 业务类型
    bus_key bigint(20) 业务id
    remark varchar(255) 备注
    params varchar(255) 可能会用到的参数
    status tinyint(1) 任务状态 1完成 2待执行 3结束 4失败重试
    create_time int(11) 创建时间
    update_time int(11) 更新时间

    索引设计,结合业务

    索引名 索引字段
    idx_status_create_time INDEX cp_schedule_task(status ASC, create_time DESC)

    开发框架我使用的是PHP7.3+ThinkPHP6.0
    首先使用composer create-project topthink/think tp6拉取一个框架代码。
    框架生成后,需要配置一些数据库连接之类的东西我就不介绍了,感兴趣的朋友可以前tp6官方手册里面查看
    首先我们将需要写的代码分为三部分

    1. 添加任务
    2. 做任务
    3. 修改任务状态

    首先来看添加任务
    添加任务实质上就是实现一个写表的操作,封装成接口对外调用就可以了

    创建模型类

    class ScheduleTask extends Model
    {
        protected $autoWriteTimestamp=true;//配置参数,配置之后,会自动写create_time和update_time
    }
    

    创建业务类,并且创建新增方法

    /**
     * 处理任务业务类
     * Class ScheduleTaskService
     * @package app\schedule\service
     */
    class ScheduleTaskService
    {
     /**
         * @title 新增任务方法
         * @param $data ["bus_id"=>1234,"bus_type"=>1,"remark"=>"hhhh","params"=>"{'lll':'hhh'}"]
         * @special bus_id 和bus_type必填
         */
        public function addTask($data){
            ScheduleTask::create($data);
        }
    }
    

    创建控制器,并且创建对外API

    class Task extends  BaseController
    {
    
        /**
         * @title 新增任务接口
         */
        public function add()
        {
            $post=$this->request->param();
            //其实需要对入参做判断,这里就先省略
            $service=new ScheduleTaskService();
            $service->addTask($post);
            response("加入成功",200,[],"json");
        }
    
    }
    

    第二步就是任务的重点做任务
    由于各个业务方的需求共同点是做任务,而任务的内容又不尽相同。所以我们需要将做这个任务给抽象出接口(interface)而任务内容由各个业务方自己实现

    先创建一个对象接口,定义执行任务的方法。有了这个接口,后面业务方需要添加不同的业务都可以继承这个接口并实现run方法

    interface ScheduleTaskComponent
    {
        /**
         * 执行任务
         * @param $data array 从表里出的一条数据
         * @return mixed
         */
        public function run($data);
    }
    

    实现一个假装发送消息的任务消耗类

    class SendMessageSchedule implements ScheduleTaskComponent
    {
        /**
         * 执行任务
         * @param $data array 从表里出的一条数据
         * @return mixed
         */
        public function run($data)
        {
            echo "假装发送了一条消息".$data['bus_id'].$data['params'].PHP_EOL;
        }
    }
    

    再实现一个假装写日志的任务消耗类

    
    class AddLogSchedule implements ScheduleTaskComponent
    {
    
        /**
         * 执行任务
         * @param $data array 从表里出的一条数据
         * @return mixed
         */
        public function run($data)
        {
            echo "假装写了一条日志".$data['remark'].PHP_EOL;
        }
    }
    

    那么现在消耗任务的方法有了,怎么才能让任务池里的任务根据不同的业务调用不同的方法呢?这也是本文的关键。
    既然是计划任务,传统的方式一般是,写一个控制器里面全部都是计划任务的业务。然后通过linux自带的crontab来定时发起curl请求。这样做方便是方便,但是还是有几个缺点~
    比如linux的计划任务最小时间单位是分钟,还有就是通过curl请求会走一次公网域名解析产生回环调用。就算你直接写ip不走域名。也会对nginx造成压力。
    为了解决这个问题,我就想到使用php的CLI模式去调用,这样就会直接执行php脚本,不走nginx。比较节约资源,而且更加灵活。
    起初我是打算写控制器然后cli的形式调用就好,但是没想到tp6不支持这样调用了。随后我研究了下tp的文档,发现tp6应该是更希望你使用他的自定义指令。看了文档之后发现其实非常的简单

    1. 先定义一个自定义命令
    php think make:command app\\schedule\\command\\scheduleStart 任务引擎启动
    

    这里上面的命令有个小问题,和文档里有些出入,文档里说的是不需要转移也就是只需要用一个\隔开命名空间的。但是试了下有问题。我看了下源码,应该是他们的bug。已经给作者反应了,应该在下个版本会修复

    随后就会帮你生成好一个用于命令行的类


    生成的类中会有两个方法 一个configureexecute

    configure可以配置接收参数,不过我们这次没有用。直接来看execute方法。顾名思义,这个是用来执行的方法
    我们来看代码

    protected function execute(Input $input, Output $output)
        {
            // 指令输出
            $output->writeln('任务引擎启动');
            while (true){//死循环
                $service = new ScheduleTaskService();
                //在表里获取状态为未执行、失败重试的100条任务
                $task100 = $service->get100Task();
                foreach ($task100 as $item) {
                    $updateId=$item['id'];
                    $updateRemark='';
                    try {
                        $type = $item['bus_type'];
                        //工厂模式,通过type生产不同的类实例
                        $taskComponent = ScheduleTaskService::getTaskComponent($type);
                        //调用类上的run方法
                        $taskComponent->run($item);
                        $updateStatus=1;
                    } catch (BusinessException $be) {//约定的业务异常
                        $updateRemark=$be->getMessage();
                        //如果异常是101就视为失败,不会重复执行
                        if ($be->getCode()=="101") {
                            //报错,不执行
                            $updateStatus=3;
                        }else{
                            //如果不是101就说明,还有的救,改为状态为再执行
                            $updateStatus=4;
                        }
                    } catch (Exception $e) {//系统异常
                        //系统异常
                        Log::error($e->getMessage());
                        $updateStatus=3;//将执行状态改为失败
                        $updateRemark=$e->getMessage();
                    }
                    //修改任务状态
                    $service->updateTask($updateId,$updateStatus,$updateRemark);
                }
                //打印本次操作了多少条
                Log::info("Command/scheduleStart 任务完成数{num}",['num'=>$task100->count()]);
                //线程挂起60秒
                sleep(60);
            }
        }
    

    上面的代码中,有一个生产对应任务的对象的方法getTaskComponent。值得拆开讲一下
    入参是一个任务的bus_type。每个type代表不同的业务,也代表了不同的类。而type和类的对应关系呢,我选择了tp内部的配置文件来做。其实也可以用户数据库记录对应关系,也可以直接冗余到每条数据上。看大家的需求

     /**
         * 获取一个任务类的实现对象
         * @param $type int 任务的typeId
         * @return ScheduleTaskComponent
         * @throws BusinessException
         */
        public static function getTaskComponent($type){
            //根据配置type获取配置
            $className=config("register.$type");
            if (empty($className)) {
                //如果没有配置,则抛出不再执行的业务异常
                Log::error("配置参数{type}在注册文件中不存在",['type',$type]);
                throw new BusinessException("101","配置参数错误");
            }
            //实例化对象
            return new $className();
        }
    

    配置的话 我就在config文件夹下新建了一个配置


    配置目录

    配置内容如下

    return [
        1=>"app\schedule\impl\SendMessageSchedule",//假装发消息
        2=>"app\schedule\impl\AddLogSchedule"//假装写日志
    ];
    

    这样就可以通过配置取到不同业务的处理任务类。最后调用从接口继承的run方法完成调用

    启动

    准备4条数据
    #在项目根目录使用命令启动自定义命令
     php think scheduleStart
    
    执行结果

    由于我的任务执行类就是打印一个字符串,所以能在控制台里看到


    数据表里的状态

    数据库里的任务状态也是修改为完成状态了。这个时候,任务引擎会持续的监听数据表里的任务。发现有任务就会去执行它。任劳任怨永不停止

    本文 源码 在github上面,需要自取

    结语

    首先很感谢你能看到这里,本次介绍的工具只为学习使用,有很多细节还有待考究,如果需要在生产环境使用,一定要控制好入参校验和错误预警。如果你觉得我的博客对你有帮助的话,点个赞再走呗。最后在新年之际,给各位读者拜个年! 大家新年快乐~

    相关文章

      网友评论

        本文标题:拒绝分散,试试任务引擎

        本文链接:https://www.haomeiwen.com/subject/jxhczctx.html