返回介绍

使用 asyncio

发布于 2025-04-20 18:52:18 字数 10784 浏览 0 评论 0 收藏

Python 2 对协程的支持是通过生成器(Generator)实现的。利用 yield 实现生产者/消费者模型的例子如下(use_yield.py):

import random
 
 
def consumer():
    r=None
    while 1:
        data=yield r
        print 'Consuming:{}'.format(data)
        r=data+1
 
 
def producer(consumer):
    n=3
    consumer.send(None)
    while n:
        data=random.choice(range(10))
        print('Producing:{}'.format(data))
        rs=consumer.send(data)
        print 'Consumer return:{}'.format(rs)
        n-=1
    consumer.close()
 
 
c=consumer()
producer(c)

执行的结果如下:

> python chapter13/section5/use_yield.py
Producing: 0
Consuming: 0
Consumer return: 1
Producing: 4
Consuming: 4
Consumer return: 5
Producing: 6
Consuming: 6
Consumer return: 7

c.send(None) 和 c.next() 的作用一样,都是让协程运行起来,通过之后不断 send 数据给消费者,消费者再通过“yield r”把执行结果返回给发布者这样的方式,实现一个生产/消费循环。

Python 2 内置的支持大抵如此。大量的项目(尤其是网络编程相关的项目)都是通过使用第三方库实现的协程来编写程序,如 Eventlet 和 Gevent。由于 Python 2 语言的局限,协程的实现比较原始,众多第三方库的实现并不统一,并且通常都需要使用类似 Monkey Patch 的技术才能实现非阻塞 I/O 等特性来真正提高性能。

使用 yield 语句只能将 CPU 控制权还给直接调用者,Python 3.3 中添加了“yield from”表达式,允许生成器把它的部分操作委任给另外一个生成器。但是仍有一些缺点:

  • 协程与常规的生成器使用相同语法时容易混淆,尤其对于新的开发者而言。
  • 一个函数是否是协程需要通过主体代码中是否使用了 yield 或者 yield from 语句进行检测,这一点容易在重构中忽略,而导致迷惑和错误。
  • 对异步调用的支持被 yield 的语法限制了,不能使用更多的语法特性,比如 with 和 for。

Python 3.4 中 asyncio 被纳入了标准库,它提供了使用协程编写单线程并发代码,通过 I/O 多路复用访问套接字和其他资源,运行网络客户端和服务器等原语。而 Python 3.5 添加了 async 和 await 这两个关键字。自此,协程成为新的语法,而不再是一种生成器类型了。I/O 多路复用与协程的引入,可以极大提高高负载下程序的 I/O 性能。

async/await

async 用于声明一个协程:

async def foo():
    pass

在普通的函数前加上 async 关键字后,这个函数就变成了一个协程。

await 表示等待另一个协程执行完返回,获取协程执行结果,它必须在协程内才能使用。

Python 3.5 之前协程是这样写的(old_coroutine.py):

import asyncio


@asyncio.coroutine
def slow_operation(n):
    yield from asyncio.sleep(1)
    print('Slow operation{}complete'.format(n))


@asyncio.coroutine
def main():
    yield from asyncio.wait([
        slow_operation(1),
        slow_operation(2),
        slow_operation(3),
    ])
 
  
loop=asyncio.get_event_loop()
loop.run_until_complete(main())

asyncio 事件循环受到了 Tornado 与 Twisted 等的影响,使用事件循环,Tornado、Twisted 以及 Gevent 可以与 asyncio 一起工作。asyncio 还为每个平台选择了最佳的 I/O 机制,比如 UNIX 和 Linux 平台上使用 selectors 库来做系统级别的 I/O 切换。

调用 get_event_loop 将返回默认的事件循环,用于负责所有协程的调度。在大量协程并发执行的过程中,除了在协程中主动使用 await,当本地协程发生 I/O 等待时,调用 asyncio.sleep,程序的控制权也会在不同的协程间切换,从而在 GIL 的限制下实现最大程度的并发执行,不会由于等待 I/O 等原因导致程序阻塞,达到较高的性能。

执行结果如下:

> time python3 chapter13/section5/old_coroutine.py
Slow operation 2 complete
Slow operation 3 complete
Slow operation 1 complete
python3 chapter13/section5/old_coroutine.py 0.10s user 0.05s system 12%cpu 1.201
    total

接下来的例子都使用 Python 3.5.1 执行。

使用 async/await 关键词之后会是这样(new_coroutine.py):

import asyncio


async def slow_operation(n):
    await asyncio.sleep(1)
    print('Slow operation{}complete'.format(n))
    async def main():
        await asyncio.wait([
           slow_operation(1),
           slow_operation(2),
           slow_operation(3),
        ])
 
 
