Celery - 分布式任务队列
用官方文档的原话说 ,Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。
Celery是用Python编写的,但协议可以用任何语言实现。
除了 Python 之外,还有 Node.js 和 PHP 客户端。
生产者消费者模式
Celery的架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
celery消息中间件,也就是所谓的中间人,官方支持的两种稳定的消息队列数据库,一个是 RabbitMQ,另一个 Redis。当然选用其他数据库也是可行的,比如, MongoDB 等,用的比较多的当然就是高性能Redis数据库啦,当然MQ也同样强大。 Worker 进程会持续监视队列中是否有需要处理的新任务(如果有就消费,没有则持续监听队列)。
任务执行单元
Worker 是 Celery 提供的任务执行的单元,Worker 并发的运行在分布式的系统节点中,也就是充当了任务工人的角色(消费者),用于系统调度。
在开启中间消息队列之后,任务单元会监听消息队列并从中间件里消费任务,执行任务单元,将结果存入后端数据库。
任务结果存储
Celery支持以不同方式存储任务的结果,后端存储包括 Redis,MongoDB,Mysql 等等。
然后就该配置一些环境依赖了
pip install celery
pip install redis
ubuntu 建议用4.0以上版本 redis>=3.2 否则报错
windows 使用3.x (其实使用4.0 和 低版本的redis也可以的 但是不建议) 当然3.x 对应低版本的redis==2.10.6
接下来就需要测试一下Celery能否正常工作,运行一个简单的爬虫任务感受一下。
# task.py
from celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost',27017)
db = client['baike']
@app.task
def get_url(link):
item = {}
headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
res = requests.get(link,headers=headers)
res.encoding = 'UTF-8'
doc = etree.HTML(res.text)
content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
print(res.status_code)
print(link,'\t','++++++++++++++++++++')
item['link'] = link
data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
item['data'] = data
if db['Baike'].insert(dict(item)):
print("OK ...")
else:
print('......')
在 app 的实例中,使用redis数据库作为中间消息队列,mongodb 数据库作为后端存储。并用 app.task 声明一个任务函数。
在终端键入
celery -A task worker -l info -P gevent -c 10
监听 redis 消息队列,-A 参数表示的是 Celery 的名称,这里就是 task.py, worker 是一个执行任务角色,-l 是日志等级,-P 指定并发的方法。
这里使用 gevent 并发10个线程,-c 表示并发个数。
注意:windows下gevent用法不兼容,报错。
开启后的界面是这样的
image调用任务
# run_task.py
from task import get_url
from urls import url_list
def delay(url):
result = get_url.delay(url)
return result
def run():
with open('./url.txt', 'r') as f:
for url in f.readlines():
delay(url.strip('\n'))
if __name__ == '__main__':
run()
任务调度的日志信息会在终端打印出来,说明确实有 worker 得到系统的调度,并从消息队列消费任务,查看 redis 数据库发现已经有了缓存信息。
总之,这一篇先提前体验了一把celery是如何工作的
然而,更详细的,还在后面
欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
个人技术博客:http://www.gzky.live
网友评论