美文网首页技术文自动化
Django 进阶之 celery

Django 进阶之 celery

作者: 51reboot | 来源:发表于2018-08-27 09:44 被阅读4次

Django 集成 Celery 到项目:本节将 celery 集成到 Django 项目中,实现异步任务处理和定时任务处理。

Celery工作流程

Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件

Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ。

任务执行单元

Worker 是 Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中。

任务结果存储

Task result store 用来存储 Worker 执行的任务的结果,Celery支持以不同方式存储任务的结果,包括 AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

1、Celery 安装与配置

在虚拟环境中安装:

pip install django-celery==3.2.2

pip install django-redis

pip install flower # celery 的 web 管理平台(异步任务可视化)

查看集成到 Django 中的 celery 版本, pip freeze

celery==3.1.26.post2 django-celery==3.2.2 flower==0.9.2

启动 redis 服务, 端口假设为 6379

发现 pip 安装比较慢的情况

pip install pillow -i https://pypi.doubanio.com/simple/

2、Django 中配置
(1)在主工程的配置文件 settings.py 中应用注册表 INSTALLED_APPS 中加入 djcelery
INSTALLED_APPS = [
   'django.contrib.admin',
   'django.contrib.auth',
   'django.contrib.contenttypes',
   'django.contrib.sessions',
   'django.contrib.messages',
   'django.contrib.staticfiles',
   'art',
   'xadmin',
   'crispy_forms',
   'DjangoUeditor',
   'djcelery',       #加入djcelery
]

(2)在 settings.py 中加入 celery 配置信息

#############################
# celery 配置信息 start
#############################
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_IMPORTS = ('art.tasks')
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' 
from celery.schedules import crontab
from celery.schedules import timedelta

CELERYBEAT_SCHEDULE = {    #定时器策略
   #定时任务一: 每隔30s运行一次
   u'测试定时器1': {
       "task": "art.tasks.tsend_email",
       #"schedule": crontab(minute='*/2'),  # or 'schedule':   timedelta(seconds=3),
       "schedule":timedelta(seconds=30),
       "args": (),
   },
}
#############################
# celery 配置信息 end
#############################

当 djcelery.setup_loader() 运行时,Celery 便会去查看 INSTALLD_APPS 下包含的所有 app 目录中的 tasks.py 文件,找到标记为task 的方法,将它们注册为 celery task.

BROKER_URL:broker 是代理人,它负责分发任务给 worker 去执行。我使用的是 Redis 作为 broker。没有设置 CELERY_RESULT_BACKEND,默认没有配置,此时 Django 会使用默认的数据库(也是你指定的 orm 数据库)。

CELERY_IMPORTS:是导入目标任务文件。

CELERYBEAT_SCHEDULER:使用了 django-celery 默认的数据库调度模型,任务执行周期都被存在默认指定的 orm 数据库中。

CELERYBEAT_SCHEDULE:设置定时的时间配置, 可以精确到秒,分钟,小时,天,周等。

(3)创建应用实例

在主工程目录添加 celery.py, 添加自动检索 django 工程 tasks 任务

vim artproject/celery.py

#!/usr/bin/env python  
# encoding: utf-8  
#目的是拒绝隐式引入,celery.py和celery冲突。
from __future__ import absolute_import,unicode_literals 
import os
from celery import Celery
from django.conf import settings

# 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "artproject.settings")

#创建celery应用
app = Celery('art_project')
app.config_from_object('django.conf:settings')

#如果在工程的应用中创建了tasks.py模块,那么Celery应用就会自动去检索创建的任务。比如你添加了一个任#务,在django中会实时地检索出来。
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

(4)创建任务 tasks

每个任务本质上就是一个函数,在 tasks.py 中,写入你想要执行的函数即可。

在应用 art 中添加我们需要提供的异步服务和定时服务。

vim art/tasks.py

#!/usr/bin/env python  
# encoding: utf-8  
from __future__ import absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app

from art.utils.send_mail import pack_html, send_email

