美文网首页
Celery部署爬虫(一)

Celery部署爬虫(一)

作者: 鬼子口音 | 来源:发表于2019-12-30 00:19 被阅读0次
    celery

    Celery - 分布式任务队列

    用官方文档的原话说 ,Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。

    它是一个任务队列,专注于实时处理,同时还支持任务调度。

    Celery是用Python编写的,但协议可以用任何语言实现。

    除了 Python 之外,还有 Node.js 和 PHP 客户端。

    生产者消费者模式

    Celery的架构

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    消息中间件

    celery消息中间件,也就是所谓的中间人,官方支持的两种稳定的消息队列数据库,一个是 RabbitMQ,另一个 Redis。当然选用其他数据库也是可行的,比如, MongoDB 等,用的比较多的当然就是高性能Redis数据库啦,当然MQ也同样强大。 Worker 进程会持续监视队列中是否有需要处理的新任务(如果有就消费,没有则持续监听队列)。

    任务执行单元

    Worker 是 Celery 提供的任务执行的单元,Worker 并发的运行在分布式的系统节点中,也就是充当了任务工人的角色(消费者),用于系统调度。

    在开启中间消息队列之后,任务单元会监听消息队列并从中间件里消费任务,执行任务单元,将结果存入后端数据库。

    任务结果存储

    Celery支持以不同方式存储任务的结果,后端存储包括 Redis,MongoDB,Mysql 等等。

    然后就该配置一些环境依赖了

    pip install celery
    pip install redis
    ubuntu 建议用4.0以上版本 redis>=3.2  否则报错
    windows 使用3.x  (其实使用4.0 和 低版本的redis也可以的 但是不建议) 当然3.x 对应低版本的redis==2.10.6
    

    接下来就需要测试一下Celery能否正常工作,运行一个简单的爬虫任务感受一下。

    # task.py
    from celery import Celery
    import requests
    from lxml import etree
    import pymongo
    app = Celery('tasks', broker='redis://localhost:6379/2')
    client = pymongo.MongoClient('localhost',27017)
    db = client['baike']
    @app.task
    def get_url(link):
        item = {}
        headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
        res = requests.get(link,headers=headers)
        res.encoding = 'UTF-8'
        doc = etree.HTML(res.text)
        content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
        print(res.status_code)
        print(link,'\t','++++++++++++++++++++')
        item['link'] = link
        data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
        item['data'] = data
        if db['Baike'].insert(dict(item)):
            print("OK ...")
        else:
            print('......')
    

    在 app 的实例中,使用redis数据库作为中间消息队列,mongodb 数据库作为后端存储。并用 app.task 声明一个任务函数。

    在终端键入

    celery -A task worker -l info -P gevent -c 10
    

    监听 redis 消息队列,-A 参数表示的是 Celery 的名称,这里就是 task.py, worker 是一个执行任务角色,-l 是日志等级,-P 指定并发的方法。

    这里使用 gevent 并发10个线程,-c 表示并发个数。

    注意:windows下gevent用法不兼容,报错。

    开启后的界面是这样的

    image

    调用任务

    # run_task.py
    from task import get_url
    from urls import url_list
    
    def delay(url):
        result = get_url.delay(url)
        return result
    
    def run():
        with open('./url.txt', 'r') as f:
            for url in f.readlines():
                delay(url.strip('\n'))
    
    if __name__ == '__main__':
        run()
    

    任务调度的日志信息会在终端打印出来,说明确实有 worker 得到系统的调度,并从消息队列消费任务,查看 redis 数据库发现已经有了缓存信息。

    总之,这一篇先提前体验了一把celery是如何工作的
    然而,更详细的,还在后面

    欢迎转载,但要声明出处,不然我顺着网线过去就是一拳。
    个人技术博客:http://www.gzky.live

    相关文章

      网友评论

          本文标题:Celery部署爬虫(一)

          本文链接:https://www.haomeiwen.com/subject/kfauoctx.html