美文网首页
定时任务、Celery、消息队列

定时任务、Celery、消息队列

作者: 李霖弢 | 来源:发表于2022-06-06 10:05 被阅读0次

python定时任务有以下常见方案

  • python-crontab 系列
    如python-crontab、django-crontab
    封装了Linux提供的crontab命令
    在Linux上需开启crontab,不支持windows,适用于中小型项目
  • apscheduler 系列
    如apscheduler、django-apscheduler、flask-apscheduler
    支持windows和linux,适用于中小型项目
  • Celery 系列
    如celery、django-celery、flask-celery
    支持windows和linux,支持分布式,配置较复杂,适用于大型项目
  • 自建轮子
import os, sys, time, datetime
import threading
import django
base_apth = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# print(base_apth)
# 将项目路径加入到系统path中,这样在导入模型等模块时就不会报模块找不到了
sys.path.append(base_apth)
os.environ['DJANGO_SETTINGS_MODULE'] ='base_django_api.settings' # 注意:base_django_api 是我的模块名,你在使用时需要跟换为你的模块
django.setup()
from base.models import ConfDict

def confdict_handle():
    while True:
        try:
            loca_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('本地时间:'+str(loca_time))
            time.sleep(10)
        except Exception as e:
            print('发生错误,错误信息为:', e)
            continue


def main():
    '''
    主函数,用于启动所有定时任务,因为当前定时任务是手动实现,因此可以自由发挥
    '''
    try:
        # 启动定时任务,多个任务时,使用多线程
        task1 = threading.Thread(target=confdict_handle)
        task1.start()
    except Exception as e:
        print('发生异常:%s' % str(e))

if __name__ == '__main__':
    main()

Celery

Celery是一个任务队列管理工具,可用于实现异步接口、定期删除/缓存Redis数据、定期发送消息等。Celery本身不提供消息存储

  • Producer
    生产者,调用Celery的API产生任务并交给任务队列
  • Celery Beat
    任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Brokers
    中间人,指任务队列,仅支持Redis、RabbitMQ。
  • Celery Workers
    消费者,从队列中取出任务并执行。可在多台服务器运行多个消费者提高效率
  • Result Stores / backend
    任务处理完后保存状态信息和结果,可以使用Redis、RabbitMQ或DjangoORM等。


    celery工作流程

消息队列

使用场景
  1. 解耦
    如在订单与库存系统中加入消息队列,使两个系统解耦
  2. 异步任务
    如发送短信、邮件、刷新缓存等
  3. 流量削峰
    如秒杀活动等高并发场景
broker选择
  • Redis
    其list适用于做轻量级的MQ存储,但功能和逻辑需上层应用自行实现
  • RabbitMQ
    使用生产-消费者模式,并引入了Echange(交换器)概念,根据调度策略将生产者的消息转发给符合的Queue,实现解耦

相比于redis的优势:

  1. 发布确认:生产者发布消息给Broker后,会收到Broker的反馈,保证发布成功
  2. 消费确认:消息提交给消费者后,如未成功消费确认,会返回到消息队列
  3. 高可用性:自带集群和负载均衡
  4. 持久化:redis只能将整个数据库持久化,而RabbitMQ可以对每条队列或消息分别设置持久化
基本使用DEMO
  1. 安装redis和celery
    如果celery>=4.0,需要确保redis>=2.10.4
apt-get install redis-server
pip install redis
pip install celery
  1. 建立task
#tasks.py
from celery import Celery
 
app = Celery('tasks',  backend='redis://:yourpassword@localhost:6379/0', broker='redis://:yourpassword@localhost:6379/0') #配置好celery的backend和broker
 
@app.task  #普通函数装饰为 celery task
def add(x, y):
    return x + y

也可以通过app.config_from_object() 加载配置模块:

app.config_from_object('celeryconfig')

# celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
  1. 启动worker(开始从队列中读取任务并执行)
celery -A tasks worker --loglevel=info

可通过celery worker --help查看命令列表,常用包括以下内容:

  • -A, --app
    指定使用的 Celery 实例,必须为module.path:attribute格式,如-A swallow.celery
  • -l, --loglevel [DEBUG(默认)|INFO|WARNING|ERROR|CRITICAL|FATAL]
    日志级别
  • -P, --pool [prefork(默认)|eventlet|gevent|solo|processes|threads]
    并发模式,其默认值prefork在windows上不支持,会报错not enough values to unpack (expected 3, got 0)
    可使用--pool=solo(单进程)或-P solo代替
    也可以使用geventeventlet,但需先用pip下载
  • -c, --concurrency
    同时处理任务的工作进程数量,超出的任务需等待其他任务完成后执行。默认为CPU数
  1. 触发任务
  • 通过delayapply_async直接触发
    当任务完成时result.ready() 为 True,然后用 result.get() 取结果即可。
    确保调用了 result.get()result.forget(),否则资源不会释放
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
while not result.ready():
    time.sleep(1)
print 'task done: {0}'.format(result.get())
  • 通过beat定时触发
    celery beat 是一个调度程序,会定期触发任务
    通过celery -A 【beat实例】 beat启动,如celery -A swallow.celery beat -l info
    此处的crontab模块并未调用系统的crontab,只是同名罢了
#settings.py
from celery.schedules import timedelta, crontab
# 默认使用UTC时区,建议改为`'Asia/Shanghai'`
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
    "task_every_30_minutes": {# 该key随便起,不需要调用。
        "task": "task_every_30_minutes",# 实际调用的是这个task
        "schedule": timedelta(minutes=30)
    },
    "task_every_day_start": {
        "task": "task_every_day_start",
        "schedule": crontab(minute=0, hour=0)
    },
    "task_every_even_hour": {
        "task": "task_every_even_hour",
        'schedule': crontab(minute=0, hour='0,2,4,6,8,10,12,14,16,18,20,22'),
    },
}

在django中使用celery
  • 独立使用celery
    需进行相关配置,使celery可以调用django中的内容,如django.setup()等。
  • 使用celery + djcelery
    配置更方便

相关文章

  • 定时任务、Celery、消息队列

    python定时任务有以下常见方案 python-crontab 系列如python-crontab、django...

  • 基于celery及redis封装sanic的api

    背景介绍 celery 简介 其实celery不是消息队列,是一任务异步调用及定时任务调用处理的工具,并提供了后端...

  • python-分布式任务队列

    celery 分布式任务队列工具 Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递 基...

  • celery 定时任务实现

    Celery 是什么? 异步任务队列工具,主要解决 realtime 事件的异步操作,但也支持定时任务。 什么是异...

  • Celery,Tornado,Supervisor构建和谐的分布

    Celery 分布式的任务队列 与rabbitmq消息队列的区别与联系: rabbitmq 调度的是消息,而Cel...

  • Celery简介

    Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。 Celery用于生产系统每天处理数以百万计...

  • Python 异步任务队列Celery 使用

    Celery 介绍 celery是处理大量消息的分布式系统 专注于实时处理的异步任务队列 同时支持任务调度 在 P...

  • celery学习笔记

    Celery 标签(空格分隔): celery Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布...

  • Celery初体验

    Celery与任务队列 Celery是Python中流行的分布式任务队列。所谓分布式任务队列,是一种将任务分发到不...

  • 使用Docke配置Rabbitmq及Celery的使用

    消息队列选择 这里对之前celery异步任务的使用做个总结,在生产环境使用celery时,最好选择rabbitmq...

网友评论

      本文标题:定时任务、Celery、消息队列

      本文链接:https://www.haomeiwen.com/subject/ikioprtx.html