介绍
在本方案中,使用celery作为任务分发平台。对于存入celery的大量任务,能达到以下的要求:
- 任务逻辑相互独立
- 横向扩展任务处理能力
- 抽象(抽象的意义在于,化繁为简) 除业务逻辑以外的 处理过程,将后续代码编写的关注点主要放在业务逻辑的实现上
- 链式任务触发
结构
flow.png设计方案
任务逻辑封装
我们将任务逻辑按照约定的格式封装,并以设置name属性的方式为任务打上标记,而后通过current_node/default_node确定app 发送的节点。这些准备工作完成后,将任务名称和任务数据交由celery app进行发送。如此能够保持发送端的轻量级,使任务更快、更稳定、更无压力的发送到执行端。
以下是任务的封装功能伪代码。
class BaseLogic(object):
logic_name = None
nodes = []
def __init__(self):
if not self.logic_name or not self.nodes:
raise attributeError
def next_node(current_node=None): # 逻辑具有了修改下一个执行节点的能力。
def send_other(self, other_logic_name):
def send_others(self, other_logic_names):
def set(self, key, value):
def get(self, key, default=None):
def die(self):
def __repr__(self):
class SpiderLogic(BaseLogic):
def crawl(self):
def publish(self):
def store(self):
发送器封装
class App():
def __init__(self, app->celery.app):
self.app = app
def send(self, node_name, data=None): # 发送初始任务。发送任务名称和传递的数据(LogicClass.name, data)
任务处理流程简述
不同节点的worker会收到属于本节点的任务。worker提供任务的执行流程。
worker是无状态的,worker的每次运行会传入此次运行所需的数据,多次任务运行之间相互不会产生影响。对于无状态的系统,可以避免考虑数据同步等额外的交互问题。同时根据任务数量级和任务执行所需要的资源的不同,可以对worker进行横向扩展。
基础task处理逻辑的封装功能伪代码:
class ProcessTask(celery.Task):
"""
worker中实际运行的任务流程封装
"""
name = None
def __init__(self):
def run(self, data):
logic_ins = logic_factory(data)
self._run(logic_ins)
self.send_next(logic_ins)
self.send_others(logic_ins)
def _run(self, logic_ins):
raise NotImplementedError
def send_next(self, logic_ins):
node_name = logic_ins.next_nodes.pop()
self.send_to_node(node_name)
def send_others(self, logic_ins):
for node_name in logic_ins.other_nodes:
self.send_to_node(node_name)
def send_to_node(self, node_name):
class CrawlProcessTask(ProcessTask):
name = 'crawl'
def _run(self, logic_ins):
logic_ins.crawl()
收到任务的名称以后,worker会通过 工厂方法 根据任务名称实例化对应的类,并且按照worker既定的执行流程,执行对应的业务逻辑。
执行完成后,按照logic class 既定的顺序,自动触发下一个流程。当然如果需要将数据进行链式处理,那么在逻辑类中,通过定义other_logics,数据也会发送到对应节点开始新的流程。
总结
任务逻辑相互独立的意义在于,当一个任务需要调整逻辑时,会自然而然的将修改锁定在独立的代码块中,也就是最小化此次修改的影响范围。所以我们将任务相互独立的抽象成不同的逻辑类。而当任务相互独立以后,我们需要一个统一的任务运行机制,并且此机制希望对于任务毫无干预,也就是机制不关注运行任务的内容是什么,而是关注运行任务的流程。所以我们对于任务运行设计了一套流程。
在流程中,我们将任务封装成为一个个的类,在类中定义好业务逻辑以及处理节点的顺序。类通过一个统一的入口进入流程处理。同时,通过这个顺序,被封装后的celery app 可以找到首个接收的worker 节点,然后通过celery的分布式任务分发能力,进行任务的分发。
同时在类中设置链式处理逻辑,解除单个任务之间的壁垒,将任务链条串起来,解决任务之间数据交互的问题。
故此,我们将业务代码抽离出任务分发流程,任务相互独立,同时提供数据传递的方案,保证任务流程的正常执行。同时通过worker节点的无状态,以及celery节点的扩容能力,使得当有大量任务产生的时候,能够对任意任务节点数量进行横向扩展。
网友评论