loop=asyncio.get_event_loop()
loop.run_until_complete(main())

可以感受到,async 的使用简化了 asyncio.coroutine;await 的使用简化了 yield from。

除了“async def”,还有“async for”和“async with”关键字。

1.async for:异步迭代器语法。为了支持异步迭代,异步对象需要实现__aiter__方法,异步迭代器需要实现__anext__方法,停止迭代需要在__anext__方法内抛出 StopAsyncIteration 异常(async_for.py)。

import random
import asyncio


class AsyncIterable:
    def __init__(self):
        self.count=0
 
    async def __aiter__(self):
        return self
 
    async def __anext__(self):
        if self.count >= 5:
            raise StopAsyncIteration
        data=await self.fetch_data()
        self.count+=1
        return data

    async def fetch_data(self):
        return random.choice(range(10))
  
async def main():
    async for data in AsyncIterable():
        print(data)
 
loop=asyncio.get_event_loop()
loop.run_until_complete(main())

2.async with:异步上下文管理器语法。为了支持上下文管理器,需要实现__aenter__和__aexit__方法(async_with.py)。

async def log(msg):
    print(msg)
 
 
class AsyncContextManager:
    async def__aenter__(self):
        await log('entering context')
 
    async def__aexit__(self, exc_type, exc, tb):
        await log('exiting context')
 
 
    async def coro():
        async with AsyncContextManager():
            print('body')

除了使用事件循环,采用原来的 send(None) 方式也是可以的:

c=coro()
 
try:
    c.send(None)
except StopIteration:
    print('finished')

它的执行结果如下:

entering context
body
exiting context
finished

Future

Future 是一种异步编程范式,它对异步过程调用的结果做了抽象,它并不关心具体的异步机制。无论是线程、网络,还是 I/O,甚至 RPC,只要是异步过程调用,都可以通过 Future 的概念统一处理。基于 Future 的接口可以简化代码编写,让各种异步操作以一种顺序的、更接近人类逻辑思维的方式编写异步代码。

asyncio.Future 几乎兼容 13.4 节介绍的 concurrent.futures.Future:

> cat chapter13/section5/async_future.py
async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result('Done!')


loop=asyncio.get_event_loop()
future=asyncio.Future()
print('Future Done:{}'.format(future.done()))
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print('Future Done:{}'.format(future.done()))
print(future.result())
loop.close()

它的执行结果如下:

Future Done:False
Future Done:True
Done!

asyncio.Future 也支添加回调函数:

from functools import partial


def set_result(future, result):
    print('Setting future result to{!r}'.format(result))
    future.set_result(result)
 
 
def callback(who, future):
    print('[{}]:returned result:{!r}'.format(who, future.result()))
 

event_loop=asyncio.get_event_loop()
future=asyncio.Future()
future.add_done_callback(partial(callback, 'CB1'))
event_loop.call_soon(set_result, future, 'Done!')
future.add_done_callback(partial(callback, 'CB2'))
event_loop.run_until_complete(future)
event_loop.close()

通过 call_soon 添加修改结果的回调函数,通过 add_done_callback 添加 Future 完成的回调函数,它的执行结果如下:

Setting future result to 'Done!'
[CB1]: returned result: 'Done!'
[CB2]: returned result: 'Done!'

可见添加的回调是一个先进先出(FIFO)的队列,保证了回调的顺序。这里强调一点,Future 的结果在设置之后就不能修改了:

...
future.add_done_callback(partial(callback, 'CB1'))
event_loop.call_soon(set_result, future, 'Done!')
event_loop.call_soon(set_result, future, 'Done again!')
event_loop.run_until_complete(future)

如果再将结果设置为“Done again!”,就会收到 InvalidStateError 错误。

使用 aiohttp

现在把 13.4 节的抓取微信文章页面的爬虫用 asyncio 来实现。首先创建一个 Python 3 的虚拟环境,再安装相关依赖:

> pyvenv-3.5 ~/venv3
> source ~/venv3/bin/activate
> pip install beautifulsoup4 lxml aiohttp mongoengine fake_useragent cchardet

chardet 是一个常用的字符编码检测器,cchardet 是一个更快的实现,可以替代 chardet。

用 aiohttp 替代 requests 作为 HTTP 客户端,先感受下 aiohttp 的用法:

from asyncio import TimeoutError

import aiohttp
from aiohttp import ProxyConnectionError
 

