美文网首页
任务调度

任务调度

作者: Gin_714d | 来源:发表于2019-10-28 21:36 被阅读0次

介绍

在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求:

  1. 任务逻辑相互独立
  2. 横向扩展任务处理能力
  3. 抽象(抽象的意义在于,化繁为简) 除业务逻辑以外的 处理过程,将后续代码编写的关注点主要放在业务逻辑的实现上
  4. 链式任务触发

结构

flow.png

设计方案

任务逻辑封装

我们将任务逻辑按照约定的格式封装,并以设置name属性的方式为任务打上标记,而后通过current_node/default_node确定app 发送的节点。这些准备工作完成后,将任务名称和任务数据交由celery app进行发送。如此能够保持发送端的轻量级,使任务更快、更稳定、更无压力的发送到执行端。

以下是任务的封装功能伪代码。


class BaseLogic(object):

    logic_name = None
    nodes = []

    def __init__(self):
        if not self.logic_name or not self.nodes:
            raise attributeError

    def next_node(current_node=None): # 逻辑具有了修改下一个执行节点的能力。

    def send_other(self,  other_logic_name):

    def send_others(self, other_logic_names):

    def set(self,  key,  value):

    def get(self,  key,  default=None):

    def die(self):

    def __repr__(self):  

class SpiderLogic(BaseLogic):

    def crawl(self):

    def publish(self):

    def store(self):

      

发送器封装

class App():

    def __init__(self, app->celery.app):
        self.app = app

    def send(self, node_name, data=None): # 发送初始任务。发送任务名称和传递的数据(LogicClass.name, data)

任务处理流程简述

不同节点的worker会收到属于本节点的任务。worker提供任务的执行流程。

worker是无状态的,worker的每次运行会传入此次运行所需的数据,多次任务运行之间相互不会产生影响。对于无状态的系统,可以避免考虑数据同步等额外的交互问题。同时根据任务数量级和任务执行所需要的资源的不同,可以对worker进行横向扩展。

基础task处理逻辑的封装功能伪代码:

class ProcessTask(celery.Task):
    """
    worker中实际运行的任务流程封装
    """
    name = None

    def __init__(self):

    def run(self, data):
        logic_ins = logic_factory(data)
        self._run(logic_ins)
        self.send_next(logic_ins)
        self.send_others(logic_ins)

    def _run(self, logic_ins):
        raise NotImplementedError

    def send_next(self, logic_ins):
        node_name = logic_ins.next_nodes.pop()
        self.send_to_node(node_name)

    def send_others(self, logic_ins):

        for node_name in logic_ins.other_nodes:
            self.send_to_node(node_name)

    def send_to_node(self, node_name):

class CrawlProcessTask(ProcessTask):

    name = 'crawl'

    def _run(self, logic_ins):
        logic_ins.crawl()

收到任务的名称以后,worker会通过 工厂方法 根据任务名称实例化对应的类,并且按照worker既定的执行流程,执行对应的业务逻辑。

执行完成后,按照logic class 既定的顺序,自动触发下一个流程。当然如果需要将数据进行链式处理,那么在逻辑类中,通过定义other_logics,数据也会发送到对应节点开始新的流程。

总结

任务逻辑相互独立的意义在于,当一个任务需要调整逻辑时,会自然而然的将修改锁定在独立的代码块中,也就是最小化此次修改的影响范围。所以我们将任务相互独立的抽象成不同的逻辑类。而当任务相互独立以后,我们需要一个统一的任务运行机制,并且此机制希望对于任务毫无干预,也就是机制不关注运行任务的内容是什么,而是关注运行任务的流程。所以我们对于任务运行设计了一套流程。

在流程中,我们将任务封装成为一个个的类,在类中定义好业务逻辑以及处理节点的顺序。类通过一个统一的入口进入流程处理。同时,通过这个顺序,被封装后的celery app 可以找到首个接收的worker 节点,然后通过celery的分布式任务分发能力,进行任务的分发。

同时在类中设置链式处理逻辑,解除单个任务之间的壁垒,将任务链条串起来,解决任务之间数据交互的问题。

故此,我们将业务代码抽离出任务分发流程,任务相互独立,同时提供数据传递的方案,保证任务流程的正常执行。同时通过worker节点的无状态,以及celery节点的扩容能力,使得当有大量任务产生的时候,能够对任意任务节点数量进行横向扩展。

相关文章

  • linux定时任务

    一 、简介 Linux下的任务调度分为两类,系统任务调度和用户任务调度 系统任务调度:系统需要定期执行的任务,比如...

  • Linux 系统之crontab命令

    Linux下的任务调度分为两类,系统任务调度 和 用户任务调度。 系统任务调度:系统周期性所要执行的工作,比如写缓...

  • 任务调度

    http://www.cnblogs.com/langtianya/archive/2013/05/15/3079...

  • 任务调度

    介绍 在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求: 任务逻辑相...

  • 任务调度

    主要有3种方案:数据库扫表;小顶堆;时间轮。 数据库扫表 延迟比较大 小顶堆 首先维持一个小顶堆,即最快需要执行的...

  • 任务调度

    用了一个计算机类词汇作为标题。 在每天的工作生活中,会有各式各样的事情,有的重要,有的不重要,有的紧急,有的不太紧...

  • Linux任务调度

    crond任务调度 crontab用于定时任务的设置。任务调度:是指系统在某个时间执行的特定的命令或程序。任务调度...

  • crond任务调度(定时任务调度)

    基本语法crontab [选项]选项-e : 编辑crontab定时任务-l : 查询crontab定时任务...

  • 分布式任务调度 SchedulerX

    参考文档: 为应用实现任务调度(EDAS 部署) 什么是分布式任务调度SchedulerX?分布式任务调度Sche...

  • 分布式调度器Quartz解读

    术语: scheduler:任务调度器 job: 被调度的任务 trigger:触发器,用于定义Job调度时间规则...

网友评论

      本文标题:任务调度

      本文链接:https://www.haomeiwen.com/subject/redivctx.html