返回介绍

使用多线程

发布于 2025-04-20 18:52:18 字数 7342 浏览 0 评论 0 收藏

讨论 Python 程序的效率时,经常提到的弊端之一是全局解释器锁(Global Interpreter Lock,GIL),GIL 限制任何时间都仅有一个线程在执行。但是存在即合理,由于线程是轻量级的,并且相互之间易于通信,GIL 保护了所有全局的解释器和环境状态变量。如果不使用 GIL 会带来了包括死锁、争用条件和高复杂性在内的各种问题。那么由于 GIL 的限制,使用多线程就完全没有意义了吗?其实不是的,原因如下:

1.Python 大部分是使用 C 语言编写的,一些标准库和第三方的模块也是用 C 编写的,而 C 语言代码是可以获取和释放 GIL 的,这在一定程度上缓解了 GIL 的问题。一些重要的、需要更高运行效率的模块还可以使用 C 语言编写,这样就可以利用更多的 CPU 资源。

2.多线程适合解决由于网络、磁盘等资源造成的 I/O 阻塞问题,它利用了等待 I/O 请求完成被阻塞而导致的 CPU 空闲时间。计算密集型的工作不适合使用多线程完成,因为计算需要 CPU 资源,如果 CPU 繁忙的话,使用多线程并没有更多地利用 CPU 空闲时间,反而由于 CPU 需要额外调度多线程,以及线程切换的各种开销,多线程的运行效率比单线程还要慢。

抓取 HTTP 代理列表,主要是网络请求,几乎没有计算相关的工作,适合使用多线程。在开始使用多线程之前,先添加基本设置到 config.py 中:

import re
 
PROXY_SITES=[
    'http://xxx.com',
    ...
]
 
REFERER_LIST=[
    'http://www.google.com/',
    ...
]

获得 UA 和 Referer 以及请求的函数(utils.py):

import random
 
import requests
from fake_useragent import UserAgent
 
from config import REFERER_LIST, TIMEOUT
 
 
def get_referer():
    return random.choice(REFERER_LIST)
 
 
def get_user_agent():
    ua=UserAgent()
    return ua.random

fake_useragent 在第一次启动的时候会将 http://useragentstring.com 网站上列出来的 UA 缓存到/tmp/fake_useragent.json,这个过程可能需要代理才能完成,可以使用项目源码目录下已经保存的文件:

> cp ~/web_develop/data/fake_useragent.json /tmp

添加代理的模型 Proxy:

class Proxy(BaseModel):
    address=StringField(unique=True) # 保证地址唯一
 
    meta={'collection':'proxy'} # 指定 Model 使用的集合(表)名

如果爬取非常多的代理网站,是不是要针对每个网站都写一个解析 HTML 或者代理列表呢?其实不用,直接解析 HTML 页面中的代理地址就好了。不需要那么严格地匹配,因为之后

我们还会验证这些地址,如果该代理地址不能使用,会自动把它从数据库中删掉的。匹配代理地址的正则表达式是:

PROXY_REGEX=re.compile('[0-9]+(?:\.[0-9]+){3}:\d{2,4}')

HTTP 请求函数如下:

def fetch(url, proxy=None):
    s=requests.Session()
    s.headers.update({'user-agent':get_user_agent()})
  
    proxies=None
    if proxy is not None:
       proxies={
           'http':proxy,
       }
    return s.get(url, timeout=TIMEOUT, proxies=proxies)

本节未使用代理,proxy 参数为默认的 None。

保存代理的函数如下:

def save_proxies(url):
    try:
        r=fetch(url)
    except requests.exceptions.RequestException:
        return False
    addresses=re.findall(PROXY_REGEX, r.text)
    for address in addresses:
        proxy=Proxy(address=address)
        try:
            proxy.save()
        except NotUniqueError:
            pass

有可能多个代理网站上都列出了某个代理地址,所以在上面的实现中忽略了 NotUniqueError 这种异常。这里强调一点,应该指定捕获的异常的类型,不应该使用模糊的捕获,如下例:

try:
    l[0]
except:
    pass

应该使用:

try:
    l[0]
except IndexError:
    pass

因为 l[0]只可能产生 IndexError 这个异常。作为开发者,你应该明确地知道会出现哪些类型的异常,把它们都列出来。使用异常的基类可以指定更少的错误类型,比如上面用到的 RequestException,requests 的异常都以它为基类,在抓取的时候捕捉由 requests 发出的全部异常,不管它是 Timeout 还是 ConnectionError。但是使用 Exception 并不是一个好的习惯,虽然它是一般自定义异常的基类,但是可能会把一些非预期产生的错误忽略掉,除非你明确知道这样做的后果。

为了确保执行的前提环境一致,抓取前我们都会清空 Proxy 表:

def cleanup():
    Proxy.drop_collection()

代理有时效性,现在可用的代理可能过一会儿就不可用了。应该周期性替换成最新的代理。

如果不使用多线程,可以用一个 for 循环,挨个网站地抓取:

def non_thread():
    cleanup()
    for url in PROXY_SITES:
        save_proxies(url)

这种串行执行的效率很低:

In : from proxy_fetcher import non_thread
In : time non_thread()
CPU times:user 1.32 s, sys:0 ns, total:1.32 s
Wall time:15.6 s
In : from models import Proxy
In : Proxy.objects.count()
Out: 376

看一下使用多线程的例子:

