美文网首页
python爬虫之celery分布式任务(踩坑)

python爬虫之celery分布式任务(踩坑)

作者: 嗨_小罗哥 | 来源:发表于2019-11-27 15:04 被阅读0次

    一. celery和RabbitMQ简单介绍

    • Celery是一个基于Python开发的分布式异步消息队列,可以轻松实现任务的异步处理。它的基本工作就是管理分配任务到不同的服务器,并且取得结果。
    • RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。AMQP即Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制

    二.分布式任务

    • 注:提前安装好RabbitMQ/redis
      安装RabbitMQ
    • 启动RabbitMQ:systemctl start rabbitmq-server
    • 重启RabbitMQ:systemctl restart rabbitmq-server
    • 关闭RabbitMQ:systemctl stop rabbitmq-server
    • 查看RabbitMQ状态:systemctl status rabbitmq-server
    (1)项目结构
    项目结构
    • celery_test.py:项目主程序,内容如下:
    from __future__ import absolute_import
    from celery import Celery
    app = Celery(include=['tasks'])
    app.config_from_object('celeryconfig')
    if __name__ == '__main__':
        app.start()
    
    
    • (1) "from future import absolute_import"是拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
    • (2)app是Celery类的实例,创建的时候添加了tasks这个模块,也就是包含了tasks.py这个文件。
    • (3)把Celery配置存放进celeryconfig.py文件,使用app.config_from_object加载配置。
    • celeryconfig为配置文件内容去下:
    # -*- coding: UTF-8 -*-
    from __future__ import absolute_import, unicode_literals
    from datetime import timedelta
    from kombu import Queue, Exchange
    from celery.schedules import crontab
    BROKER_URL='amqp://guest:guest@localhost:5672//'
    # CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任务结果存在了Redis
    #默认celery与broker的连接池连接数
    BROKER_POOL_LIMIT = 10
    
    CELERY_ACKS_LATE = True
    CELERY_IGNORE_RESULT = True
    CELERY_DISABLE_RATE_LIMITS = True
    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}
    WORKER_MAX_MEMORY_PER_CHILD = 600
    CELERYD_MAX_TASKS_PER_CHILD = 1
    CELERY_TASK_SERIALIZER = 'json'
    #CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = 'Asia/Shanghai'
    TIME_ZONE = 'Asia/Shanghai'
    # 配置队列
    CELERY_QUEUES = {
            Queue('default', Exchange('default'),routing_key='default'),
            Queue('spider_001', Exchange('spider_001'), routing_key='spider_001'),
            Queue('spider_002', Exchange('spider_002'), routing_key='spider_002'),
            Queue('spider_003', Exchange('spider_003'), routing_key='spider_003'),
    }
    #队列路由
    CELERY_ROUTES = {
        'tasks.daily_spider_001': {'queue': 'spider_001', 'routing_key': 'spider_001'},
        'tasks.daily_spider_002': {'queue': 'spider_002', 'routing_key': 'spider_002'},
        'tasks.daily_spider_003': {'queue': 'spider_003', 'routing_key': 'spider_003'}
    }
    
    # 调度任务/定时任务
    CELERYBEAT_SCHEDULE = {
        'daily_spider_001': {
            'task': 'tasks.daily_spider_001',
            'schedule': timedelta(seconds=10), #每10秒执行一次
            # 'args': (16, 16)
        },
        'daily_spider_002': {
            'task': 'tasks.daily_spider_002',
            'schedule': timedelta(seconds=11), #每11秒执行一次
        },
        'daily_spider_003': {
            'task': 'tasks.daily_spider_003',
            'schedule': timedelta(seconds=12), #每11秒执行一次
        },
    }
    
    • 在celeryconfig.py文件中,首先设置了Broker(RabbitMQ)的URL,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange(当使用Redis作为Broker时,Exchange的名字必须和Queue的名字一样)以及routing_key的值。
      CELERY_QUEUES中的routing_key与CELERY_ROUTES中的routing_key是一一对应的关系 (),

    • 任务调度:Celery的Beat进程自动生成任务
      CELERYBEAT_SCHEDULE 为设置定时任务

    • tasks.py内容如下:定义三个不同功能的函数

    from __future__ import absolute_import
    from celery_test import app
    
    @app.task
    def daily_spider_001():
        return 1 + 2
    
    
    @app.task
    def daily_spider_002():
        return 2 + 2
    
    
    @app.task
    def daily_spider_003():
        return 3 + 2
    

    程序启动:

    • 启动beat程序:
    celery beat -A celery_test
    
    beat启动效果图
    • 启动Worker进程:
    celery -A celery_test worker -l info
    
    任务启动
    • Beat和Worker进程同时启动命令
    celery -B -A celery_test worker -l info
    

    相关文章

      网友评论

          本文标题:python爬虫之celery分布式任务(踩坑)

          本文链接:https://www.haomeiwen.com/subject/liupwctx.html