使用多进程
使用进程代替线程可以有效避开 GIL,因为每个进程都拥有自己的 Python 解释器实例,也就不受 GIL 的限制了。计算密集型的任务通常应该使用多进程方式,也就是使用 multiprocessing 模块。multiprocessing 模块允许程序充分利用多处理器,并可以跨平台使用。I/O 密集型的任务瓶颈主要在网络延迟,所以使用多线程或者多进程都可以。
我们来实现解析微信文章页面的功能。首先从 Article 集合中找一篇微信公众号文章:
> db.article.find({'publisher':ObjectId("57317619bc1f551139de5b16")},{'title':1, ' url':1, '_id':0}).pretty() { "title":"Python Web 开发的十个框架", "url":"http://mp.weixin.qq.com/s?src=3×tamp=1462859281&ver=1& signature=2qvUQsQ6Tzf13kTij4VZ4cULEA7t1XgK8B6Ny*FKurB*HMwoUzEFYP CQyBEOg0XbGraIQxSJAoc-F8SGifgLH8t2JnKMF6B9tNcV87C6llfPcyfR98qbtQ 3J3KegBlGNo0sQYcGVKYRe9CMj6Pw5BOkjjlGC-tExRvECLLDThQs=" }
url 的值就是微信页面地址,微信公众号文章的页面上除了正文和配图,还展示了评论列表、阅读数和点赞数,这些内容不是 HTML 直接渲染的,而是通过 API 接口返回的,在 Chrome Developer Tools 中可以看到一个来至 http://mp.weixin.qq.com/mp/getcomment 的 API 请求,其中 src、timestamp、signature 和 ver 的参数值和 url 的是一样的,其他额外增加的参数除了空就是 0,只需要根据 url 构造一下就好了:
import urlparse import urllib COMMENT_JS_URL='http://mp.weixin.qq.com/mp/getcomment' def gen_js_url(url): query_dct=urlparse.parse_qs(urlparse.urlsplit(url).query) query_dct={k:v[0] for k, v in query_dct.items()} query_dct.update({'uin':'', 'key':'', 'pass_ticket':'', 'wxtoken':'', 'devicetype':'', 'clientversion':0, 'x5':0}) return '{}?{}'.format(COMMENT_JS_URL, urllib.urlencode(query_dct))
爬取页面的意义就是抓取核心内容,然后在其他地方以某种方式展示出来,文章一般都会有配图,我们这里使用到“占位”这种技巧,在将来展示的时候再替换掉。举个例子,原来的一段页面文本是这样的:
<p>Flask</p> <p><img src="http://some_picture_path"></p> <p><span style="font-size:15px;">Flask 是一个轻量级的 Web 应用框架,使用 Python 编写 ...</span></p>
被抽取内容之后就会变成:
Flask <图片 1> Flask 是一个轻量级的 Web 应用框架,使用 Python 编写...
<图片 1>和图片源地址的关系存在模型中。
对 Article 扩充如下字段:
class Article(BaseModel): ... content=StringField() pictures=DictField() comments=ListField(ReferenceField(Comment)) like_num=IntField() read_num=IntField()
其中 pictures 字段就存放了<图片 1>和图片源地址的对应关系。
评论也是一个独立的集合:
class Comment(BaseModel): nick_name=StringField(max_length=120) content=StringField(max_length=120, required=True) like_num=IntField() comment_id=IntField() article=ReferenceField('Article', dbref=True) meta={ 'collection':'comment', 'indexes':[ {'fields':['article', 'comment_id'], 'unique':True} ] } @classmethod def get_or_create(cls, article, comment_id,**kwargs): comments=cls.objects.filter(article=article, comment_id=comment_id) if comments: return comments[0] comment=cls(article=article, comment_id=comment_id,**kwargs) comment.save() return comment
在 Article 中的 comments 字段使用了 ListField(ReferenceField(Comment)),Comment 中的 article 字段使用的却是 ReferenceField('Article', dbref=True),这是因为 Comment 是先定义的,那个时候程序还没有从上到下地获取到 Article 这个模型,所以需要使用字符串的 Article,然后以设置 dbref 为 True 的方式实现互相引用。
给 Comment 添加的 get_or_create 方法是为了传入字段返回 comment 对象,如没有对象则先创建再返回。
这次用到的 fetch 函数需要改了,因为之前都是抓取单个页面,现在是用一个会话连续抓取两个页面:
from simplejson.scanner import JSONDecodeError def fetch(url): s=requests.Session() s.headers.update({'user-agent':get_user_agent()}) proxies={ 'http':Proxy.get_random()['address'], } html_text=s.get(url, timeout=TIMEOUT, proxies=proxies).text js_url=gen_js_url(url) try: js_data=s.get(js_url, timeout=TIMEOUT, proxies=proxies).json() except JSONDecodeError: raise RequestException() return html_text, js_data
在获取 API 接口的时候特意捕获 JSONDecodeError 的错误,是因为 API 页面也有防爬策略。修改异常的错误类型是为了保证在外面调用 fetch 的时候只需要捕获 RequestException 一种异常就好了。
requests 使用了 simplejson,simplejson 其实就是标准库 json(Python 2.6 时加入)的外部开发版本,它更新很频繁且兼容 Python 2.5。使用上面的 API 返回内容测试一下:
In : %timeit-n 1000 json.dumps(api_result) 1000 loops, best of 3:29.7 □s per loop In : %timeit-n 1000 simplejson.dumps(api_result) 1000 loops, best of 3:44.6 □s per loop In : dumped=simplejson.dumps(api_result) In : %timeit-n 1000 simplejson.loads(dumped) 1000 loops, best of 3:44.1 □s per loop In : %timeit-n 1000 json.loads(dumped) 1000 loops, best of 3:86.1 □s per loop
可以看到 json 的 dump 操作更快,simplejson 的 loads 操作更快。
引用 json 最推荐采用如下的方式:
try: import simplejson as json except ImportError: import json
获得文章评论列表的函数如下:
def get_comments(js_data, article): comments=[] for comment in js_data['comment']: comment_id=comment['id'] content=comment['content'] create_at=datetime.fromtimestamp(float(comment['create_time'])) nick_name=comment['nick_name'] like_num=comment['like_num'] comment=Comment.get_or_create( article, comment_id, content=content, create_at=create_at, nick_name=nick_name, like_num=like_num) comments.append(comment) return comments
更新文章的函数如下:
def update_article(article, html_text, js_data): soup=BeautifulSoup(html_text, 'lxml') p_contents=soup.find(class_='rich_media_content').find_all('p') content=[] pictures={} picture_count=1 for p_content in p_contents: img=p_content.find('img') if img is None: content.append(p_content.text.encode('utf-8')) else: tag='<图片{}>'.format(picture_count) content.append(tag) pictures[tag]=img['data-src'] picture_count+=1 article.content='\n'.join(content) article.pictures=pictures article.comments=get_comments(js_data, article) article.like_num=js_data['like_num'] article.read_num=js_data['read_num'] article.save() return article
按照 13.2 节的思路,我们先创建保存文章结果的函数:
def save_article_result(article, queue=None, retry=0): url=article.article_url try: html_text, js_data=fetch(url) except RequestException: retry+=1 if retry > 5: queue.put(url) return return save_article_result(article, queue, retry) return update_article(article, html_text, js_data)
其中参数 article 就是一个 Article 对象。
现在前期准备都完成了,接下来使用多进程和队列来更新 article 集合中存在的文章。多进程方式不再使用 Queue 模块,而是换成 multiprocessing.Queue 或者 multiprocess-ing.JoinableQueue。JoinableQueue 有 join 和 task_done 两个方法,所以适合存放统一的待执行任务,供并发进程获取。而 Queue 可以用来收集任务执行的结果:
import multiprocessing def use_multiprocessing_with_queue(): queue=multiprocessing.JoinableQueue() num_consumers=multiprocessing.cpu_count()*2 for article in Article.objects.all(): queue.put(article) for_in range(num_consumers): p=multiprocessing.Process(target=save_article_result_with_queue, args=(queue,)) p.start() queue.join()
通常计算密集型的程序使用的线程和进程数量一般与 CPU 核数一致,I/O 密集型的程序使用的线程和进程数量一般是 CPU 核数的 2 倍。也可以根据网络情况,使用 3 倍甚至更多的倍数,在实际工作中通过测试获取最佳值。
看一下消费者进程需要执行的函数:
from Queue import Empty from mongoengine.connection import disconnect from models import Article, lazy_connect def save_article_result_with_queue(queue): disconnect() lazy_connect() while 1: try: article=queue.get(timeout=1) except Empty: break save_article_result(article, queue) queue.task_done()
多进程队列为空时抛的异常是 Queue.Empty。这里用“先关闭 MongoDB 连接,再使用 lazy 模式连接”的原因,是为了解决 MongoDB 使用多进程时,调用 getaddrinfo 可能会发生的死锁(http://bit.ly/1sHScFY )的问题。
lazy_connect 在 models.py 中的定义如下: from config import DB_HOST, DB_PORT, DATABASE_NAME def lazy_connect(): connect(DATABASE_NAME, host=DB_HOST, port=DB_PORT) lazy_connect()
在 13.1 节,演示了通过第二个队列存放第一个队列的执行结果,最后获得抓取总数的方法。
本节我们再简化一些,不用创建第二个队列就能获得全部修改后的 Article 模型:
def save_article_result_with_queue2(in_queue, out_queue): while 1: try: article=in_queue.get(timeout=1) except Empty: break updated_article=save_article_result(article, in_queue) out_queue.put(updated_article) # 把更新好的文章对象放入 out_queue 这个队列中 in_queue.task_done()
使用两个队列的多进程主程序如下:
def use_multiprocessing_with_queue2(): queue=multiprocessing.JoinableQueue() num_consumers=multiprocessing.cpu_count()*2 results_queue=multiprocessing.Queue() for article in Article.objects.all()[5:8]: queue.put(article) for _in range(num_consumers): p=multiprocessing.Process(target=save_article_result_with_queue2, args=(queue, results_queue)) p.start() queue.join() results=[] while 1: try: updated_article=results_queue.get(timeout=1) except Empty: break results.append(updated_article) print len(results)
放入队列的对象只要是可 pickle 化的就可以,所以 Article 对象可以作为消息放入队列。
如果文章页面是动态注入的,需要使用 PhantomJS 之类的工具模拟页面打开的过程,再从加载完毕的页面文件中进行解析。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论