使用 asyncio
Python 2 对协程的支持是通过生成器(Generator)实现的。利用 yield 实现生产者/消费者模型的例子如下(use_yield.py):
import random def consumer(): r=None while 1: data=yield r print 'Consuming:{}'.format(data) r=data+1 def producer(consumer): n=3 consumer.send(None) while n: data=random.choice(range(10)) print('Producing:{}'.format(data)) rs=consumer.send(data) print 'Consumer return:{}'.format(rs) n-=1 consumer.close() c=consumer() producer(c)
执行的结果如下:
> python chapter13/section5/use_yield.py Producing: 0 Consuming: 0 Consumer return: 1 Producing: 4 Consuming: 4 Consumer return: 5 Producing: 6 Consuming: 6 Consumer return: 7
c.send(None) 和 c.next() 的作用一样,都是让协程运行起来,通过之后不断 send 数据给消费者,消费者再通过“yield r”把执行结果返回给发布者这样的方式,实现一个生产/消费循环。
Python 2 内置的支持大抵如此。大量的项目(尤其是网络编程相关的项目)都是通过使用第三方库实现的协程来编写程序,如 Eventlet 和 Gevent。由于 Python 2 语言的局限,协程的实现比较原始,众多第三方库的实现并不统一,并且通常都需要使用类似 Monkey Patch 的技术才能实现非阻塞 I/O 等特性来真正提高性能。
使用 yield 语句只能将 CPU 控制权还给直接调用者,Python 3.3 中添加了“yield from”表达式,允许生成器把它的部分操作委任给另外一个生成器。但是仍有一些缺点:
- 协程与常规的生成器使用相同语法时容易混淆,尤其对于新的开发者而言。
- 一个函数是否是协程需要通过主体代码中是否使用了 yield 或者 yield from 语句进行检测,这一点容易在重构中忽略,而导致迷惑和错误。
- 对异步调用的支持被 yield 的语法限制了,不能使用更多的语法特性,比如 with 和 for。
Python 3.4 中 asyncio 被纳入了标准库,它提供了使用协程编写单线程并发代码,通过 I/O 多路复用访问套接字和其他资源,运行网络客户端和服务器等原语。而 Python 3.5 添加了 async 和 await 这两个关键字。自此,协程成为新的语法,而不再是一种生成器类型了。I/O 多路复用与协程的引入,可以极大提高高负载下程序的 I/O 性能。
async/await
async 用于声明一个协程:
async def foo(): pass
在普通的函数前加上 async 关键字后,这个函数就变成了一个协程。
await 表示等待另一个协程执行完返回,获取协程执行结果,它必须在协程内才能使用。
Python 3.5 之前协程是这样写的(old_coroutine.py):
import asyncio @asyncio.coroutine def slow_operation(n): yield from asyncio.sleep(1) print('Slow operation{}complete'.format(n)) @asyncio.coroutine def main(): yield from asyncio.wait([ slow_operation(1), slow_operation(2), slow_operation(3), ]) loop=asyncio.get_event_loop() loop.run_until_complete(main())
asyncio 事件循环受到了 Tornado 与 Twisted 等的影响,使用事件循环,Tornado、Twisted 以及 Gevent 可以与 asyncio 一起工作。asyncio 还为每个平台选择了最佳的 I/O 机制,比如 UNIX 和 Linux 平台上使用 selectors 库来做系统级别的 I/O 切换。
调用 get_event_loop 将返回默认的事件循环,用于负责所有协程的调度。在大量协程并发执行的过程中,除了在协程中主动使用 await,当本地协程发生 I/O 等待时,调用 asyncio.sleep,程序的控制权也会在不同的协程间切换,从而在 GIL 的限制下实现最大程度的并发执行,不会由于等待 I/O 等原因导致程序阻塞,达到较高的性能。
执行结果如下:
> time python3 chapter13/section5/old_coroutine.py Slow operation 2 complete Slow operation 3 complete Slow operation 1 complete python3 chapter13/section5/old_coroutine.py 0.10s user 0.05s system 12%cpu 1.201 total
接下来的例子都使用 Python 3.5.1 执行。
使用 async/await 关键词之后会是这样(new_coroutine.py):
import asyncio async def slow_operation(n): await asyncio.sleep(1) print('Slow operation{}complete'.format(n)) async def main(): await asyncio.wait([ slow_operation(1), slow_operation(2), slow_operation(3), ]) loop=asyncio.get_event_loop() loop.run_until_complete(main())
可以感受到,async 的使用简化了 asyncio.coroutine;await 的使用简化了 yield from。
除了“async def”,还有“async for”和“async with”关键字。
1.async for:异步迭代器语法。为了支持异步迭代,异步对象需要实现__aiter__方法,异步迭代器需要实现__anext__方法,停止迭代需要在__anext__方法内抛出 StopAsyncIteration 异常(async_for.py)。
import random import asyncio class AsyncIterable: def __init__(self): self.count=0 async def __aiter__(self): return self async def __anext__(self): if self.count >= 5: raise StopAsyncIteration data=await self.fetch_data() self.count+=1 return data async def fetch_data(self): return random.choice(range(10)) async def main(): async for data in AsyncIterable(): print(data) loop=asyncio.get_event_loop() loop.run_until_complete(main())
2.async with:异步上下文管理器语法。为了支持上下文管理器,需要实现__aenter__和__aexit__方法(async_with.py)。
async def log(msg): print(msg) class AsyncContextManager: async def__aenter__(self): await log('entering context') async def__aexit__(self, exc_type, exc, tb): await log('exiting context') async def coro(): async with AsyncContextManager(): print('body')
除了使用事件循环,采用原来的 send(None) 方式也是可以的:
c=coro() try: c.send(None) except StopIteration: print('finished')
它的执行结果如下:
entering context body exiting context finished
Future
Future 是一种异步编程范式,它对异步过程调用的结果做了抽象,它并不关心具体的异步机制。无论是线程、网络,还是 I/O,甚至 RPC,只要是异步过程调用,都可以通过 Future 的概念统一处理。基于 Future 的接口可以简化代码编写,让各种异步操作以一种顺序的、更接近人类逻辑思维的方式编写异步代码。
asyncio.Future 几乎兼容 13.4 节介绍的 concurrent.futures.Future:
> cat chapter13/section5/async_future.py async def slow_operation(future): await asyncio.sleep(1) future.set_result('Done!') loop=asyncio.get_event_loop() future=asyncio.Future() print('Future Done:{}'.format(future.done())) asyncio.ensure_future(slow_operation(future)) loop.run_until_complete(future) print('Future Done:{}'.format(future.done())) print(future.result()) loop.close()
它的执行结果如下:
Future Done:False Future Done:True Done!
asyncio.Future 也支添加回调函数:
from functools import partial def set_result(future, result): print('Setting future result to{!r}'.format(result)) future.set_result(result) def callback(who, future): print('[{}]:returned result:{!r}'.format(who, future.result())) event_loop=asyncio.get_event_loop() future=asyncio.Future() future.add_done_callback(partial(callback, 'CB1')) event_loop.call_soon(set_result, future, 'Done!') future.add_done_callback(partial(callback, 'CB2')) event_loop.run_until_complete(future) event_loop.close()
通过 call_soon 添加修改结果的回调函数,通过 add_done_callback 添加 Future 完成的回调函数,它的执行结果如下:
Setting future result to 'Done!' [CB1]: returned result: 'Done!' [CB2]: returned result: 'Done!'
可见添加的回调是一个先进先出(FIFO)的队列,保证了回调的顺序。这里强调一点,Future 的结果在设置之后就不能修改了:
... future.add_done_callback(partial(callback, 'CB1')) event_loop.call_soon(set_result, future, 'Done!') event_loop.call_soon(set_result, future, 'Done again!') event_loop.run_until_complete(future)
如果再将结果设置为“Done again!”,就会收到 InvalidStateError 错误。
使用 aiohttp
现在把 13.4 节的抓取微信文章页面的爬虫用 asyncio 来实现。首先创建一个 Python 3 的虚拟环境,再安装相关依赖:
> pyvenv-3.5 ~/venv3 > source ~/venv3/bin/activate > pip install beautifulsoup4 lxml aiohttp mongoengine fake_useragent cchardet
chardet 是一个常用的字符编码检测器,cchardet 是一个更快的实现,可以替代 chardet。
用 aiohttp 替代 requests 作为 HTTP 客户端,先感受下 aiohttp 的用法:
from asyncio import TimeoutError import aiohttp from aiohttp import ProxyConnectionError async def fetch(retry=0): proxy='http://{}'.format(Proxy.get_random()['address']) headers={'user-agent':get_user_agent()} conn=aiohttp.ProxyConnector(proxy=proxy) url='http://httpbin.org/ip' try: with aiohttp.ClientSession(connector=conn) as session: with aiohttp.Timeout(TIMEOUT): async with session.get(url, headers=headers) as resp: return await resp.json() except (ProxyConnectionError, TimeoutError): try: p=Proxy.objects.get(address=proxy) if p: p.delete() except DoesNotExist: pass retry += 1 if retry > 5: raise TimeoutError() await asyncio.sleep(1) return await fetch(retry=retry)
requests 所支持的常用特性 aiohttp 也都是支持的,只是在用法上有比较大的调整。
使用事件循环调用 fetch 函数:
loop=asyncio.get_event_loop() f=asyncio.wait([fetch()]) completed, pending=loop.run_until_complete(f) for future in completed: print(future.result())
其中使用 asyncio.wait 接受了一个任务列表,run_until_complete 返回的也会是一个任务结果的列表。
aiohttp 也可以用来实现 HTTP 服务,我们实现一个简单的 API 服务(aiohttp_server.py):
import json import asyncio import aiohttp from aiohttp import web REQEUST_URLS=[ 'http://httpbin.org/ip', 'http://httpbin.org/user-agent', 'http://httpbin.org/headers' ] async def handle(request): coroutines=[aiohttp.request('get', url) for url in REQEUST_URLS] results=await asyncio.gather(*coroutines, return_exceptions=True) response_data={ url:not isinstance(result, Exception) and result.status==200 for url, result in zip(REQEUST_URLS, results) } body=json.dumps(response_data).encode('utf-8') return web.Response(body=body, content_type="application/json") loop=asyncio.get_event_loop() app=web.Application(loop=loop) app.router.add_route('GET', '/', handle) server=loop.create_server(app.make_handler(), '0.0.0.0', 8080) print('Server started at http://127.0.0.1:8080') loop.run_until_complete(server) try: loop.run_forever() except KeyboardInterrupt: pass
使用 asyncio.gather 替代 asyncio.wait 并设置 return_exceptions 为 True,会收集到全部内容,而且不会抛出异常,只是返回一个异常的对象。
使用队列
asyncio 同样支持多种原语,如锁(Lock)、事件(Event)、信号量(Semaphore)、条件变量(Condition),当然也支持队列。asyncio 中的队列的设计很像 Queue 模块,但是没有 timeout 参数,只能通过 asyncio.wait_for 在任务超时之后取消任务。队列包含如下三种。
- Queue:用生产者/消费者模型的队列,适合我们的微信文章抓取队列。
- PriorityQueue:Queue 的子类,带有优先级的队列。
- LifoQueue:Queue 的子类,最近添加优先的队列。
改写微信抓取,其实就是做如下四件事。
1.修改使用的标准库模块的引用方法。比如 url 解析相关的函数都放在 urllib.parse 下,需要修改为如下引用:
from urllib.parse import urlparse, urlsplit, parse_qs, urlencode
2.给希望改写成协程的函数添加 async 关键字。
3.在调用协程函数的地方添加 await 关键字。
4.使用事件循环。
看一下修改后的 save_article_result_with_queue 函数:
async def save_article_result_with_queue(queue): while 1: article=await queue.get() if article is None: queue.task_done() break await save_article_result(article, queue) queue.task_done()
和之前的处理空队列的逻辑不同,我们没有捕获 Empty 异常然后 break,而是向队列添加一个 None,get 的时候发现消息为 None 就结束循环。这样做的原因是 queue.get 不接受 timeout 参数,这样可以简化实现。
把文章放入队列的操作,我们放在一个协程函数中来做:
async def producer(queue): for article in Article.objects.all(): await queue.put(article) for i in range(5): # 有几个协程就放为几个为 None 的消息 await queue.put(None) await queue.join()
现在使用事件循环:
loop=asyncio.get_event_loop() queue=asyncio.Queue() consumers=[ loop.create_task(save_article_result_with_queue(queue,)) for i in range(5) ] prod=loop.create_task(producer(queue)) loop.run_until_complete( asyncio.wait(consumers+[prod]) )
create_task 创建的是 asyncio.tasks.Task 对象,它是 asyncio.Future 的子类,相当于 asynio 对 Future 的封装。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论