4.2 Speeding Up the Crawler: Asynchronous Loading with Asyncio
The idea: use asynchronous execution inside a single thread. Downloading a page and processing a page no longer have to happen back to back, so the time otherwise spent waiting on downloads gets put to use.
A traditional single-threaded crawler works like the blue bars on the left of the diagram (source): the program runs some code, waits for a page to download, runs some more code once it arrives, and so on. Alternatively, another thread can execute other code during the wait, which is the multithreading approach.
asyncio works like the right-hand side: it still uses only one thread, but all of that waiting time is cut out. Downloads are pushed to the background, other asynchronous work runs in the meantime, and when a download finishes, execution jumps back and carries on from where it left off.
asyncio runs everything in a single thread and switches between Python coroutines. The switch points are marked with await, and a function that can run asynchronously is declared with async, e.g. async def function(). First, a baseline without asyncio:
# not asynchronous
import time

def job(t):
    print('Start job ', t)
    time.sleep(t)                           # wait for "t" seconds
    print('Job ', t, ' takes ', t, ' s')

def main():
    [job(t) for t in range(1, 3)]

t1 = time.time()
main()
print("NO async total time : ", time.time() - t1)
"""
Start job 1
Job 1 takes 1 s
Start job 2
Job 2 takes 2 s
NO async total time : 3.008603096008301
"""
With asyncio, while job 1 is waiting for asyncio.sleep(t) to finish (think of that as waiting for a page to download), execution can switch over to job 2 and let it start running.
import asyncio

async def job(t):                           # an async coroutine
    print('Start job ', t)
    await asyncio.sleep(t)                  # wait "t" seconds; other tasks run in the meantime
    print('Job ', t, ' takes ', t, ' s')

async def main(loop):                       # an async coroutine
    tasks = [
        loop.create_task(job(t)) for t in range(1, 3)
    ]                                       # create the tasks but do not run them yet
    await asyncio.wait(tasks)               # run the tasks and wait until all of them finish

t1 = time.time()
loop = asyncio.get_event_loop()             # create the event loop
loop.run_until_complete(main(loop))         # run the loop
loop.close()                                # close the loop
print("Async total time : ", time.time() - t1)
"""
Start job 1
Start job 2
Job 1 takes 1 s
Job 2 takes 2 s
Async total time : 2.001495838165283
"""
When job 1 hits await, execution switches to job 2. Both jobs are then waiting on await asyncio.sleep(t) at the same time, so the total running time is determined by the longest wait, i.e. 2 seconds.
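On Python 3.7 and later the same example can be written more compactly with asyncio.run and asyncio.gather instead of creating and closing the event loop by hand. A minimal sketch of that variant (not part of the original example):

import asyncio, time

async def job(t):
    print('Start job ', t)
    await asyncio.sleep(t)
    print('Job ', t, ' takes ', t, ' s')

async def main():
    await asyncio.gather(*(job(t) for t in range(1, 3)))   # run both jobs concurrently

t1 = time.time()
asyncio.run(main())                                        # creates, runs and closes the loop
print("Async total time : ", time.time() - t1)

The rest of this section keeps the loop.create_task / asyncio.wait style of the examples above. Next, the same comparison with real page downloads, starting from a plain synchronous requests version: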
import requests

URL = 'https://morvanzhou.github.io/'

def normal():
    for i in range(2):
        r = requests.get(URL)
        url = r.url
        print(url)

t1 = time.time()
normal()
print("Normal total time:", time.time()-t1)
"""
https://morvanzhou.github.io/
https://morvanzhou.github.io/
Normal total time: 0.3869960308074951
"""
import aiohttp

async def job(session):
    response = await session.get(URL)                    # wait for the response; switch to other tasks meanwhile
    return str(response.url)

async def main(loop):
    async with aiohttp.ClientSession() as session:       # create a Session, as the official docs recommend
        tasks = [loop.create_task(job(session)) for _ in range(2)]
        finished, unfinished = await asyncio.wait(tasks)
        all_results = [r.result() for r in finished]     # collect all results
        print(all_results)

t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
print("Async total time:", time.time() - t1)
"""
['https://morvanzhou.github.io/', 'https://morvanzhou.github.io/']
Async total time: 0.11447715759277344
"""
Parsing the pages still uses the same multiprocessing-style parallelism as before, because asyncio cannot make the parsing asynchronous; parsing is a CPU-bound step. For downloading the pages, however, we drop multiprocessing and use asyncio instead, pitting a single thread against multiple processes.
import aiohttp
import asyncio
import time
from bs4 import BeautifulSoup
from urllib.request import urljoin
import re
import multiprocessing as mp

base_url = "https://morvanzhou.github.io/"

seen = set()
unseen = set([base_url])

def parse(html):
    soup = BeautifulSoup(html, 'lxml')
    urls = soup.find_all('a', {"href": re.compile('^/.+?/$')})
    title = soup.find('h1').get_text().strip()
    page_urls = set([urljoin(base_url, url['href']) for url in urls])
    url = soup.find('meta', {'property': "og:url"})['content']
    return title, page_urls, url

async def crawl(url, session):
    r = await session.get(url)
    html = await r.text()
    await asyncio.sleep(0.1)            # slight delay for downloading
    return html

async def main(loop):
    pool = mp.Pool(8)                   # pool size only slightly affects the result
    async with aiohttp.ClientSession() as session:
        count = 1
        while len(unseen) != 0:
            print('\nAsync Crawling...')
            tasks = [loop.create_task(crawl(url, session)) for url in unseen]
            finished, unfinished = await asyncio.wait(tasks)
            htmls = [f.result() for f in finished]

            print('\nDistributed Parsing...')
            parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]
            results = [j.get() for j in parse_jobs]

            print('\nAnalysing...')
            seen.update(unseen)
            unseen.clear()
            for title, page_urls, url in results:
                # print(count, title, url)
                unseen.update(page_urls - seen)
                count += 1

if __name__ == "__main__":
    t1 = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    # loop.close()
    print("Async total time: ", time.time() - t1)
"""
Async Crawling...
Distributed Parsing...
Analysing...
Async Crawling...
Distributed Parsing...
Analysing...
Async Crawling...
Distributed Parsing...
Analysing...
Async total time: 7.21798300743103
"""
from urllib.request import urlopen, urljoin
from bs4 import BeautifulSoup
import multiprocessing as mp
import re
import time

def crawl(url):
    response = urlopen(url)
    time.sleep(0.1)                     # slight delay for downloading
    return response.read().decode()

def parse(html):
    soup = BeautifulSoup(html, 'lxml')
    urls = soup.find_all('a', {"href": re.compile('^/.+?/$')})
    title = soup.find('h1').get_text().strip()
    page_urls = set([urljoin(base_url, url['href']) for url in urls])
    url = soup.find('meta', {'property': "og:url"})['content']
    return title, page_urls, url

if __name__ == '__main__':
    # base_url = 'https://morvanzhou.github.io/'
    base_url = "http://127.0.0.1:4000/"

    # DON'T OVER CRAWL THE WEBSITE OR YOU MAY NEVER VISIT AGAIN
    if base_url != "http://127.0.0.1:4000/":
        restricted_crawl = True
    else:
        restricted_crawl = False

    unseen = set([base_url,])
    seen = set()

    pool = mp.Pool(8)                   # number of processes strongly affects the result
    count, t1 = 1, time.time()
    while len(unseen) != 0:             # still some urls to visit
        if restricted_crawl and len(seen) > 20:
            break
        print('\nDistributed Crawling...')
        crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen]
        htmls = [j.get() for j in crawl_jobs]               # request connection
        htmls = [h for h in htmls if h is not None]         # remove None

        print('\nDistributed Parsing...')
        parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]
        results = [j.get() for j in parse_jobs]             # parse html

        print('\nAnalysing...')
        seen.update(unseen)
        unseen.clear()

        for title, page_urls, url in results:
            # print(count, title, url)
            count += 1
            unseen.update(page_urls - seen)

    print('Total time: %.1f s' % (time.time()-t1, ))
"""
Distributed Crawling...
Distributed Parsing...
Analysing...
Distributed Crawling...
Distributed Parsing...
Analysing...
Distributed Crawling...
Distributed Parsing...
Analysing...
Total time: 11.5 s
"""
Number of processes | Multiprocessing | Asyncio
2                   | 25.5 s          | 7.5 s
4                   | 15.4 s          | 7.0 s
8                   | 11.5 s          | 7.2 s
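The asyncio column barely changes as the pool grows: downloading is already fully concurrent inside the single-threaded event loop, so extra processes only speed up the parsing step. The multiprocessing-only crawler, by contrast, needs more processes to overlap more downloads.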