使用 Gevent
高并发编程时,采用多线程(或进程)是一种不可取的解决方案,因为线程(或进程)本质上都是操作系统的资源。每个线程都是需要额外占用内存的,由于线程的调度由操作系统完成,调度器会因为时间片用完等原因强制夺取某个线程的控制权,开发者还需要考虑加锁、使用队列等操作,这些都容易造成高并发情况下的性能瓶颈。
协程是用户空间线程,复杂的逻辑和异步都封装在底层,开发者还是在使用同步的方式编程,但这种协作式的任务调度可以让用户自己控制使用 CPU 的时间,除非自己放弃,否则不会被其他协程抢夺到控制权。
Python 2 通过 yield 提供了对协程的基本支持,但是功能很有限。而第三方的 Gevent 为 Python 提供了比较完善的协程支持。Gevent 是一个基于微线程库 Greenlet 的并发框架,虽然与直接使用 Greenlet、Eventlet 相比性能略低,但是它提供了和线程模型编程相仿的接口,而且提供 Monkey Patch 方法,可以在运行时动态修改标准库里大部分的阻塞式系统调用,如 socket、threading 和 select 等模块,让其变为协作式运行。
通过如下例子看一下 Gevent 的上下文切换(gevent_spawn.py):
import gevent def a(): print 'Start a' gevent.sleep(1) print 'End a' def b(): print 'Start b' gevent.sleep(2) print 'End b' gevent.joinall([ gevent.spawn(a), gevent.spawn(b), ])
执行的结果如下:
Start a Start b End a End b
在 Gevent 里面,上下文切换是通过 yield 来完成的,执行 gevent.sleep 会触发上下文切换,这个切换实际上交由程序来控制。
在开始爬取搜索页的工作之前,我们先使用 Gevent 过滤出可用的代理列表。为什么这么做呢?代理网站上发布的代理其实大部分是不可用的,为了不浪费我们的抓取资源,可以先过滤掉大部分不可用的代理地址(remove_unavailable_proxy.py):
from gevent.pool import Pool from requests.exceptions import RequestException from utils import fetch from models import Proxy pool=Pool(10) def check_proxy(p): try: fetch('http://baidu.com', proxy=p['address']) except RequestException: p.delete() pool.map(check_proxy, Proxy.objects.all())
如果代理不能访问百度就证明其不可用,然后将其从数据库中删除。
看一下过滤后的代理数量:
In : Proxy.objects.count() Out: 123
代理数量减少到原来的 1/3,可见免费代理很不稳定。除了寻找更多的代理网站,还可以使用自己的代理服务器,保证抓取的效率和稳定。
然后给 Proxy 类添加获得随机文档的类方法:
@classmethod def get_random_proxy(cls): proxy=cls.objects.aggregate({'$sample':{'size':1}}).next() return proxy
每次执行 Proxy.get_random() 都可以随机获得一个 Proxy 文档,尽量每次都用不同的代理来抓取,杜绝长时间使用单 IP 高频抓取。
文章都有发布者,为了未来检索起来方便,把发布者用独立的模型存放:
class Publisher(BaseModel): display_name=StringField(max_length=50, required=True) meta={'collection':'publisher'} @classmethod def get_or_create(cls, display_name): try: return cls.objects.get(display_name=display_name) except DoesNotExist: publisher=cls(display_name=display_name) publisher.save() return publisher
笔者特意添加了一个叫作 get_or_create 的类方法,保证通过发布者的名字就能获得对应的 Publisher 文档。
接着定义文章的模型:
class Article(BaseModel): title=StringField(max_length=120, required=True) img_url=StringField() url=StringField(required=True) summary=StringField() publisher=ReferenceField(Publisher, required=True) create_at=DateTimeField() meta={ 'collection':'article', 'indexes':[ '-create_at', {'fields':['title', 'summary', 'publisher'], 'unique':True} ], 'ordering':['-create_at'] }
搜索页抓取到的内容还不全,13.2 节会把其余字段填充进来。为了提高查询效率,添加了两个索引,一个是按照创建时间降序,另外一个是 title+summary+publisher 的复合索引,保证不会插入三者结果值都相同的文章,索引不使用 url 字段,是因为同一篇文章每次获得文章地址的结果是不一样的。Article 对象默认以时间降序排序。
保存搜索结果的函数如下:
from mongoengine import DoesNotExist from gevent import sleep, GreenletExit def save_search_result(page, queue, retry=0): proxy=Proxy.get_random()['address'] url=SEARCH_URL.format(SEARCH_TEXT, page) try: r=fetch(url, proxy=proxy) except (Timeout, ConnectionError): sleep(0.1) retry += 1 if retry > 5: queue.put(page) raise GreenletExit() try: p=Proxy.objects.get(address=proxy) if p: p.delete() except DoesNotExist: pass return save_search_result(page, queue, retry) soup=BeautifulSoup(r.text, 'lxml') results=soup.find(class_='results') if results is None: # 此代理已经被封,换其他的代理 sleep(0.1) retry += 1 if retry > 5: queue.put(page) print 'retry too much!' raise GreenletExit() return save_search_result(page, queue, retry) articles=results.find_all( 'div', lambda x:'wx-rb' in x) for article in articles: save_article(article)
如果执行 fetch 遇到 Timeout 和 ConnectionError,一般说明代理有问题,异常处理中会找到这个 Proxy 对象,从 MongoDB 中把它删掉。
其次是 fetch 的请求如果失败,会 sleep 一段时间,这里设置为的 0.1 只是为了演示,实际环境中可适当加大。如果重试次数超过 5 次就会抛出 GreenletExit 异常,保证不会因为大量代理异常造成协程不能结束的情况。参数包含 queue 的唯一原因,就是在重试失败之前可以把任务再放回队列,保证任务不丢失。
可以看到使用 BeautifulSoup 的 find 和 find_all 这两种方法就能实现找到对应元素属性和文本的结果。唯一要注意的是,由于“class”在 Python 中的作用是声明类,所以使用 CSS 类的条件时需要使用“class_”替代“class”关键词。
下面这个函数解析单篇文章需要的字段内容并保存到 MongoDB 中:
from pymongo.errors import InvalidBSON from mongoengine import NotUniqueError def save_article(article_): img_url=article_.find(class_='img_box2').find( 'img').attrs['src'].split('url=')[1] text_box=article_.find(class_='txt-box') title=text_box.find('h4').find('a').text article_url=text_box.find('h4').find('a').attrs['href'] summary=text_box.find('p').text create_at=datetime.fromtimestamp(float(text_box.find( class_='s-p').attrs['t'])) publisher_name=text_box.find(class_='s-p').find('a').attrs['title'] article=Article(img_url=img_url, title=title, url=article_url, summary=summary, create_at=create_at, publisher=Publisher.get_or_create(publisher_name)) try: article.save() except (NotUniqueError, InvalidBSON): pass
使用 Gevent 的协程池和队列实现并发抓取:
from gevent import monkey from gevent.queue import Queue, Empty from gevent.pool import Pool monkey.patch_all() SEARCH_URL='http://weixin.sogou.com/weixin?query={}&type=2&page={}' SEARCH_TEXT='Python' def use_gevent_with_queue(): queue=Queue() pool=Pool(5) for p in range(1, 7): queue.put(p) while pool.free_count(): sleep(0.1) pool.spawn(save_search_result_with_queue, queue) pool.join()
我们创建一个包含 5 个线程的池,搜索关键词为 Python 的文章。默认先把第 1 到第 6 页作为地址初始任务放进队列。队列的执行者 save_search_result_with_queue 函数如下:
def save_search_result_with_queue(queue): while 1: try: p=queue.get(timeout=0) except Empty: break save_search_result(p, queue) print 'stopping crawler...'
如果队列为空,当前的协程就会结束。这里笔者有个经验,为了实现更好的扩展性,放入队列的是那些有变化的内容,也就是只放入页面数字,在 save_search_result 函数中,通过页数和搜索文本拼出 url,而不是把整个 url 拼好了再放入队列,这样除了让序列化工作更快以外,还有更大的灵活性。当然,如果搜索的文本不仅是 Python,那么要搜索的其他文本也需要放入队列,一步步地传递进 save_search_result 中。
可以发现,爬完第 6 页协程就全部结束了。程序没有继续翻页直到爬完全部搜索页面才结束。怎么办呢?思路很简单,在页面抓取结束后发现有“下一页”,就把当前页之后的页数作为任务放进队列。但是需要注意,初始时放进了第 1 到第 6 页,假设第 1 页抓去完成后,应该放入第 2 页,但事实上第 2 页已经完成或者正在被其他协程抓取中。而初始时只放入第 1 页会造成协程池没有被充分利用。最简单的解决方案是保存当前抓取的页面和已经完成的页数的列表,保证不会重复抓取。可以想象,修改这个页数的列表是需要加锁的。
信号量(Semaphore)可以用来保证多协程(或者线程)并发访问共享资源时不会发生冲突。在访问开始时,协程必须获取一个信号量;访问结束后,该协程必须释放信号量,如果一个信号量的范围已经降低到 0,其他想访问的协程会被阻塞在 acquire 操作上,直到另一个已经获得信号量的协程做出释放。如果信号量设为 1,其实就是锁(Lock),表示是互斥访问,保证资源只在程序上下文被单次使用。我们使用锁来确认某页是否可以放入队列:
from gevent.coros import BoundedSemaphore sem=BoundedSemaphore(1) IN_POOL_TASKS=[] def put_new_page(p, queue): global IN_POOL_TASKS sem.acquire() sleep(0) if p not in IN_POOL_TASKS: queue.put(p) IN_POOL_TASKS.append(p) sem.release()
然后把程序中的 queue.put(page) 全部改成 put_new_page(p, queue)。
在函数 save_search_result 中添加爬取的其他页面的逻辑:
page_container=soup.find(id='pagebar_container') if page_container and u'下一页' in page_container.text: last_page=int(page_container.find_all('a')[-2].text) current_page=int(page_container.find('span').text) for page in range(current_page+1, last_page+1): put_new_page(page, queue)
put_new_page 会把新的页面也放入队列,这样就可以可持续地翻页,直到没有下一页。
以一个优秀的爬虫标准来评价上述例子,还有如下两点需要改进:
- 微信搜索需要登录才能查询超过 10 页的结果,所以还需要实现登录功能。应该把登录和抓取步骤放在一个 requests.Session 中,保证整个过程在一个会话中完成。
- 没有支持增量爬取,当翻页时发现抓取到的文章之前已经抓取过,其实就不需要再往前翻页了。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论