返回介绍

使用 Gevent

发布于 2025-04-20 18:52:18 字数 7933 浏览 0 评论 0 收藏

高并发编程时,采用多线程(或进程)是一种不可取的解决方案,因为线程(或进程)本质上都是操作系统的资源。每个线程都是需要额外占用内存的,由于线程的调度由操作系统完成,调度器会因为时间片用完等原因强制夺取某个线程的控制权,开发者还需要考虑加锁、使用队列等操作,这些都容易造成高并发情况下的性能瓶颈。

协程是用户空间线程,复杂的逻辑和异步都封装在底层,开发者还是在使用同步的方式编程,但这种协作式的任务调度可以让用户自己控制使用 CPU 的时间,除非自己放弃,否则不会被其他协程抢夺到控制权。

Python 2 通过 yield 提供了对协程的基本支持,但是功能很有限。而第三方的 Gevent 为 Python 提供了比较完善的协程支持。Gevent 是一个基于微线程库 Greenlet 的并发框架,虽然与直接使用 Greenlet、Eventlet 相比性能略低,但是它提供了和线程模型编程相仿的接口,而且提供 Monkey Patch 方法,可以在运行时动态修改标准库里大部分的阻塞式系统调用,如 socket、threading 和 select 等模块,让其变为协作式运行。

通过如下例子看一下 Gevent 的上下文切换(gevent_spawn.py):

import gevent
 
 
def a():
    print 'Start a'
    gevent.sleep(1)
    print 'End a'
 

def b():
    print 'Start b'
    gevent.sleep(2)
    print 'End b'
 
 
gevent.joinall([
    gevent.spawn(a),
    gevent.spawn(b),
])

执行的结果如下:

Start a
Start b
End a End b

在 Gevent 里面,上下文切换是通过 yield 来完成的,执行 gevent.sleep 会触发上下文切换,这个切换实际上交由程序来控制。

在开始爬取搜索页的工作之前,我们先使用 Gevent 过滤出可用的代理列表。为什么这么做呢?代理网站上发布的代理其实大部分是不可用的,为了不浪费我们的抓取资源,可以先过滤掉大部分不可用的代理地址(remove_unavailable_proxy.py):

from gevent.pool import Pool
from requests.exceptions import RequestException

from utils import fetch
from models import Proxy

pool=Pool(10)
 

def check_proxy(p):
    try:
        fetch('http://baidu.com', proxy=p['address'])
    except RequestException:
        p.delete()
 
  
pool.map(check_proxy, Proxy.objects.all())

如果代理不能访问百度就证明其不可用,然后将其从数据库中删除。

看一下过滤后的代理数量:

In : Proxy.objects.count()
Out: 123

代理数量减少到原来的 1/3,可见免费代理很不稳定。除了寻找更多的代理网站,还可以使用自己的代理服务器,保证抓取的效率和稳定。

然后给 Proxy 类添加获得随机文档的类方法:

@classmethod
def get_random_proxy(cls):
    proxy=cls.objects.aggregate({'$sample':{'size':1}}).next()
    return proxy

每次执行 Proxy.get_random() 都可以随机获得一个 Proxy 文档,尽量每次都用不同的代理来抓取,杜绝长时间使用单 IP 高频抓取。

文章都有发布者,为了未来检索起来方便,把发布者用独立的模型存放:

class Publisher(BaseModel):
    display_name=StringField(max_length=50, required=True)
 
    meta={'collection':'publisher'}
 
    @classmethod
    def get_or_create(cls, display_name):
    try:
        return cls.objects.get(display_name=display_name)
    except DoesNotExist:
        publisher=cls(display_name=display_name)
        publisher.save()
        return publisher

笔者特意添加了一个叫作 get_or_create 的类方法,保证通过发布者的名字就能获得对应的 Publisher 文档。

接着定义文章的模型:

class Article(BaseModel):
    title=StringField(max_length=120, required=True)
    img_url=StringField()
    url=StringField(required=True)
    summary=StringField()
    publisher=ReferenceField(Publisher, required=True)
    create_at=DateTimeField()
    meta={
       'collection':'article',
       'indexes':[
           '-create_at',
           {'fields':['title', 'summary', 'publisher'],
           'unique':True}
       ],
       'ordering':['-create_at']
    }

搜索页抓取到的内容还不全,13.2 节会把其余字段填充进来。为了提高查询效率,添加了两个索引,一个是按照创建时间降序,另外一个是 title+summary+publisher 的复合索引,保证不会插入三者结果值都相同的文章,索引不使用 url 字段,是因为同一篇文章每次获得文章地址的结果是不一样的。Article 对象默认以时间降序排序。

保存搜索结果的函数如下:

from mongoengine import DoesNotExist
from gevent import sleep, GreenletExit

 
def save_search_result(page, queue, retry=0):
    proxy=Proxy.get_random()['address']
    url=SEARCH_URL.format(SEARCH_TEXT, page)
 
    try:
        r=fetch(url, proxy=proxy)
    except (Timeout, ConnectionError):
        sleep(0.1)
        retry += 1
        if retry > 5:
            queue.put(page)
        raise GreenletExit()
        try:
            p=Proxy.objects.get(address=proxy)
            if p:
                p.delete()
            except DoesNotExist:
                pass
 
            return save_search_result(page, queue, retry)
    soup=BeautifulSoup(r.text, 'lxml')
    results=soup.find(class_='results')
    if results is None:
        # 此代理已经被封,换其他的代理
    sleep(0.1)
    retry += 1
    if retry > 5:
        queue.put(page)
        print 'retry too much!'
        raise GreenletExit()
    return save_search_result(page, queue, retry)
articles=results.find_all(
    'div', lambda x:'wx-rb' in x)
for article in articles:
    save_article(article)

