返回介绍

使用 Future

发布于 2025-04-20 18:52:18 字数 2862 浏览 0 评论 0 收藏

concurrent.futures 是 Python 3.2 引入的用于处理并发的模块,它提供一种优雅的用多线程或者多进程实现并发的方式。Python 2 中也有移植的对应包,可以使用如下命令安装:

> pip install futures

concurrent.futures 中包含 ThreadPoolExecutor 和 ProcessPoolExecutor 这两个执行器,分别产生线程池和进程池。我们把清理不可用代理的代码改成使用 ThreadPoolExecutor 模式:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
 
def check_proxy(p):
    try:
        fetch('http://baidu.com', proxy=p['address'])
    except RequestException:
        p.delete()
        return False
    return True
 
 
with ThreadPoolExecutor(max_workers=5) as executor:
     for p in Proxy.objects.all():
         executor.submit(check_proxy, p)

max_workers 参数定义了供执行器使用的线程数量。executor.submit 表示手动提交单个任务,submit 执行的返回值是一个 concurrent.futures.Future 实例,如果调用 Future 实例的 result 方法就可以获得结果,结果被返回之前进程是阻塞的。还可以给 Future 实例添加回调函数:

from functools import partial
 
 
def when_done(p, f):
    print '[{}]:{}'.format(p.address, 'succeed' if f.result() else 'failure')

 
def use_thread_pool_executor_with_cb():
    with ThreadPoolExecutor(max_workers=5) as executor:
        for p in Proxy.objects.all():
            result=executor.submit(check_proxy, p)
            result.add_done_callback(partial(when_done, p))

Future 类还提供取消任务、检查任务状态(运行中或者已完成)等功能,也可以汇报执行结果或者执行时发生的异常。

这样就能在检查完每个代理时打印出代理是否可用。由于 add_done_callback 方法只接受一个函数,这个函数只接受一个参数,也就是 Future 实例,而我们这个例子传入了两个参数,所以使用了 partial 将 p 预先传到函数中。

ProcessPoolExecutor 和 ThreadPoolExecutor 的接口一样,接下来修改成使用多进程执行器,并用 map 方法并行提交任务:

with ProcessPoolExecutor(max_workers=5) as executor:
    executor.map(check_proxy, Proxy.objects.all())

最后使用 Queue 和 ProcessPoolExecutor 改写 13.3 节抓取微信文章的功能。爬取的逻辑还是用之前实现的 save_article_result。由于在 save_article_result 中可能因抓取异常造成重试,超过阈值时会将文章重新放入队列,而在 ThreadPoolExecutor 和 ProcessPoolExecutor 中队列被封装了,之前是我们自己维护队列,现在需要借用封装好的队列,其中 ThreadPoolExecutor 的任务队列存放在 executor._work_queue:

from functools import partial

from concurrent.futures import ThreadPoolExecutor
 
from save_article_content import save_article_result
from models import Article
 
 
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(partial(save_article_result, queue=executor._work_queue),
                Article.objects.all())

上述的实现是不是比之前的版本要简洁得多呢?

如果希望获取执行的结果可以使用 as_completed:

from concurrent.futures import as_completed
 

with ProcessPoolExecutor(max_workers=5) as executor:
    future_tasks={executor.submit(
        check_proxy, p):p for p in Proxy.objects.all()}
    for future in as_completed(future_tasks):
        p=future_tasks[future]
        try:
            rs=future.result()
        except Exception as exc:
            print 'Receive exception:{}'.format(exc)
        else:
            print '[{}]:{}'.format(
            p.address, 'succeed' if rs else 'failure')

这样就可以把结果收集起来,不再需要使用两个队列了。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。