async def fetch(retry=0):
    proxy='http://{}'.format(Proxy.get_random()['address'])
    headers={'user-agent':get_user_agent()}
    conn=aiohttp.ProxyConnector(proxy=proxy)

    url='http://httpbin.org/ip'
 
    try:
        with aiohttp.ClientSession(connector=conn) as session:
            with aiohttp.Timeout(TIMEOUT):
               async with session.get(url, headers=headers) as resp:
                   return await resp.json()
    except (ProxyConnectionError, TimeoutError):
        try:
            p=Proxy.objects.get(address=proxy)
            if p:
                p.delete()
        except DoesNotExist:
            pass
        retry += 1
        if retry > 5:
            raise TimeoutError()
        await asyncio.sleep(1)
        return await fetch(retry=retry)

requests 所支持的常用特性 aiohttp 也都是支持的,只是在用法上有比较大的调整。

使用事件循环调用 fetch 函数:

loop=asyncio.get_event_loop()
f=asyncio.wait([fetch()])
completed, pending=loop.run_until_complete(f)
 
for future in completed:
    print(future.result())

其中使用 asyncio.wait 接受了一个任务列表,run_until_complete 返回的也会是一个任务结果的列表。

aiohttp 也可以用来实现 HTTP 服务,我们实现一个简单的 API 服务(aiohttp_server.py):

import json
import asyncio

import aiohttp
from aiohttp import web
 
REQEUST_URLS=[
    'http://httpbin.org/ip',
    'http://httpbin.org/user-agent',
    'http://httpbin.org/headers'
]


async def handle(request):
    coroutines=[aiohttp.request('get', url) for url in REQEUST_URLS]
 
    results=await asyncio.gather(*coroutines, return_exceptions=True)
    response_data={
        url:not isinstance(result, Exception) and result.status==200
        for url, result in zip(REQEUST_URLS, results)
    }

    body=json.dumps(response_data).encode('utf-8')
    return web.Response(body=body, content_type="application/json")

loop=asyncio.get_event_loop()
app=web.Application(loop=loop)
app.router.add_route('GET', '/', handle)
 
server=loop.create_server(app.make_handler(), '0.0.0.0', 8080)
print('Server started at http://127.0.0.1:8080')
loop.run_until_complete(server)
try:
    loop.run_forever()
except KeyboardInterrupt:
   pass

使用 asyncio.gather 替代 asyncio.wait 并设置 return_exceptions 为 True,会收集到全部内容,而且不会抛出异常,只是返回一个异常的对象。

使用队列

asyncio 同样支持多种原语,如锁(Lock)、事件(Event)、信号量(Semaphore)、条件变量(Condition),当然也支持队列。asyncio 中的队列的设计很像 Queue 模块,但是没有 timeout 参数,只能通过 asyncio.wait_for 在任务超时之后取消任务。队列包含如下三种。

  • Queue:用生产者/消费者模型的队列,适合我们的微信文章抓取队列。
  • PriorityQueue:Queue 的子类,带有优先级的队列。
  • LifoQueue:Queue 的子类,最近添加优先的队列。

改写微信抓取,其实就是做如下四件事。

1.修改使用的标准库模块的引用方法。比如 url 解析相关的函数都放在 urllib.parse 下,需要修改为如下引用:

from urllib.parse import urlparse, urlsplit, parse_qs, urlencode

2.给希望改写成协程的函数添加 async 关键字。

3.在调用协程函数的地方添加 await 关键字。

4.使用事件循环。

看一下修改后的 save_article_result_with_queue 函数:

async def save_article_result_with_queue(queue):
    while 1:
        article=await queue.get()
        if article is None:
            queue.task_done()
            break
        await save_article_result(article, queue)
        queue.task_done()

和之前的处理空队列的逻辑不同,我们没有捕获 Empty 异常然后 break,而是向队列添加一个 None,get 的时候发现消息为 None 就结束循环。这样做的原因是 queue.get 不接受 timeout 参数,这样可以简化实现。

把文章放入队列的操作,我们放在一个协程函数中来做:

async def producer(queue):
    for article in Article.objects.all():
        await queue.put(article)
  
    for i in range(5): # 有几个协程就放为几个为 None 的消息
        await queue.put(None)
 
    await queue.join()

现在使用事件循环:

loop=asyncio.get_event_loop()
queue=asyncio.Queue()
consumers=[
    loop.create_task(save_article_result_with_queue(queue,))
    for i in range(5)
]
prod=loop.create_task(producer(queue))
loop.run_until_complete(
    asyncio.wait(consumers+[prod])
)

create_task 创建的是 asyncio.tasks.Task 对象,它是 asyncio.Future 的子类,相当于 asynio 对 Future 的封装。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。