如果执行 fetch 遇到 Timeout 和 ConnectionError,一般说明代理有问题,异常处理中会找到这个 Proxy 对象,从 MongoDB 中把它删掉。

其次是 fetch 的请求如果失败,会 sleep 一段时间,这里设置为的 0.1 只是为了演示,实际环境中可适当加大。如果重试次数超过 5 次就会抛出 GreenletExit 异常,保证不会因为大量代理异常造成协程不能结束的情况。参数包含 queue 的唯一原因,就是在重试失败之前可以把任务再放回队列,保证任务不丢失。

可以看到使用 BeautifulSoup 的 find 和 find_all 这两种方法就能实现找到对应元素属性和文本的结果。唯一要注意的是,由于“class”在 Python 中的作用是声明类,所以使用 CSS 类的条件时需要使用“class_”替代“class”关键词。

下面这个函数解析单篇文章需要的字段内容并保存到 MongoDB 中:

from pymongo.errors import InvalidBSON
from mongoengine import NotUniqueError
 
 
def save_article(article_):
    img_url=article_.find(class_='img_box2').find(
        'img').attrs['src'].split('url=')[1]
    text_box=article_.find(class_='txt-box')
    title=text_box.find('h4').find('a').text
    article_url=text_box.find('h4').find('a').attrs['href']
    summary=text_box.find('p').text
    create_at=datetime.fromtimestamp(float(text_box.find(
        class_='s-p').attrs['t']))
    publisher_name=text_box.find(class_='s-p').find('a').attrs['title']
 
    article=Article(img_url=img_url, title=title, url=article_url,
                    summary=summary, create_at=create_at,
                    publisher=Publisher.get_or_create(publisher_name))
    try:
        article.save()
        except (NotUniqueError, InvalidBSON):
            pass

使用 Gevent 的协程池和队列实现并发抓取:

from gevent import monkey
from gevent.queue import Queue, Empty
from gevent.pool import Pool
monkey.patch_all()
 
SEARCH_URL='http://weixin.sogou.com/weixin?query={}&type=2&page={}'
SEARCH_TEXT='Python'
 
 
def use_gevent_with_queue():
    queue=Queue()
    pool=Pool(5)

    for p in range(1, 7):
        queue.put(p)

    while pool.free_count():
        sleep(0.1)
        pool.spawn(save_search_result_with_queue, queue)
 
    pool.join()

我们创建一个包含 5 个线程的池,搜索关键词为 Python 的文章。默认先把第 1 到第 6 页作为地址初始任务放进队列。队列的执行者 save_search_result_with_queue 函数如下:

def save_search_result_with_queue(queue):
    while 1:
        try:
            p=queue.get(timeout=0)
        except Empty:
            break
 
        save_search_result(p, queue)
    print 'stopping crawler...'

如果队列为空,当前的协程就会结束。这里笔者有个经验,为了实现更好的扩展性,放入队列的是那些有变化的内容,也就是只放入页面数字,在 save_search_result 函数中,通过页数和搜索文本拼出 url,而不是把整个 url 拼好了再放入队列,这样除了让序列化工作更快以外,还有更大的灵活性。当然,如果搜索的文本不仅是 Python,那么要搜索的其他文本也需要放入队列,一步步地传递进 save_search_result 中。

可以发现,爬完第 6 页协程就全部结束了。程序没有继续翻页直到爬完全部搜索页面才结束。怎么办呢?思路很简单,在页面抓取结束后发现有“下一页”,就把当前页之后的页数作为任务放进队列。但是需要注意,初始时放进了第 1 到第 6 页,假设第 1 页抓去完成后,应该放入第 2 页,但事实上第 2 页已经完成或者正在被其他协程抓取中。而初始时只放入第 1 页会造成协程池没有被充分利用。最简单的解决方案是保存当前抓取的页面和已经完成的页数的列表,保证不会重复抓取。可以想象,修改这个页数的列表是需要加锁的。

信号量(Semaphore)可以用来保证多协程(或者线程)并发访问共享资源时不会发生冲突。在访问开始时,协程必须获取一个信号量;访问结束后,该协程必须释放信号量,如果一个信号量的范围已经降低到 0,其他想访问的协程会被阻塞在 acquire 操作上,直到另一个已经获得信号量的协程做出释放。如果信号量设为 1,其实就是锁(Lock),表示是互斥访问,保证资源只在程序上下文被单次使用。我们使用锁来确认某页是否可以放入队列:

from gevent.coros import BoundedSemaphore
 
sem=BoundedSemaphore(1)
IN_POOL_TASKS=[]
 
 
def put_new_page(p, queue):
    global IN_POOL_TASKS
    sem.acquire()
    sleep(0)
    if p not in IN_POOL_TASKS:
       queue.put(p)
       IN_POOL_TASKS.append(p)
    sem.release()

然后把程序中的 queue.put(page) 全部改成 put_new_page(p, queue)。

在函数 save_search_result 中添加爬取的其他页面的逻辑:

page_container=soup.find(id='pagebar_container')
if page_container and u'下一页' in page_container.text:
   last_page=int(page_container.find_all('a')[-2].text)
   current_page=int(page_container.find('span').text)
   for page in range(current_page+1, last_page+1):
       put_new_page(page, queue)

put_new_page 会把新的页面也放入队列,这样就可以可持续地翻页,直到没有下一页。

以一个优秀的爬虫标准来评价上述例子,还有如下两点需要改进:

  • 微信搜索需要登录才能查询超过 10 页的结果,所以还需要实现登录功能。应该把登录和抓取步骤放在一个 requests.Session 中,保证整个过程在一个会话中完成。
  • 没有支持增量爬取,当翻页时发现抓取到的文章之前已经抓取过,其实就不需要再往前翻页了。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。