Python asyncio + aiojobs: A Simple Async Framework


Author: 非梦nj | Published 2019-03-08 11:39

Python's async support has been mature enough for real use since 3.5.
Define a coroutine with async def; when it must wait on something slow, await yields control so the event loop can schedule other tasks in the meantime.

The simplest async program (requires Python 3.7+):

https://docs.python.org/zh-cn/3/library/asyncio.html

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    #await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

If multiple coroutines should run concurrently, use asyncio.gather:

async def main():
    print(f"started at {time.strftime('%X')}")

    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world') )

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
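To see the benefit of gather, here is a small stdlib-only timing sketch. The delays are shortened to keep it quick, and say_after returns its argument instead of printing; sequential and concurrent are illustrative helper names, not part of the example above:

```python
import asyncio
import time

async def say_after(delay, what):
    # Sleep asynchronously, then hand back the message.
    await asyncio.sleep(delay)
    return what

async def sequential():
    # Each await blocks this coroutine until the previous call finishes,
    # so the total is roughly the sum of the delays.
    t0 = time.perf_counter()
    await say_after(0.1, 'hello')
    await say_after(0.2, 'world')
    return time.perf_counter() - t0

async def concurrent():
    # gather runs both coroutines at once; the total is roughly
    # the maximum of the delays.
    t0 = time.perf_counter()
    await asyncio.gather(say_after(0.1, 'hello'), say_after(0.2, 'world'))
    return time.perf_counter() - t0

seq = asyncio.run(sequential())
con = asyncio.run(concurrent())
print(f"sequential: {seq:.2f}s  gather: {con:.2f}s")
```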

Note: asynchronous control flow adds real complexity; use async/await only where you actually need it.
Viewing the call hierarchy in PyCharm:

[Figures omitted: PyCharm call hierarchy for the example above, followed by a zoomed-in detail.]

Async coroutines are a great fit for crawlers.
For an ordinary crawler you don't need a framework as heavy as Scrapy; adding simple concurrency limiting and retry-on-error is enough.

Commonly used libraries:

  • aiojobs: simple concurrency control
  • aiohttp: async counterpart to requests
  • aiofiles: async file reading and writing
  • asyncpg: async PostgreSQL driver
  • aioredis: async Redis client
  • uvloop: a faster asyncio event-loop implementation

Example

The input is a text file, urls.txt, with one URL per line:

https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt

The crawler fetches these URLs concurrently with fetch_html(), then hands each page to parse(). In this example, parse() extracts every URL string embedded in each page's HTML, and the results are written concurrently to foundurls.txt.

Suppose we want the crawler to run at most N (here N=3) HTTP GET coroutines at a time: aiojobs.create_scheduler(limit=3)
To wait until all jobs finish before the program exits (note that _jobs is a private attribute of the scheduler, so this polling loop is a pragmatic workaround rather than a public API):

while scheduler._jobs:
    await asyncio.sleep(1)
await scheduler.close()
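aiojobs' limit parameter does this throttling for us. The same cap can be sketched with only the standard library using asyncio.Semaphore; worker, state, and the 0.05 s sleep below are illustrative stand-ins for real HTTP requests:

```python
import asyncio

async def worker(sem, state):
    # Acquire the semaphore so at most `limit` workers run at once.
    async with sem:
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        await asyncio.sleep(0.05)  # stand-in for an HTTP request
        state['active'] -= 1

async def main(n_tasks=10, limit=3):
    sem = asyncio.Semaphore(limit)
    state = {'active': 0, 'peak': 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n_tasks)))
    return state['peak']

peak = asyncio.run(main())
print(f"peak concurrency: {peak}")  # never exceeds the limit of 3
```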

Full code:

#!/usr/bin/env python3

"""Asynchronously get links embedded in multiple pages' HMTL."""

import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
import aiojobs
import uvloop
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')


async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.

    kwargs are passed to `session.request()`.
    """

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html


async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
            aiohttp.ClientError,
            # aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found


async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)


async def bulk_crawl_and_write(file: IO, urlset: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    scheduler = await aiojobs.create_scheduler(limit=3)
    async with ClientSession() as session:
        for url in urlset:
            await scheduler.spawn(write_one(file=file, url=url, session=session, **kwargs))
        print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
        # await asyncio.sleep(3)
        while scheduler._jobs:
            # print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
            await asyncio.sleep(1)
        await scheduler.close()

if __name__ == "__main__":
    import pathlib
    import time

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")
    t0 = time.perf_counter()
    asyncio.run(bulk_crawl_and_write(file=outpath, urlset=urls))
    print(f'{__file__} finish in {time.perf_counter()-t0:.2f}sec.') 
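The extraction step at the heart of parse() can be exercised on its own. The HTML snippet and base URL below are made up for illustration:

```python
import re
import urllib.parse

# Same pattern the crawler uses to pull href values out of the HTML.
HREF_RE = re.compile(r'href="(.*?)"')

html = '<a href="/rfc/rfc2616.txt">RFC</a> <a href="https://example.com/a">A</a>'
base = "https://www.ietf.org/"

# Resolve each href against the page URL, as parse() does with urljoin;
# relative links become absolute, absolute links pass through unchanged.
found = {urllib.parse.urljoin(base, link) for link in HREF_RE.findall(html)}
print(sorted(found))
# → ['https://example.com/a', 'https://www.ietf.org/rfc/rfc2616.txt']
```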

A very good introductory tutorial: https://realpython.com/async-io-python/


Source: https://www.haomeiwen.com/subject/ehbjuqtx.html