使用 Future
concurrent.futures 是 Python 3.2 引入的用于处理并发的模块,它提供一种优雅的用多线程或者多进程实现并发的方式。Python 2 中也有移植的对应包,可以使用如下命令安装:
> pip install futures
concurrent.futures 中包含 ThreadPoolExecutor 和 ProcessPoolExecutor 这两个执行器,分别产生线程池和进程池。我们把清理不可用代理的代码改成使用 ThreadPoolExecutor 模式:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def check_proxy(p):
try:
fetch('http://baidu.com', proxy=p['address'])
except RequestException:
p.delete()
return False
return True
with ThreadPoolExecutor(max_workers=5) as executor:
for p in Proxy.objects.all():
executor.submit(check_proxy, p)
max_workers 参数定义了供执行器使用的线程数量。executor.submit 表示手动提交单个任务,submit 执行的返回值是一个 concurrent.futures.Future 实例,如果调用 Future 实例的 result 方法就可以获得结果,结果被返回之前进程是阻塞的。还可以给 Future 实例添加回调函数:
from functools import partial
def when_done(p, f):
print '[{}]:{}'.format(p.address, 'succeed' if f.result() else 'failure')
def use_thread_pool_executor_with_cb():
with ThreadPoolExecutor(max_workers=5) as executor:
for p in Proxy.objects.all():
result=executor.submit(check_proxy, p)
result.add_done_callback(partial(when_done, p))
Future 类还提供取消任务、检查任务状态(运行中或者已完成)等功能,也可以汇报执行结果或者执行时发生的异常。
这样就能在检查完每个代理时打印出代理是否可用。由于 add_done_callback 方法只接受一个函数,这个函数只接受一个参数,也就是 Future 实例,而我们这个例子传入了两个参数,所以使用了 partial 将 p 预先传到函数中。
ProcessPoolExecutor 和 ThreadPoolExecutor 的接口一样,接下来修改成使用多进程执行器,并用 map 方法并行提交任务:
with ProcessPoolExecutor(max_workers=5) as executor:
executor.map(check_proxy, Proxy.objects.all())
最后使用 Queue 和 ProcessPoolExecutor 改写 13.3 节抓取微信文章的功能。爬取的逻辑还是用之前实现的 save_article_result。由于在 save_article_result 中可能因抓取异常造成重试,超过阈值时会将文章重新放入队列,而在 ThreadPoolExecutor 和 ProcessPoolExecutor 中队列被封装了,之前是我们自己维护队列,现在需要借用封装好的队列,其中 ThreadPoolExecutor 的任务队列存放在 executor._work_queue:
from functools import partial
from concurrent.futures import ThreadPoolExecutor
from save_article_content import save_article_result
from models import Article
with ThreadPoolExecutor(max_workers=5) as executor:
executor.map(partial(save_article_result, queue=executor._work_queue),
Article.objects.all())
上述的实现是不是比之前的版本要简洁得多呢?
如果希望获取执行的结果可以使用 as_completed:
from concurrent.futures import as_completed
with ProcessPoolExecutor(max_workers=5) as executor:
future_tasks={executor.submit(
check_proxy, p):p for p in Proxy.objects.all()}
for future in as_completed(future_tasks):
p=future_tasks[future]
try:
rs=future.result()
except Exception as exc:
print 'Receive exception:{}'.format(exc)
else:
print '[{}]:{}'.format(
p.address, 'succeed' if rs else 'failure')
这样就可以把结果收集起来,不再需要使用两个队列了。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论