美文网首页
任务调度

任务调度

作者: Gin_714d | 来源:发表于2019-10-28 21:36 被阅读0次

    介绍

    在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求:

    1. 任务逻辑相互独立
    2. 横向扩展任务处理能力
    3. 抽象(抽象的意义在于,化繁为简) 除业务逻辑以外的 处理过程,将后续代码编写的关注点主要放在业务逻辑的实现上
    4. 链式任务触发

    结构

    flow.png

    设计方案

    任务逻辑封装

    我们将任务逻辑按照约定的格式封装,并以设置name属性的方式为任务打上标记,而后通过current_node/default_node确定app 发送的节点。这些准备工作完成后,将任务名称和任务数据交由celery app进行发送。如此能够保持发送端的轻量级,使任务更快、更稳定、更无压力的发送到执行端。

    以下是任务的封装功能伪代码。

    
    class BaseLogic(object):
    
        logic_name = None
        nodes = []
    
        def __init__(self):
            if not self.logic_name or not self.nodes:
                raise attributeError
    
        def next_node(current_node=None): # 逻辑具有了修改下一个执行节点的能力。
    
        def send_other(self,  other_logic_name):
    
        def send_others(self, other_logic_names):
    
        def set(self,  key,  value):
    
        def get(self,  key,  default=None):
    
        def die(self):
    
        def __repr__(self):  
    
    class SpiderLogic(BaseLogic):
    
        def crawl(self):
    
        def publish(self):
    
        def store(self):
    
          
    

    发送器封装

    class App():
    
        def __init__(self, app->celery.app):
            self.app = app
    
        def send(self, node_name, data=None): # 发送初始任务。发送任务名称和传递的数据(LogicClass.name, data)
    
    

    任务处理流程简述

    不同节点的worker会收到属于本节点的任务。worker提供任务的执行流程。

    worker是无状态的,worker的每次运行会传入此次运行所需的数据,多次任务运行之间相互不会产生影响。对于无状态的系统,可以避免考虑数据同步等额外的交互问题。同时根据任务数量级和任务执行所需要的资源的不同,可以对worker进行横向扩展。

    基础task处理逻辑的封装功能伪代码:

    class ProcessTask(celery.Task):
        """
        worker中实际运行的任务流程封装
        """
        name = None
    
        def __init__(self):
    
        def run(self, data):
            logic_ins = logic_factory(data)
            self._run(logic_ins)
            self.send_next(logic_ins)
            self.send_others(logic_ins)
    
        def _run(self, logic_ins):
            raise NotImplementedError
    
        def send_next(self, logic_ins):
            node_name = logic_ins.next_nodes.pop()
            self.send_to_node(node_name)
    
        def send_others(self, logic_ins):
    
            for node_name in logic_ins.other_nodes:
                self.send_to_node(node_name)
    
        def send_to_node(self, node_name):
    
    class CrawlProcessTask(ProcessTask):
    
        name = 'crawl'
    
        def _run(self, logic_ins):
            logic_ins.crawl()
    
    

    收到任务的名称以后,worker会通过 工厂方法 根据任务名称实例化对应的类,并且按照worker既定的执行流程,执行对应的业务逻辑。

    执行完成后,按照logic class 既定的顺序,自动触发下一个流程。当然如果需要将数据进行链式处理,那么在逻辑类中,通过定义other_logics,数据也会发送到对应节点开始新的流程。

    总结

    任务逻辑相互独立的意义在于,当一个任务需要调整逻辑时,会自然而然的将修改锁定在独立的代码块中,也就是最小化此次修改的影响范围。所以我们将任务相互独立的抽象成不同的逻辑类。而当任务相互独立以后,我们需要一个统一的任务运行机制,并且此机制希望对于任务毫无干预,也就是机制不关注运行任务的内容是什么,而是关注运行任务的流程。所以我们对于任务运行设计了一套流程。

    在流程中,我们将任务封装成为一个个的类,在类中定义好业务逻辑以及处理节点的顺序。类通过一个统一的入口进入流程处理。同时,通过这个顺序,被封装后的celery app 可以找到首个接收的worker 节点,然后通过celery的分布式任务分发能力,进行任务的分发。

    同时在类中设置链式处理逻辑,解除单个任务之间的壁垒,将任务链条串起来,解决任务之间数据交互的问题。

    故此,我们将业务代码抽离出任务分发流程,任务相互独立,同时提供数据传递的方案,保证任务流程的正常执行。同时通过worker节点的无状态,以及celery节点的扩容能力,使得当有大量任务产生的时候,能够对任意任务节点数量进行横向扩展。

    相关文章

      网友评论

          本文标题:任务调度

          本文链接:https://www.haomeiwen.com/subject/redivctx.html