@app.task
def tsend_email():
  url = "http://1000phone.com"
  receiver = 'diyuhuan@1000phone.com'
  content = pack_html(receiver, url)
  # content = 'this is email content.'
  send_email(receiver, content)
  print('send email ok!')


@app.task
def add(x, y):
  return x+y

上述我们把异步处理任务 add 和定时器任务 tsend_email 都放在了 tasks.py 中

(5)迁移生成 celery 需要的数据表

python manage.py migrate

此时数据库表结构多出了几个

celery_taskmeta            |
| celery_tasksetmeta         |
| djcelery_crontabschedule   |
| djcelery_intervalschedule  |
| djcelery_periodictask      |
| djcelery_periodictasks     |
| djcelery_taskstate         |
| djcelery_workerstate

3、启动服务,测试

我们可以采用 python manage.py help 发现多出了 celery 相关选项。

(1)启动 django celery 服务

启动服务:

python manage.py celery worker --loglevel=info

此时异步处理和定时处理服务都已经启动了

(2)web 端接口触发异步任务处理

我们在 web 端加入一个入口,触发异步任务处理 add 函数

在应用 art 的 urls.py 中加入如下对应关系

from art.views import add_handler


url(r'^add', add_handler),

art/views.py 中加入处理逻辑

def add_handler(request):
  x = request.GET.get('x', '1')
  y = request.GET.get('y', '1')
  from .tasks import add
  add.delay(int(x), int(y))
  res = {'code':200, 'message':'ok', 'data':[{'x':x, 'y':y}]}
  return HttpResponse(json.dumps(res))

启动 web 服务,通过 url 传入的参数,通过 handler 的 add.delay(x, y) 计算并存入 mysql

http://127.0.0.1:8000/art/add?x=188&y=22

(3)测试定时器,发送邮件

在终端输入 python manage.py celerybeat -l info

会自动触发每隔 30s 执行一次 tsend_email 定时器函数,发送邮件:

CELERYBEAT_SCHEDULE = {    #定时器策略
   #定时任务一: 每隔30s运行一次
   u'测试定时器1': {
       "task": "art.tasks.tsend_email",
       #"schedule": crontab(minute='*/2'),  # or 'schedule': timedelta(seconds=3),
       "schedule":timedelta(seconds=30),
       "args": (),
   },
}

具体发送邮件服务程序见下面的第 4 节

4、邮件发送服务

项目中经常会有定时发送邮件的情形,比如发送数据报告,发送异常服务报告等。

可以编辑文件 art/utils/send_mail.py, 内容编辑如下:

#!/usr/bin/env python
#-*- coding:utf-8 -*-
#written by diyuhuan
#发送邮件(wd_email_check123账号用于内部测试使用,不要用于其他用途)

import smtplib  
from email.mime.multipart import MIMEMultipart  
from email.mime.text import MIMEText  
from email.mime.image import MIMEImage 
from email.header import Header
import time

sender = 'wd_email_check123@163.com'  
subject = u'api开放平台邮箱验证'
smtpserver = 'smtp.163.com'
username = 'wd_email_check123'
password = 'wandacheck1234'
mail_postfix="163.com"

def send_email(receiver, content):
   try:
       me = username+"<"+username+"@"+mail_postfix+">"
       msg = MIMEText(content, 'html', 'utf-8')
       msg['Subject'] = subject
       msg['From'] = sender
       msg['To'] = receiver
       smtp = smtplib.SMTP()  
       smtp.connect(smtpserver)  
       smtp.login(username, password)
       smtp.sendmail(sender, receiver, msg.as_string())  
       smtp.quit()
       return True
   except Exception as e:
       print('send_email has error with : ' + str(e))
       return False


def pack_html(receiver, url):
   html_content = u"<html><div>尊敬的用户<font color='#0066FF'>%s</font> 您好!</div><br>" \
                  "<div>感谢您关注我们的平台 ,我们将为您提供最贴心的服务,祝您购物愉快。</div><br>" \
                  "<div>点击以下链接,即可完成邮箱安全验证:</div><br>"  \
                  "<div><a href='%s'>%s</a></div><br>"  \
                  "<div>为保障您的帐号安全,请在24小时内点击该链接; </div><br>" \
                  "<div>若您没有申请过验证邮箱 ,请您忽略此邮件,由此给您带来的不便请谅解。</div>" \
                  "</html>" % (receiver, url, url)
   html_content = html_content
   return html_content


