使用多线程
讨论 Python 程序的效率时,经常提到的弊端之一是全局解释器锁(Global Interpreter Lock,GIL),GIL 限制任何时间都仅有一个线程在执行。但是存在即合理,由于线程是轻量级的,并且相互之间易于通信,GIL 保护了所有全局的解释器和环境状态变量。如果不使用 GIL 会带来了包括死锁、争用条件和高复杂性在内的各种问题。那么由于 GIL 的限制,使用多线程就完全没有意义了吗?其实不是的,原因如下:
1.Python 大部分是使用 C 语言编写的,一些标准库和第三方的模块也是用 C 编写的,而 C 语言代码是可以获取和释放 GIL 的,这在一定程度上缓解了 GIL 的问题。一些重要的、需要更高运行效率的模块还可以使用 C 语言编写,这样就可以利用更多的 CPU 资源。
2.多线程适合解决由于网络、磁盘等资源造成的 I/O 阻塞问题,它利用了等待 I/O 请求完成被阻塞而导致的 CPU 空闲时间。计算密集型的工作不适合使用多线程完成,因为计算需要 CPU 资源,如果 CPU 繁忙的话,使用多线程并没有更多地利用 CPU 空闲时间,反而由于 CPU 需要额外调度多线程,以及线程切换的各种开销,多线程的运行效率比单线程还要慢。
抓取 HTTP 代理列表,主要是网络请求,几乎没有计算相关的工作,适合使用多线程。在开始使用多线程之前,先添加基本设置到 config.py 中:
import re PROXY_SITES=[ 'http://xxx.com', ... ] REFERER_LIST=[ 'http://www.google.com/', ... ]
获得 UA 和 Referer 以及请求的函数(utils.py):
import random import requests from fake_useragent import UserAgent from config import REFERER_LIST, TIMEOUT def get_referer(): return random.choice(REFERER_LIST) def get_user_agent(): ua=UserAgent() return ua.random
fake_useragent 在第一次启动的时候会将 http://useragentstring.com 网站上列出来的 UA 缓存到/tmp/fake_useragent.json,这个过程可能需要代理才能完成,可以使用项目源码目录下已经保存的文件:
> cp ~/web_develop/data/fake_useragent.json /tmp
添加代理的模型 Proxy:
class Proxy(BaseModel): address=StringField(unique=True) # 保证地址唯一 meta={'collection':'proxy'} # 指定 Model 使用的集合(表)名
如果爬取非常多的代理网站,是不是要针对每个网站都写一个解析 HTML 或者代理列表呢?其实不用,直接解析 HTML 页面中的代理地址就好了。不需要那么严格地匹配,因为之后
我们还会验证这些地址,如果该代理地址不能使用,会自动把它从数据库中删掉的。匹配代理地址的正则表达式是:
PROXY_REGEX=re.compile('[0-9]+(?:\.[0-9]+){3}:\d{2,4}')
HTTP 请求函数如下:
def fetch(url, proxy=None): s=requests.Session() s.headers.update({'user-agent':get_user_agent()}) proxies=None if proxy is not None: proxies={ 'http':proxy, } return s.get(url, timeout=TIMEOUT, proxies=proxies)
本节未使用代理,proxy 参数为默认的 None。
保存代理的函数如下:
def save_proxies(url): try: r=fetch(url) except requests.exceptions.RequestException: return False addresses=re.findall(PROXY_REGEX, r.text) for address in addresses: proxy=Proxy(address=address) try: proxy.save() except NotUniqueError: pass
有可能多个代理网站上都列出了某个代理地址,所以在上面的实现中忽略了 NotUniqueError 这种异常。这里强调一点,应该指定捕获的异常的类型,不应该使用模糊的捕获,如下例:
try: l[0] except: pass
应该使用:
try: l[0] except IndexError: pass
因为 l[0]只可能产生 IndexError 这个异常。作为开发者,你应该明确地知道会出现哪些类型的异常,把它们都列出来。使用异常的基类可以指定更少的错误类型,比如上面用到的 RequestException,requests 的异常都以它为基类,在抓取的时候捕捉由 requests 发出的全部异常,不管它是 Timeout 还是 ConnectionError。但是使用 Exception 并不是一个好的习惯,虽然它是一般自定义异常的基类,但是可能会把一些非预期产生的错误忽略掉,除非你明确知道这样做的后果。
为了确保执行的前提环境一致,抓取前我们都会清空 Proxy 表:
def cleanup(): Proxy.drop_collection()
代理有时效性,现在可用的代理可能过一会儿就不可用了。应该周期性替换成最新的代理。
如果不使用多线程,可以用一个 for 循环,挨个网站地抓取:
def non_thread(): cleanup() for url in PROXY_SITES: save_proxies(url)
这种串行执行的效率很低:
In : from proxy_fetcher import non_thread In : time non_thread() CPU times:user 1.32 s, sys:0 ns, total:1.32 s Wall time:15.6 s In : from models import Proxy In : Proxy.objects.count() Out: 376
看一下使用多线程的例子:
def use_thread(): cleanup() threads=[] for url in PROXY_SITES: t=threading.Thread(target=save_proxies, args=(url,)) t.setDaemon(True) threads.append(t) t.start() for t in threads: t.join() # 等到线程都结束,再退出主程序
看一下效果,注意看花费的时间:
In : time use_thread() CPU times: user 1.51 s, sys: 148 ms, total: 1.66 s Wall time: 5.42 s
可以感受到速度提高了 3 倍,但是这个例子有一个不好的用法:一个站点就创建一个线程,如果站点很多就需要创建很多个线程,而创建和销毁线程是一个比较重的开销。可以考虑使用线程池,重用线程池中的线程:
from multiprocessing.dummy import Pool as ThreadPool def use_thread_pool(): cleanup() pool=ThreadPool(5) pool.map(save_proxies, PROXY_SITES) pool.close() pool.join()
看一下效果:
In : time use_thread_pool() CPU times: user 1.41 s, sys: 93 ms, total: 1.51 s Wall time: 5.25 s
可以看到,在限制了使用线程数的前提下,和上面不限制线程数的方式保持同等水平的运行效率。
multiprocessing.dummy 模块与 multiprocessing 模块的区别在于:前者是多线程,而后者是多进程,但是它们的接口都是一样的,可以很方便地将代码在多线程和多进程之间切换。在不熟悉并发编程的时候,可以先使用非并发的模式做出大部分功能,然后再迁移到多线程或者多进程的版本上,使用这种接口一致的方式就会让迁移成本很低。
之前的例子就是单纯地爬取入库,并没有返回抓取结果。在爬取结束时,我们还可以将抓取到的代理列表存放在某个变量中以备使用,这表示在多线程中需要共享数据。如果数据共享时可能被修改,就需要加锁来保护它,以确保同一时刻只能有一个线程访问这个数据。线程模块提供了许多同步原语,包括锁(Lock)、信号量(Semaphore)、条件变量(Condition)和事件(Event)。但是最好的做法是使用 Queue 模块。Queue 是线程安全的,使用它可以降低程序的复杂度,代码清晰、可读性更强。
先改成使用 Queue 的方式共享可修改数据:
import Queue def save_proxies_with_queue(queue): while True: url=queue.get() save_proxies(url) queue.task_done() # 向任务已经完成的队列发送一个信号 def use_thread_with_queue(): cleanup() queue=Queue.Queue() for i in range(5): t=threading.Thread(target=save_proxies_with_queue, args=(queue,)) t.setDaemon(True) t.start() for url in PROXY_SITES: queue.put(url) queue.join()
一定要调用 task_done,只有这样,在最后 join 方法时才能知道队列中的所有任务是不是都执行完了。
看一下效果:
In : from proxy_fetcher_with_queue import use_thread_with_queue In : time use_thread_with_queue() CPU times: user 1.46 s, sys: 0 ns, total: 1.46 s Wall time: 6.21 s
使用 Queue 效率虽然差一些,但是扩展性很好。现在增加复杂度,抓取之后要把结果保留下来,放进一个列表中。我们使用多个队列,把抓取到的结果放置到另一个队列中。
首先改造 save_proxies,让它返回当前线程获得的代理地址:
def save_proxies(url): proxies=[] try: r=fetch(url) except requests.exceptions.RequestException: return False addresses=re.findall(PROXY_REGEX, r.text) for address in addresses: proxy=Proxy(address=address) try: proxy.save() except NotUniqueError: pass else: proxies.append(address) return proxies
添加使用队列的 save_proxies_with_queue2 函数:
def save_proxies_with_queue2(in_queue, out_queue): while True: url=in_queue.get() rs=save_proxies(url) out_queue.put(rs) in_queue.task_done()
也就是从 in_queue 获取要执行的网站地址,把结果放进 out_queue 队列中。第二步是把放入 out_queue 队列中的结果存入一个全局的列表:
def append_result(out_queue, result): while True: rs=out_queue.get() if rs: result.extend(rs) out_queue.task_done()
result 就是一个全局变量,由于 Python 参数是引用传递,可以直接修改它。
现在基于 use_thread_with_queue,将其修改为函数 use_thread_with_queue2:
def use_thread_with_queue2(): cleanup() in_queue=Queue.Queue() out_queue=Queue.Queue() for i in range(5): t=threading.Thread(target=save_proxies_with_queue2, args=(in_queue, out_queue)) t.setDaemon(True) t.start() for url in PROXY_SITES: in_queue.put(url) result=[] for i in range(5): t=threading.Thread(target=append_result, args=(out_queue, result)) t.setDaemon(True) t.start() in_queue.join() out_queue.join() print len(result)
看一下运行效果:
In : time use_thread_with_queue2() 1806 CPU times: user 1.37 s, sys: 133 ms, total: 1.5 s Wall time: 2.43 s
可以看到最后执行 len(result) 的结果也是 376。
本节通过 time 计算的任务执行时间会受限于网络等客观原因,笔者选择了几次运行时间中最短的那一次结果,仅供参考。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论