Python's async support has been mature enough for everyday use since version 3.5.
Use async def to define a coroutine; when it has to wait on a slow operation, await yields control back to the event loop so that other tasks can run in the meantime.
A minimal async program (requires Python 3.7+):
https://docs.python.org/zh-cn/3/library/asyncio.html
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    #await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
If several coroutines should run concurrently, use asyncio.gather. The two sleeps below overlap, so the program finishes in about 2 seconds instead of 3:
async def main():
    print(f"started at {time.strftime('%X')}")
    await asyncio.gather(
        say_after(1, 'hello'),
        say_after(2, 'world'))
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
Note: asynchronous code adds real complexity; reach for async/await only when you actually need it.
The coroutine call chain can be inspected in PyCharm's debugger (call-graph screenshots and zoomed-in view omitted).
Async coroutines are a natural fit for web crawlers.
For an ordinary crawler you don't need a framework as heavy as Scrapy; a simple cap on concurrency plus retry-on-error handling (see the sketch after the library list below) is usually enough.
Commonly used libraries:
- aiojobs: simple concurrency control
- aiohttp: the async counterpart of requests
- aiofiles: async file reads and writes
- asyncpg: async PostgreSQL client
- aioredis: async Redis client
- uvloop: a faster implementation of the asyncio event loop
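The full example further down caps concurrency with aiojobs but does not implement retries. Here is a minimal retry sketch using plain aiohttp; the helper name fetch_with_retry and its max_retries/backoff parameters are illustrative, not part of any library:
import asyncio
import aiohttp

async def fetch_with_retry(session: aiohttp.ClientSession, url: str,
                           max_retries: int = 3, backoff: float = 1.0) -> str:
    """GET `url`, retrying on transient network errors (illustrative helper)."""
    for attempt in range(1, max_retries + 1):
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == max_retries:
                raise
            # Exponential backoff between attempts: 1s, 2s, 4s, ...
            await asyncio.sleep(backoff * 2 ** (attempt - 1))

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch_with_retry(session, "https://www.ietf.org/rfc/rfc2616.txt")
        print(len(html))

asyncio.run(main())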
Example
The input is a plain-text file urls.txt, one URL per line:
https://regex101.com/
https://docs.python.org/3/this-url-will-404.html
https://www.nytimes.com/guides/
https://www.mediamatters.org/
https://1.1.1.1/
https://www.politico.com/tipsheets/morning-money
https://www.bloomberg.com/markets/economics
https://www.ietf.org/rfc/rfc2616.txt
The crawler fetches these URLs concurrently in fetch_html(), then processes each page in parse(). In this example, parse() extracts every URL string contained in the page's HTML, and the results are written concurrently to the file foundurls.txt.
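The core of parse() below is just a regular expression plus urllib.parse.urljoin, which turns relative links into absolute ones. A minimal sketch of that step, with made-up HTML:
import re
import urllib.parse

HREF_RE = re.compile(r'href="(.*?)"')

html = '<a href="/rfc/rfc2616.txt">RFC</a> <a href="https://example.com/">ex</a>'
for link in HREF_RE.findall(html):
    # urljoin keeps absolute links as-is and resolves relative ones against the base URL
    print(urllib.parse.urljoin("https://www.ietf.org/", link))
# https://www.ietf.org/rfc/rfc2616.txt
# https://example.com/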
Suppose we want to cap the crawler at N (here N=3) concurrent HTTP GET coroutines: aiojobs.create_scheduler(limit=3)
Then wait for all jobs to finish before letting the program exit:
while scheduler._jobs:
    await asyncio.sleep(1)
await scheduler.close()
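Putting those two pieces together, here is a minimal self-contained sketch of the aiojobs pattern (assuming an aiojobs version that provides create_scheduler, as used below; the work() coroutine is illustrative, and _jobs is a private attribute, so polling it is a pragmatic shortcut rather than a public API):
import asyncio
import aiojobs

async def work(i):
    await asyncio.sleep(1)
    print(f"job {i} done")

async def main():
    # At most 3 jobs run at once; the rest wait in the pending queue.
    scheduler = await aiojobs.create_scheduler(limit=3)
    for i in range(10):
        await scheduler.spawn(work(i))
    # Poll the (private) job set until everything has finished.
    while scheduler._jobs:
        await asyncio.sleep(0.1)
    await scheduler.close()

asyncio.run(main())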
Complete code:
#!/usr/bin/env python3
"""Asynchronously get links embedded in multiple pages' HTML."""
import asyncio
import logging
import re
import sys
from typing import IO
import urllib.error
import urllib.parse

import aiofiles
import aiohttp
import aiojobs
import uvloop
from aiohttp import ClientSession

logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
logger = logging.getLogger("areq")
logging.getLogger("chardet.charsetprober").disabled = True

HREF_RE = re.compile(r'href="(.*?)"')

async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """GET request wrapper to fetch page HTML.
    kwargs are passed to `session.request()`.
    """
    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    logger.info("Got response [%s] for URL: %s", resp.status, url)
    html = await resp.text()
    return html

async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Find HREFs in the HTML of `url`."""
    found = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        # aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logger.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
        return found
    except Exception as e:
        logger.exception(
            "Non-aiohttp exception occurred: %s", getattr(e, "__dict__", {})
        )
        return found
    else:
        for link in HREF_RE.findall(html):
            try:
                abslink = urllib.parse.urljoin(url, link)
            except (urllib.error.URLError, ValueError):
                logger.exception("Error parsing URL: %s", link)
                pass
            else:
                found.add(abslink)
        logger.info("Found %d links for %s", len(found), url)
        return found

async def write_one(file: IO, url: str, **kwargs) -> None:
    """Write the found HREFs from `url` to `file`."""
    res = await parse(url=url, **kwargs)
    if not res:
        return None
    async with aiofiles.open(file, "a") as f:
        for p in res:
            await f.write(f"{url}\t{p}\n")
        logger.info("Wrote results for source URL: %s", url)

async def bulk_crawl_and_write(file: IO, urlset: set, **kwargs) -> None:
    """Crawl & write concurrently to `file` for multiple `urls`."""
    scheduler = await aiojobs.create_scheduler(limit=3)  # at most 3 jobs run at once
    async with ClientSession() as session:
        for url in urlset:
            await scheduler.spawn(write_one(file=file, url=url, session=session, **kwargs))
            print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
        # await asyncio.sleep(3)
        # Wait until the scheduler has no remaining jobs, then shut it down.
        while scheduler._jobs:
            # print(f'{scheduler} active_count:{scheduler.active_count} pending_count:{scheduler.pending_count}')
            await asyncio.sleep(1)
        await scheduler.close()

if __name__ == "__main__":
    import pathlib
    import sys
    import time

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    t0 = time.perf_counter()
    asyncio.run(bulk_crawl_and_write(file=outpath, urlset=urls))
    print(f'{__file__} finished in {time.perf_counter()-t0:.2f}sec.')
An excellent introductory tutorial: https://realpython.com/async-io-python/