if __name__ == "__main__":
   url = "http://1000phone.com"
   receiver = 'diyuhuan@1000phone.com'
   #content = pack_html(receiver, url)
   content = 'this is email content. at %s.'%int(time.time())
   send_email(receiver,  content)

至此,在 celery ui 界面可以看到两类,定时器处理和异步处理。

5、启动 flower 服务

python manager celery flower

案例

读书网站实现抢读功能

qd(request, id) :抢读视图函数

quereyQD(request,id) :查询抢读的视图函数

settings.py

INSTALLED_APPS = [
  'djcelery',
]

...
import djcelery

# 装载djcelery对象
djcelery.setup_loader()
# 配置消息中间件的位置
BROKER_URL = 'redis://127.0.0.1:6379/12'

CELERY_TIMEZONE = 'Asia/Shanghai'
# 配置批量调试器
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

在主工程目录添加 celery.py, 添加自动检索 django 工程 tasks任务

celery.py

from __future__ import absolute_import
import os
from celery import Celery

# 设置环境变量
from HArtPro import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HArtPro.settings')

# 创建Celery对象
app = Celery('hart')

# 加载配置
app.config_from_object('django.conf:settings')

# 自动发现task的异步任务
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)

当前 app 目录下建立 tasks.py

from MArtPro.celery import app
from utils import redis_cache


@app.task
def advanceArt(artId, userId):
   # 抢读文章(artId 文章id, userId 当前用户登录的Id)
   print('用户', userId, '正在抢读', artId)

   # 判断当前抢读的hash对象AdvanceArt长度是否达到5个
   if redis_cache.hlen('AdvanceArt') >= 5:
       return artId + '抢读失败'

   redis_cache.hset('AdvanceArt', userId, artId)

   return artId + '抢读成功!'
views.py
from redis_ import rd  # rd 对象
from art import tasks

def (request, artId):
   # 抢读
   login_user = request.session.get('login_user')

   if not login_user:
       return JsonResponse({'status': 101,
                            'msg': '亲,请先登录,再抢读,谢谢!'})

   # 判断当前用户是否已抢过
   user_id =json.loads(login_user).get('id')
   if redis_cache.hexists('AdvanceArt', user_id):
       return JsonResponse({'status': 205,
                            'msg': '亲,你只能抢一本'})
   # 任务延迟执行
   tasks.advanceArt.delay(artId, user_id)
   return JsonResponse({'status': 201,
                        'msg': '正在抢读...'})


def queryAdvance(request, artId):
   # 查询抢读是否成功
   login_user = request.session.get('login_user')

   if not login_user:
       return JsonResponse({'status': 101,
                            'msg': '亲,请先登录,再查看抢读,谢谢!'})

   user_id = json.loads(login_user).get('id')

   artId = redis_cache.hget('AdvanceArt', user_id)
   if artId:
       art = Art.objects.get(id=artId.decode())
       return JsonResponse({'status': 200,
                            'msg': '恭喜您,抢读%s 成功'%art.title})
   else:
       if redis_cache.hlen('AdvanceArt')< 5:
           return JsonResponse({'status': 202,
                                'msg': '正在抢读...'})
       else:
           return JsonResponse({'status': 203,
                                'msg': '抢读失败, 请下次碰碰运气!'})

前端通过定时器,每秒执行查询函数

作者:rottengeek
转载|原文链接:https://segmentfault.com/u/rottengeek
(如有侵权,请联系删除)

最新公告通知

第 19 期【Python自动化运维入门】正在火热招生中
第 8 期 【Python自动化运维进阶】正在火热招生中

相关文章

网友评论

    本文标题:Django 进阶之 celery

    本文链接:https://www.haomeiwen.com/subject/kzcziftx.html