美文网首页数据挖掘与机器学习
python-分布式任务队列

python-分布式任务队列

作者: yunpiao | 来源:发表于2018-07-18 01:10 被阅读44次

celery 分布式任务队列工具

Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递

基本

  • Broker: 消息队列使用的中间人 有RabbiMQ redis mongodb 等一系列数据库
  • Task: 用来定义任务
  • backend: 用来保存结果
  • Worker: 执行单元, 用来从中间人取出任务 , 并把结果发给backen

数据保存

各个sqalchemy数据库


CELERY_RESULT_BACKEND = 'db+scheme://user:password@host:port/dbname'

# sqlite (filename) CELERY_RESULT_BACKEND = ‘db+sqlite:///results.sqlite’

# mysql CELERY_RESULT_BACKEND = ‘db+mysql://scott:tiger@localhost/foo’

# postgresql CELERY_RESULT_BACKEND = ‘db+postgresql://scott:tiger@localhost/mydatabase’

# oracle CELERY_RESULT_BACKEND = ‘db+oracle://scott:tiger@127.0.0.1:1521/sidname’

worker 设置

CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
# 设置队列 feeds
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
BROKER_URL = 'amqp://'      # 指定 Broker
CELERY_RESULT_BACKEND = ''  # 指定 Backend
CELERY_TIMEZONE='Asia/Shanghai'                     # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = (                                  # 指定导入的任务模块
    'celery_app.service_monitor',
    'celery_app.service_diagnose'
)
消息机制    支持 RabbitMQ, Redis, Beanstalk, MongoDB, CouchDB,
以及 SQL 数据库.
容错      与 RabbitMQ配合可完美实现错误恢复
分布式     运行于一台或多台服务器。支持Broker群集和HA,可任意添加worker而无需在服务器中心节点配置。
并发      通过Python的multiprocessing, Eventlet, gevent 或者他们的混合实现并发执行.
Scheduling  支持cron类的递归式任务,或者指定时间、倒数等任务执行方式.
延迟      极低延迟.
返回值     任务运行结果可储存在指定的结果存储后台,你可以等待结果或忽略运算结果
返回值存储   支持SQL数据库, MongoDB, Redis, Tokyo
Tyrant, Cassandra, 或 AMQP (消息通知).
Webhooks    用户跨语言/平台任务分配。
Rate limiting   Supports rate limiting by using the token bucket algorithm, which accounts for bursts of traffic. Rate limits can be set for each task type, or globally for all.  
消息路由     通过AMQP灵活的路由模型你可以将任务路由到任意worker服务器,可配置或运行时指定。
远程控制     可通过广播消息远程控制worker节点。Celery内置了大量的相关命令,也可以轻松实现自定义命令(只适用AMQP和Redis)
监控      可实时获得workers的一切信息
对象序列化   支持 Pickle, JSON, YAML,或自定义序列化程序. 
错误追踪     
UUID    每个任务都有一个UUID用于查询该任务的运行状态以及返回值。
出错重试    当任务执行失败时可根据配置重试。配置内容包括最大重试次数,重试时间间隔。
任务集 任务集由多个子任务构成,可以获得子任务的数量,执行情况,以及各个子任务的运算结果。
Made for Web    可通过Ajax查询任务运行状态和运行结果。
出错通知    当任务出错是可通过邮件通知管理员

任务调用

service_monitor 为注册任务

延迟调用

add.s(2, 2)()

简单调用

service_monitor(url)

可设置回调调用

service_monitor.apply_async(args=["http://www.baidu.com"], link=res.s())

所有组任务执行完后 所有结果发送到res任务中去 也就是回调res任务

chord((service_monitor.s("http://www.baidu.com") for i in range(3)) , res.s())()

设置每个任务执行后的任务 .s 前任务作为参数的一部分

add.apply_async((2, 2), link=add.s(16))

链接任务 结果发给后任务

chain(add.s(4, 4) | mul.s(8))().get()
另一种用法
g = chain(add.s(4) | mul.s(8))
g(4).get()
还有一种
(add.s(4, 4) | mul.s(8))().get()

任务可设置参数

add.apply_async((10, 10), serializer='json')
add.apply_async((2, 2), compression='zlib')
add.apply_async((2,2), countdown=10, debug=True)
s = add.subtask((2, 2), {'debug': True}, countdown=10)# args kwargs options

应用

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

运行

celery -A tasks worker --loglevel=info

此时 一个worker 已经开始运行

单机多个worker

$ celery worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery worker --loglevel=INFO --concurrency=10 -n worker3.%h

下发任务

test.py

# -*- coding: utf-8 -*-
from celery_app.service_monitor import service_monitor, res
from celery import chord
result = chord((service_monitor.s("http://www.baidu.com") for i in range(3)) ,  res.s())()
# 查看任务
result.ready()
result.get(timeout=1)

交换与路由

enter description hereenter description here

三种交换方式

Direct Exchange
直接交换,也就是指定一个消息被其中一个队列接收,这个消息被celerybeat定义一个routing key,如果你发送给交换机并且那个队列且绑定的bingdingkey,那么就会直接被转给这个Queue.
Topic Exchange
这种交换方式可以根据类型的属性进行统配,然后根据统配的类型进行交换到指定的Queue.

Fanout Exchange
广播交换,如果你有某个task,可能处理时间比较长,但是却要求很高的实时性,那么你可能需要多台服务器的多个worker进行处理,每个worker负责其中一部分工作,但是celerybeat 只会生成一个任务,被某个worker取走就没有了,所以你需要让每个服务器的队列都要收到这个消息,这里很需要注意的是:你的fanout类型的消息在生成的时候要有多份,每个队列一份,而不是一个消息发送给单一队列的次数。

celery -A proj worker -Q feeds,celery
为该worker指定一或多个消息队列, worker只取该队列中的任务。可以指定多个队列.
调用

service_monitor.apply_async(args=['http://cnn.com/rss'],queue='feeds',routing_key='feeds')

优先级

单个任务优先级


from kombu import Exchange, Queue
app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10},
]

统一优先级

app.conf.task_queue_max_priority = 10

发送广播

current_app.control.broadcast('rate_limit',
arguments={'task_name': 'celery_app.service_monitor.service_monitor',
'rate_limit': '200/m'})

相关文章

  • python-分布式任务队列

    celery 分布式任务队列工具 Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递 基...

  • Celery初体验

    Celery与任务队列 Celery是Python中流行的分布式任务队列。所谓分布式任务队列,是一种将任务分发到不...

  • 分布式任务队列

    什么是分布式任务队列?说白了就是生成并发布任务-》任务存储-》任务接收处理 是不是和分布式消息队列很像?所以说用途...

  • Celery

    Celery 是 Distributed Task Queue 分布式任务队列。 不太理解异步任务队列,看到有人说...

  • Celery入门

    概念 Celery是一个异步任务的调度工具,是Distributed Task Queue,分布式任务队列,分布式...

  • celery--turorial

    Celery Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/...

  • celery redis rabbitMQ各是什么及之间的区别?

    Celery: Celery是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线...

  • Python Celery学习扎记

    Celery是什么 Celery是一个管理分布式队列的工具,它封装好了常见任务队列的各种操作,能够快速进行任务队列...

  • celery学习笔记

    Celery 标签(空格分隔): celery Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布...

  • Celery

    Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worke...

网友评论

    本文标题:python-分布式任务队列

    本文链接:https://www.haomeiwen.com/subject/kqorfftx.html