
Python asyncio Concurrent Programming (7)

Author: TheRightPath1 | Published 2020-01-22 16:09

1. Code example: implementing a crawler with the aiohttp client

    import asyncio
    import re
    from scrapy import Selector
    import aiohttp
    import aiomysql
    
    
    # Starting URL for the crawl
    start_url = 'https://news.sina.com.cn/'
    # URLs waiting to be crawled
    waitting_urls = set()
    # URLs that have already been crawled
    already_urls = set()
    # Limit concurrency to 3 requests at a time
    sem = asyncio.Semaphore(3)
    
    
    # Request a URL and return the page source
    async def fetch(url, session):
        async with sem:
            try:
                # session.get(url) is a network-IO operation, so it is used with async with
                async with session.get(url) as response:
                    if response.status == 200:
                        # response.text() is a coroutine, so await is required to read the body
                        return await response.text()
            except Exception as e:
                print(e)
    
    
    # Parse an article page, extract its title and write it to MySQL
    async def analysis_title(url, session, pool):
        try:
            html = await fetch(url, session)
            selector = Selector(text=html)
        except Exception as e:
            print(e)
            print(url)
            return ''
        title = selector.xpath('//h1[@class="main-title"]/text()').get()
        if title:
            # Acquire a connection from the MySQL connection pool
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    try:
                        # Parameterized query, so quotes in the title cannot break the SQL
                        insert_sql = 'INSERT INTO `title` VALUES (%s)'
                        await cur.execute(insert_sql, (title,))
                        already_urls.add(url)
                    except Exception as e:
                        print(e)
                        print(url)
                        return ''
    
    
    # Extract all hrefs from a non-article page and add them to waitting_urls
    async def extract_url(url, session):
        html = await fetch(url, session)
        try:
            selector = Selector(text=html)
        except Exception as e:
            print(e)
            print(url)
            print(html)
            return ''
        # Use a separate loop variable so that the page URL itself (not the last
        # extracted href) is the one recorded in already_urls below
        for href in selector.xpath('//@href').extract():
            waitting_urls.add(href)
        already_urls.add(url)
    
    
    # Pop URLs from waitting_urls and dispatch article URLs to the parser
    async def consumer(pool):
        # The session must eventually be closed, so it is declared with async with.
        # Creating a session per request would be wasteful, so it is created once
        # here and passed into the other coroutines.
        async with aiohttp.ClientSession() as session:
            while True:
                if len(waitting_urls) == 0:
                    await asyncio.sleep(2)
                    continue
                url = waitting_urls.pop()
                if not url or url in already_urls:
                    continue
                # Sina article URLs look like /2020-01-22/doc-xxxxxxx.shtml
                if re.findall(r'/\d{4}-\d{2}-\d{2}/doc.*?\d+\.shtml', url):
                    asyncio.ensure_future(analysis_title(url, session, pool))
                else:
                    asyncio.ensure_future(extract_url(url, session))
    
    
    async def main(loop):
        # Wait for the MySQL connection pool to be created; loop is the running
        # event loop and autocommit=True commits every statement automatically
        pool = await aiomysql.create_pool(host='127.0.0.1', port=3306,
                                          user='root', password='zkr@123...',
                                          db='test', loop=loop, charset='utf8',
                                          autocommit=True)
        async with aiohttp.ClientSession() as session:
            # Seed waitting_urls with every link found on the start page
            await extract_url(start_url, session)
        # The pool is created once here and passed in, so a new connection pool is
        # not built for every insert
        asyncio.ensure_future(consumer(pool))
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        asyncio.ensure_future(main(loop))
        loop.run_forever()
    

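The INSERT in analysis_title assumes a `title` table already exists in the `test` database, but the article does not show its schema. Below is a minimal sketch, assuming a single VARCHAR column also named `title` (that column name is an assumption), of how the table could be created with aiomysql before running the crawler:

    import asyncio
    import aiomysql


    async def create_title_table():
        # Connection settings mirror the ones used by the crawler above
        conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                      user='root', password='zkr@123...',
                                      db='test', charset='utf8', autocommit=True)
        async with conn.cursor() as cur:
            # Single-column table: analysis_title inserts one title string per row.
            # The column name `title` is an assumption, not shown in the article.
            await cur.execute(
                'CREATE TABLE IF NOT EXISTS `title` ('
                '`title` VARCHAR(255) NOT NULL'
                ') DEFAULT CHARSET=utf8'
            )
        conn.close()


    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(create_title_table())

Running this once is enough; after that the crawler script can be started and left running, since its event loop never exits on its own (loop.run_forever()).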