def use_thread():
    cleanup()
    threads=[]
    for url in PROXY_SITES:
        t=threading.Thread(target=save_proxies, args=(url,))
        t.setDaemon(True)
        threads.append(t)
        t.start()
 
    for t in threads:
        t.join() # 等到线程都结束,再退出主程序

看一下效果,注意看花费的时间:

In : time use_thread()
CPU times: user 1.51 s, sys: 148 ms, total: 1.66 s
Wall time: 5.42 s

可以感受到速度提高了 3 倍,但是这个例子有一个不好的用法:一个站点就创建一个线程,如果站点很多就需要创建很多个线程,而创建和销毁线程是一个比较重的开销。可以考虑使用线程池,重用线程池中的线程:

from multiprocessing.dummy import Pool as ThreadPool

def use_thread_pool():
    cleanup()
    pool=ThreadPool(5)
    pool.map(save_proxies, PROXY_SITES)
    pool.close()
    pool.join()

看一下效果:

In : time use_thread_pool()
CPU times: user 1.41 s, sys: 93 ms, total: 1.51 s
Wall time: 5.25 s

可以看到,在限制了使用线程数的前提下,和上面不限制线程数的方式保持同等水平的运行效率。

multiprocessing.dummy 模块与 multiprocessing 模块的区别在于:前者是多线程,而后者是多进程,但是它们的接口都是一样的,可以很方便地将代码在多线程和多进程之间切换。在不熟悉并发编程的时候,可以先使用非并发的模式做出大部分功能,然后再迁移到多线程或者多进程的版本上,使用这种接口一致的方式就会让迁移成本很低。

之前的例子就是单纯地爬取入库,并没有返回抓取结果。在爬取结束时,我们还可以将抓取到的代理列表存放在某个变量中以备使用,这表示在多线程中需要共享数据。如果数据共享时可能被修改,就需要加锁来保护它,以确保同一时刻只能有一个线程访问这个数据。线程模块提供了许多同步原语,包括锁(Lock)、信号量(Semaphore)、条件变量(Condition)和事件(Event)。但是最好的做法是使用 Queue 模块。Queue 是线程安全的,使用它可以降低程序的复杂度,代码清晰、可读性更强。

先改成使用 Queue 的方式共享可修改数据:

import Queue
 
def save_proxies_with_queue(queue):
 
while True:
    url=queue.get()
    save_proxies(url)
    queue.task_done() # 向任务已经完成的队列发送一个信号
 
def use_thread_with_queue():
    cleanup()
    queue=Queue.Queue()
 
    for i in range(5):
        t=threading.Thread(target=save_proxies_with_queue, args=(queue,))
        t.setDaemon(True)
        t.start()
 
    for url in PROXY_SITES:
        queue.put(url)
    queue.join()

一定要调用 task_done,只有这样,在最后 join 方法时才能知道队列中的所有任务是不是都执行完了。

看一下效果:

In : from proxy_fetcher_with_queue import use_thread_with_queue
In : time use_thread_with_queue()
CPU times: user 1.46 s, sys: 0 ns, total: 1.46 s
Wall time: 6.21 s

使用 Queue 效率虽然差一些,但是扩展性很好。现在增加复杂度,抓取之后要把结果保留下来,放进一个列表中。我们使用多个队列,把抓取到的结果放置到另一个队列中。

首先改造 save_proxies,让它返回当前线程获得的代理地址:

def save_proxies(url):
    proxies=[]
    try:
        r=fetch(url)
    except requests.exceptions.RequestException:
        return False
    addresses=re.findall(PROXY_REGEX, r.text)
    for address in addresses:
        proxy=Proxy(address=address)
        try:
            proxy.save()
        except NotUniqueError:
            pass
            else:
                proxies.append(address)
        return proxies

添加使用队列的 save_proxies_with_queue2 函数:

def save_proxies_with_queue2(in_queue, out_queue):
    while True:
        url=in_queue.get()
        rs=save_proxies(url)
        out_queue.put(rs)
        in_queue.task_done()

也就是从 in_queue 获取要执行的网站地址,把结果放进 out_queue 队列中。第二步是把放入 out_queue 队列中的结果存入一个全局的列表:

def append_result(out_queue, result):
    while True:
        rs=out_queue.get()
        if rs:
            result.extend(rs)
        out_queue.task_done()

result 就是一个全局变量,由于 Python 参数是引用传递,可以直接修改它。

现在基于 use_thread_with_queue,将其修改为函数 use_thread_with_queue2:

def use_thread_with_queue2():
    cleanup()
    in_queue=Queue.Queue()
    out_queue=Queue.Queue()
 
    for i in range(5):
        t=threading.Thread(target=save_proxies_with_queue2,
                          args=(in_queue, out_queue))
        t.setDaemon(True)
        t.start()
 
    for url in PROXY_SITES:
        in_queue.put(url)

    result=[]
 
    for i in range(5):
        t=threading.Thread(target=append_result,
                           args=(out_queue, result))
        t.setDaemon(True)
        t.start()
    in_queue.join()
    out_queue.join()
    print len(result)

看一下运行效果:

In : time use_thread_with_queue2()
1806
CPU times: user 1.37 s, sys: 133 ms, total: 1.5 s
Wall time: 2.43 s

可以看到最后执行 len(result) 的结果也是 376。

本节通过 time 计算的任务执行时间会受限于网络等客观原因,笔者选择了几次运行时间中最短的那一次结果,仅供参考。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。