大规模异步新闻爬虫【6】:用asyncio实现异步爬虫

作者: 一墨编程学习 | 来源:发表于2019-05-06 22:13 被阅读5次

    关于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。
    为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:

    • 下载请求开始,就是放羊出去吃草;
    • 下载任务完成,就是羊吃饱回羊圈。

    同步放羊的过程就是这样的:
    羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……

    再看看异步放羊的过程:
    羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。

    很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。

    到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。

    Python学习交流群【 784758214 】内有安装包和学习视频资料,零基础,进阶,解答疑问。希望可以帮助你快速了解Python、学习python

    1. 异步的downloader

    还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。

    async def fetch(session, url, headers=None, timeout=9):
        _headers = {
            'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                           'Windows NT 6.1; Win64; x64; Trident/5.0)'),
        }
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout) as response:
                status = response.status
                html = await response.read()
                encoding = response.get_encoding()
                if encoding == 'gb2312':
                    encoding = 'gbk'
                html = html.decode(encoding, errors='ignore')
                redirected_url = str(response.url)
        except Exception as e:
            msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
            print(msg)
            html = ''
            status = 0
            redirected_url = url
        return status, html, redirected_url
    
    

    这个异步的downloader,我们称之为fetch(),它有两个必须参数:

    • seesion: 这是一个aiohttp.ClientSession的对象,这个对象的初始化在crawler里面完成,每次调用fetch()时,作为参数传递。
    • url:这是需要下载的网址。

    实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。
    有了异步下载器,我们的异步爬虫就可以写起来啦~

    2. 异步新闻爬虫

    跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:

    • self.urlpool 网址池
    • self.loop 异步的事件循环
    • self.seesion aiohttp.ClientSession的对象,用于异步下载
    • self.db 基于aiomysql的异步数据库连接
    • self._workers 当前并发下载(放出去的羊)的数量

    通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:

    #!/usr/bin/env python3
    # File: news-crawler-async.py
    # Author: veelion
    
    import traceback
    import time
    import asyncio
    import aiohttp
    import urllib.parse as urlparse
    import farmhash
    import lzma
    
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    import sanicdb
    
    from urlpool import UrlPool
    import functions as fn
    import config
    
    class NewsCrawlerAsync:
        def __init__(self, name):
            self._workers = 0
            self._workers_max = 30
            self.logger = fn.init_file_logger(name+ '.log')
    
            self.urlpool = UrlPool(name)
    
            self.loop = asyncio.get_event_loop()
            self.session = aiohttp.ClientSession(loop=self.loop)
            self.db = sanicdb.SanicDB(
                config.db_host,
                config.db_db,
                config.db_user,
                config.db_password,
                loop=self.loop
            )
    
        async def load_hubs(self,):
            sql = 'select url from crawler_hub'
            data = await self.db.query(sql)
            self.hub_hosts = set()
            hubs = []
            for d in data:
                host = urlparse.urlparse(d['url']).netloc
                self.hub_hosts.add(host)
                hubs.append(d['url'])
            self.urlpool.set_hubs(hubs, 300)
    
        async def save_to_db(self, url, html):
            urlhash = farmhash.hash64(url)
            sql = 'select url from crawler_html where urlhash=%s'
            d = await self.db.get(sql, urlhash)
            if d:
                if d['url'] != url:
                    msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
                    self.logger.error(msg)
                return True
            if isinstance(html, str):
                html = html.encode('utf8')
            html_lzma = lzma.compress(html)
            sql = ('insert into crawler_html(urlhash, url, html_lzma) '
                   'values(%s, %s, %s)')
            good = False
            try:
                await self.db.execute(sql, urlhash, url, html_lzma)
                good = True
            except Exception as e:
                if e.args[0] == 1062:
                    # Duplicate entry
                    good = True
                    pass
                else:
                    traceback.print_exc()
                    raise e
            return good
    
        def filter_good(self, urls):
            goodlinks = []
            for url in urls:
                host = urlparse.urlparse(url).netloc
                if host in self.hub_hosts:
                    goodlinks.append(url)
            return goodlinks
    
        async def process(self, url, ishub):
            status, html, redirected_url = await fn.fetch(self.session, url)
            self.urlpool.set_status(url, status)
            if redirected_url != url:
                self.urlpool.set_status(redirected_url, status)
            # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
            if status != 200:
                return
            if ishub:
                newlinks = fn.extract_links_re(redirected_url, html)
                goodlinks = self.filter_good(newlinks)
                print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
                self.urlpool.addmany(goodlinks)
            else:
                await self.save_to_db(redirected_url, html)
            self._workers -= 1
    
        async def loop_crawl(self,):
            await self.load_hubs()
            last_rating_time = time.time()
            counter = 0
            while 1:
                tasks = self.urlpool.pop(self._workers_max)
                if not tasks:
                    print('no url to crawl, sleep')
                    await asyncio.sleep(3)
                    continue
                for url, ishub in tasks.items():
                    self._workers += 1
                    counter += 1
                    print('crawl:', url)
                    asyncio.ensure_future(self.process(url, ishub))
    
                gap = time.time() - last_rating_time
                if gap > 5:
                    rate = counter / gap
                    print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
                    last_rating_time = time.time()
                    counter = 0
                if self._workers > self._workers_max:
                    print('====== got workers_max, sleep 3 sec to next worker =====')
                    await asyncio.sleep(3)
    
        def run(self):
            try:
                self.loop.run_until_complete(self.loop_crawl())
            except KeyboardInterrupt:
                print('stopped by yourself!')
                del self.urlpool
                pass
    
    if __name__ == '__main__':
        nc = NewsCrawlerAsync('yrx-async')
        nc.run()
    
    

    爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。

    通过self._workersself._workers_max来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。

    至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。

    爬虫知识点

    1. uvloop模块
    uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:

    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    

    uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。

    uvloop作者的性能测试

    这是uvloop作者的性能对比测试。
    目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:

    if sys.platform in ('win32', 'cygwin', 'cli'):
        raise RuntimeError('uvloop does not support Windows at the moment')
    
    vi = sys.version_info
    if vi < (3, 5):
        raise RuntimeError('uvloop requires Python 3.5 or greater')
    
    

    所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。

    思考题

    1. 给同步的downloader()或异步的fetch()添加功能
    或许有些小猿还没见过这样的html代码,它出现在<head>里面:

    <meta http-equiv="refresh" content="5; url=https://example.com/">
    
    

    它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/
    那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。

    2. 如何控制hub的刷新频率,及时发现最新新闻
    这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。

    后面的章节,是介绍如何使用工具,比如如何使用charles抓包,如何管理浏览器cookie,如何使用selenium等等,也欢迎你的阅读。

    相关文章

      网友评论

        本文标题:大规模异步新闻爬虫【6】:用asyncio实现异步爬虫

        本文链接:https://www.haomeiwen.com/subject/ssjjoqtx.html