基于:https://www.jianshu.com/p/e2f9eef96f53
手动驱动任务示例,相关的文件主要涉及:
celeryconfig.py # 任务配置文件
__init__.py #实例化celery对象
task1.py #具体的任务实体
task2.py #具体的任务实体
client.py #任务的生产提交客户端
图示:
image.png
init.py 文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:00
@version: v1.0.0
@Contact: 308711822@qq.com
@File: __init__.py.py
@文件功能描述:
"""
from celery import Celery
app = Celery('demo') # 创建 Celery 实例
app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
celeryconfig.py文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:01
@version: v1.0.0
@Contact: 308711822@qq.com
@File: celeryconfig.py
@文件功能描述:
"""
BROKER_URL = "redis://localhost:6379/0" # 指定 Broker
CELERY_RESULT_BACKEND = "redis://localhost:6379/1" # 指定 Backend
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,默认是 UTC
# CELERY_TIMEZONE='UTC'
CELERY_IMPORTS = ( # 指定导入的任务模块
'celery_app.task1',
'celery_app.task2'
)
task1.py文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 14:53
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task1.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
task2.py文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:01
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task2.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
client.py文件内容
#!/usr/bin/evn python
# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + +
"""
Author = zyx
@Create_Time: 2018/1/11 15:02
@version: v1.0.0
@Contact: 308711822@qq.com
@File: client.py.py
@文件功能描述:
"""
import datetime
from datetime import timedelta
from celery_app import task1
from celery_app import task2
task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)
task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7)
print('hello world')
# countdown:指定多少秒后执行任务
task1.add.apply_async(args=(2, 23), countdown=5) # 5 秒后执行任务
task1.add.apply_async(args=[6, 7], expires=10) # 10 秒后过期
# 当前 UTC 时间再加 10 秒后执行任务
# task1.add.multiply.apply_async(args=[8, 7], eta=datetime.utcnow() + timedelta(seconds=10))
# task1.add.apply_async(args=[8, 7], eta=datetime.utcnow() + timedelta(seconds=10))
# expires:任务过期时间,参数类型可以是 int,也可以是 datetime
# task1.add.multiply.apply_async(args=[6, 7], expires=10) # 10 秒后过期
# task1.add.apply_async(args=[6, 7], expires=10) # 10 秒后过期
运行的方式:
1:进入到目录:
cd celery_demo2
2: 启动 Celery Worker 进程,在项目的根目录下执行下面命令:
celery_demo2 $ celery -A celery_app worker --loglevel=info
3: 运行客户端:
运行client.py,查看对应的调式的信息
bast定时任务驱动任务示例,相关的文件主要涉及:
celeryconfig.py # 任务配置文件
task1.py #具体的任务实体
task2.py #具体的任务实体
image.png
celeryconfig.py文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:35
@version: v1.0.0
@Contact: 308711822@qq.com
@File: celeryconfig.py.py
@文件功能描述:
"""
from datetime import timedelta
from celery.schedules import crontab
# Broker and Backend
BROKER_URL = "redis://localhost:6379/2" # 指定 Broker
# CELERY_RESULT_BACKEND = "redis://localhost:6379/3" # 指定 Backend
# Timezone
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
# CELERY_TIMEZONE='UTC'
# import
CELERY_IMPORTS = (
'celery_app.task1',
'celery_app.task2'
)
# schedules
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'celery_app.task1.add',
'schedule': timedelta(seconds=3), # 每 30 秒执行一次
'args': (5, 8) # 任务函数参数
},
'multiply-at-some-time': {
'task': 'celery_app.task2.multiply',
'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次
'args': (3, 7) # 任务函数参数
}
}
task1.py文件内容
#!/usr/bin/evn python
# coding=utf-8
"""
Author = zyx
@Create_Time: 2018/1/11 15:39
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task1.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def add(x, y):
time.sleep(2)
return x + y
task2.py文件内容
#!/usr/bin/evn python
# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +
# ┏┓ ┏┓+ +
# ┏┛┻━━━┛┻┓ + +
# ┃ ┃
# ┃ ━ ┃ ++ + + +
# ████━████ ┃+
# ┃ ┃ +
# ┃ ┻ ┃
# ┃ ┃ + +
# ┗━┓ ┏━┛
# ┃ ┃
# ┃ ┃ + + + +
# ┃ ┃ Codes are far away from bugs with the animal protecting
# ┃ ┃ + 神兽保佑,代码无bug
# ┃ ┃
# ┃ ┃ +
# ┃ ┗━━━┓ + +
# ┃ ┣┓
# ┃ ┏┛
# ┗┓┓┏━┳┓┏┛ + + + +
# ┃┫┫ ┃┫┫
# ┗┻┛ ┗┻┛+ + + +
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +"""
"""
Author = zyx
@Create_Time: 2018/1/11 15:39
@version: v1.0.0
@Contact: 308711822@qq.com
@File: task2.py.py
@文件功能描述:
"""
import time
from celery_app import app
@app.task
def multiply(x, y):
time.sleep(2)
return x * y
运行的方式:
1:进入到目录
cd celery_periodic_tasks
2: 启动 Celery Beat 进程,定时将任务发送到 Broker,在项目根目录下执行下面命令:
celery_periodic_tasks $ celery beat -A celery_app
3: 启动 Celery Worker 进程,在项目的根目录下执行下面命令:
celery_periodic_tasks $ celery -A celery_app worker --loglevel=info
网友评论