A Python Async Crawler Experiment [Celery, gevent, requests]

By spencer404 | Published 2016-08-20 21:31 | read 4607 times

    My crawlers have always used a homegrown framework: a pool of workers fetches tasks from a master and starts crawling, with one process per CPU core and extra threads per process to raise throughput.
    I recently looked at Celery, and its API is genuinely elegant, so I wanted to try writing a crawler with an asynchronous model.

    The mock target

    To make testing easy, I built a simple server with Tornado to stand in for the site being crawled.
    Its behavior is trivial: every request blocks for 6 seconds before responding.

    import time
    from concurrent.futures import ThreadPoolExecutor

    import tornado.gen
    import tornado.ioloop
    import tornado.web
    from tornado.concurrent import run_on_executor


    class MainHandler(tornado.web.RequestHandler):
        # Thread pool that runs the blocking sleep without stalling the IOLoop
        executor = ThreadPoolExecutor(40)

        @tornado.web.asynchronous
        @tornado.gen.coroutine
        def get(self):
            print(time.asctime())
            yield self.sleep(6)  # simulate a slow upstream: 6 s per request
            self.write('from server:' + time.asctime())
            self.finish()

        @run_on_executor
        def sleep(self, sec):
            time.sleep(sec)


    if __name__ == '__main__':
        app = tornado.web.Application(handlers=[
            ('^/.*', MainHandler)  # every path hits the same handler
        ])
        app.listen(10240)
        tornado.ioloop.IOLoop.instance().start()
    

    The consumer

    The task module contains a single spider function, which uses gevent to fetch the given URL without blocking the worker.

    import gevent.monkey
    gevent.monkey.patch_socket()  # patch the socket module so requests yields instead of blocking

    from celery import Celery
    import socket
    import requests
    import gevent

    app = Celery('tasks',
                 broker='redis://127.0.0.1:6379/3',
                 backend='redis://127.0.0.1:6379/3')


    @app.task
    def spider(url):
        # Run the request in a greenlet; ready()/value expose its state and result
        resp = gevent.spawn(requests.get, url)
        tmp = 0
        while True:
            print('wait...', tmp)
            if resp.ready():
                return 'from:' + socket.getfqdn() + '\nres:' + str(resp.value.text)
            gevent.sleep(1)  # yield control so other greenlets can run
            tmp += 1
    

    Start Celery with the gevent execution pool (this is the pre-5.0 CLI order; newer Celery uses "celery -A tasks worker ..."). With -P gevent, the -c 100 concurrency setting means 100 greenlets inside one worker process, not 100 OS processes:

    celery worker -A tasks --loglevel info -c 100 -P gevent

    The producer

    The producer uses the spider task defined above to crawl the target.
    In the test I launched six of these producer processes, and all of them returned within 7 seconds, which proves it worked: six 6-second requests were served concurrently rather than in series.

    from tasks import spider
    import time
    import random

    # Queue one crawl task, then poll until a worker reports the result
    res = spider.delay('http://127.0.0.1:10240/{}'.format(random.randint(1, 999)))
    i = 0
    while True:
        if res.ready():
            print('res:', res.get())
            break
        else:
            print('wait...', i)
        time.sleep(1)
        i += 1
    

    Part of Celery's log output follows.
    It shows that, inside a single Celery process, multiple spider calls execute in alternation:

    [2016-08-20 21:27:11,281: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
    [2016-08-20 21:27:11,313: INFO/MainProcess] Received task: tasks.spider[7b8b6f63-2bef-491e-a3a8-fdbcff824b9c]
    [2016-08-20 21:27:11,314: WARNING/MainProcess] wait...
    [2016-08-20 21:27:11,314: WARNING/MainProcess] 0
    [2016-08-20 21:27:11,316: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
    [2016-08-20 21:27:11,354: INFO/MainProcess] Received task: tasks.spider[5aa05e65-504d-4a04-8247-3f5708bfa46f]
    [2016-08-20 21:27:11,356: WARNING/MainProcess] wait...
    [2016-08-20 21:27:11,356: WARNING/MainProcess] 0
    [2016-08-20 21:27:11,357: INFO/MainProcess] Starting new HTTP connection (1): 127.0.0.1
    [2016-08-20 21:27:11,821: WARNING/MainProcess] wait...
    [2016-08-20 21:27:11,821: WARNING/MainProcess] 1
    [2016-08-20 21:27:11,989: WARNING/MainProcess] wait...
    [2016-08-20 21:27:11,990: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,059: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,059: WARNING/MainProcess] 2
    [2016-08-20 21:27:12,208: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,209: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,225: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,225: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,246: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,247: WARNING/MainProcess] 2
    [2016-08-20 21:27:12,282: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,282: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,316: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,316: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,357: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,357: WARNING/MainProcess] 1
    [2016-08-20 21:27:12,823: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,823: WARNING/MainProcess] 2
    [2016-08-20 21:27:12,991: WARNING/MainProcess] wait...
    [2016-08-20 21:27:12,992: WARNING/MainProcess] 2
    [2016-08-20 21:27:13,061: WARNING/MainProcess] wait...
    [2016-08-20 21:27:13,061: WARNING/MainProcess] 3
    [2016-08-20 21:27:13,210: WARNING/MainProcess] wait...
    [2016-08-20 21:27:13,211: WARNING/MainProcess] 2
    [2016-08-20 21:27:13,227: WARNING/MainProcess] wait...
    [2016-08-20 21:27:13,227: WARNING/MainProcess] 2
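
    The alternation visible in the log comes from gevent.sleep(1) handing control between greenlets. The same interleaving can be reproduced with a minimal, self-contained gevent sketch; the task names here are made up for illustration:

```python
import gevent

order = []

def task(name):
    for i in range(3):
        order.append((name, i))  # record who ran, and in what order
        gevent.sleep(0)          # yield to the hub so the other greenlet can run

# Two greenlets take turns, just like the interleaved spider tasks in the log
gevent.joinall([gevent.spawn(task, n) for n in ('a', 'b')])
print(order)
```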
    

    Final thoughts

    Thanks to Celery, the crawler scales out easily: just add more consumer processes on additional servers.
    Thanks to gevent, requests became non-blocking within a single process, whereas I had previously relied on multithreading to cope with blocking I/O.
    I have only been learning Celery and gevent for a day; now that this little thing works, it is time to dig into the documentation properly!
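
    For comparison, the multithreaded workaround mentioned above can be sketched with the standard library alone. This is a minimal sketch, not the author's original framework: time.sleep stands in for a slow requests.get, and the URLs are made up:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(url):
    time.sleep(0.1)  # stand-in for a slow requests.get
    return 'from server: ' + url

urls = ['http://127.0.0.1:10240/{}'.format(i) for i in range(1, 7)]
start = time.time()
with ThreadPoolExecutor(max_workers=6) as pool:
    results = list(pool.map(fetch, urls))  # six fetches run on six threads
elapsed = time.time() - start
# The six 0.1 s "requests" overlap, so the batch takes roughly 0.1 s, not 0.6 s
print(len(results), round(elapsed, 2))
```

    The trade-off is that each concurrent request costs an OS thread here, while the gevent pool above multiplexes hundreds of greenlets on one thread.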
