使用多进程
使用进程代替线程可以有效避开 GIL,因为每个进程都拥有自己的 Python 解释器实例,也就不受 GIL 的限制了。计算密集型的任务通常应该使用多进程方式,也就是使用 multiprocessing 模块。multiprocessing 模块允许程序充分利用多处理器,并可以跨平台使用。I/O 密集型的任务瓶颈主要在网络延迟,所以使用多线程或者多进程都可以。
我们来实现解析微信文章页面的功能。首先从 Article 集合中找一篇微信公众号文章:
> db.article.find({'publisher':ObjectId("57317619bc1f551139de5b16")},{'title':1, '
url':1, '_id':0}).pretty()
{
"title":"Python Web 开发的十个框架",
"url":"http://mp.weixin.qq.com/s?src=3×tamp=1462859281&ver=1&
signature=2qvUQsQ6Tzf13kTij4VZ4cULEA7t1XgK8B6Ny*FKurB*HMwoUzEFYP
CQyBEOg0XbGraIQxSJAoc-F8SGifgLH8t2JnKMF6B9tNcV87C6llfPcyfR98qbtQ
3J3KegBlGNo0sQYcGVKYRe9CMj6Pw5BOkjjlGC-tExRvECLLDThQs="
}
url 的值就是微信页面地址,微信公众号文章的页面上除了正文和配图,还展示了评论列表、阅读数和点赞数,这些内容不是 HTML 直接渲染的,而是通过 API 接口返回的,在 Chrome Developer Tools 中可以看到一个来至 http://mp.weixin.qq.com/mp/getcomment 的 API 请求,其中 src、timestamp、signature 和 ver 的参数值和 url 的是一样的,其他额外增加的参数除了空就是 0,只需要根据 url 构造一下就好了:
import urlparse
import urllib
COMMENT_JS_URL='http://mp.weixin.qq.com/mp/getcomment'
def gen_js_url(url):
query_dct=urlparse.parse_qs(urlparse.urlsplit(url).query)
query_dct={k:v[0] for k, v in query_dct.items()}
query_dct.update({'uin':'', 'key':'', 'pass_ticket':'', 'wxtoken':'',
'devicetype':'', 'clientversion':0, 'x5':0})
return '{}?{}'.format(COMMENT_JS_URL, urllib.urlencode(query_dct))
爬取页面的意义就是抓取核心内容,然后在其他地方以某种方式展示出来,文章一般都会有配图,我们这里使用到“占位”这种技巧,在将来展示的时候再替换掉。举个例子,原来的一段页面文本是这样的:
<p>Flask</p>
<p><img src="http://some_picture_path"></p>
<p><span style="font-size:15px;">Flask 是一个轻量级的 Web 应用框架,使用 Python 编写
...</span></p>
被抽取内容之后就会变成:
Flask <图片 1> Flask 是一个轻量级的 Web 应用框架,使用 Python 编写...
<图片 1>和图片源地址的关系存在模型中。
对 Article 扩充如下字段:
class Article(BaseModel):
...
content=StringField()
pictures=DictField()
comments=ListField(ReferenceField(Comment))
like_num=IntField()
read_num=IntField()
其中 pictures 字段就存放了<图片 1>和图片源地址的对应关系。
评论也是一个独立的集合:
class Comment(BaseModel):
nick_name=StringField(max_length=120)
content=StringField(max_length=120, required=True)
like_num=IntField()
comment_id=IntField()
article=ReferenceField('Article', dbref=True)
meta={
'collection':'comment',
'indexes':[
{'fields':['article', 'comment_id'],
'unique':True}
]
}
@classmethod
def get_or_create(cls, article, comment_id,**kwargs):
comments=cls.objects.filter(article=article,
comment_id=comment_id)
if comments:
return comments[0]
comment=cls(article=article, comment_id=comment_id,**kwargs)
comment.save()
return comment
在 Article 中的 comments 字段使用了 ListField(ReferenceField(Comment)),Comment 中的 article 字段使用的却是 ReferenceField('Article', dbref=True),这是因为 Comment 是先定义的,那个时候程序还没有从上到下地获取到 Article 这个模型,所以需要使用字符串的 Article,然后以设置 dbref 为 True 的方式实现互相引用。
给 Comment 添加的 get_or_create 方法是为了传入字段返回 comment 对象,如没有对象则先创建再返回。
这次用到的 fetch 函数需要改了,因为之前都是抓取单个页面,现在是用一个会话连续抓取两个页面:
from simplejson.scanner import JSONDecodeError
def fetch(url):
s=requests.Session()
s.headers.update({'user-agent':get_user_agent()})
proxies={
'http':Proxy.get_random()['address'],
}
html_text=s.get(url, timeout=TIMEOUT, proxies=proxies).text
js_url=gen_js_url(url)
try:
js_data=s.get(js_url, timeout=TIMEOUT, proxies=proxies).json()
except JSONDecodeError:
raise RequestException()
return html_text, js_data
在获取 API 接口的时候特意捕获 JSONDecodeError 的错误,是因为 API 页面也有防爬策略。修改异常的错误类型是为了保证在外面调用 fetch 的时候只需要捕获 RequestException 一种异常就好了。
requests 使用了 simplejson,simplejson 其实就是标准库 json(Python 2.6 时加入)的外部开发版本,它更新很频繁且兼容 Python 2.5。使用上面的 API 返回内容测试一下:
In : %timeit-n 1000 json.dumps(api_result) 1000 loops, best of 3:29.7 □s per loop In : %timeit-n 1000 simplejson.dumps(api_result) 1000 loops, best of 3:44.6 □s per loop In : dumped=simplejson.dumps(api_result) In : %timeit-n 1000 simplejson.loads(dumped) 1000 loops, best of 3:44.1 □s per loop In : %timeit-n 1000 json.loads(dumped) 1000 loops, best of 3:86.1 □s per loop
可以看到 json 的 dump 操作更快,simplejson 的 loads 操作更快。
引用 json 最推荐采用如下的方式:
try:
import simplejson as json
except ImportError:
import json
获得文章评论列表的函数如下:
def get_comments(js_data, article):
comments=[]
for comment in js_data['comment']:
comment_id=comment['id']
content=comment['content']
create_at=datetime.fromtimestamp(float(comment['create_time']))
nick_name=comment['nick_name']
like_num=comment['like_num']
comment=Comment.get_or_create(
article, comment_id, content=content, create_at=create_at,
nick_name=nick_name, like_num=like_num)
comments.append(comment)
return comments
更新文章的函数如下:
def update_article(article, html_text, js_data):
soup=BeautifulSoup(html_text, 'lxml')
p_contents=soup.find(class_='rich_media_content').find_all('p')
content=[]
pictures={}
picture_count=1
for p_content in p_contents:
img=p_content.find('img')
if img is None:
content.append(p_content.text.encode('utf-8'))
else:
tag='<图片{}>'.format(picture_count)
content.append(tag)
pictures[tag]=img['data-src']
picture_count+=1
article.content='\n'.join(content)
article.pictures=pictures
article.comments=get_comments(js_data, article)
article.like_num=js_data['like_num']
article.read_num=js_data['read_num']
article.save()
return article
按照 13.2 节的思路,我们先创建保存文章结果的函数:
def save_article_result(article, queue=None, retry=0):
url=article.article_url
try:
html_text, js_data=fetch(url)
except RequestException:
retry+=1
if retry > 5:
queue.put(url)
return
return save_article_result(article, queue, retry)
return update_article(article, html_text, js_data)
其中参数 article 就是一个 Article 对象。
现在前期准备都完成了,接下来使用多进程和队列来更新 article 集合中存在的文章。多进程方式不再使用 Queue 模块,而是换成 multiprocessing.Queue 或者 multiprocess-ing.JoinableQueue。JoinableQueue 有 join 和 task_done 两个方法,所以适合存放统一的待执行任务,供并发进程获取。而 Queue 可以用来收集任务执行的结果:
import multiprocessing
def use_multiprocessing_with_queue():
queue=multiprocessing.JoinableQueue()
num_consumers=multiprocessing.cpu_count()*2
for article in Article.objects.all():
queue.put(article)
for_in range(num_consumers):
p=multiprocessing.Process(target=save_article_result_with_queue,
args=(queue,))
p.start()
queue.join()
通常计算密集型的程序使用的线程和进程数量一般与 CPU 核数一致,I/O 密集型的程序使用的线程和进程数量一般是 CPU 核数的 2 倍。也可以根据网络情况,使用 3 倍甚至更多的倍数,在实际工作中通过测试获取最佳值。
看一下消费者进程需要执行的函数:
from Queue import Empty
from mongoengine.connection import disconnect
from models import Article, lazy_connect
def save_article_result_with_queue(queue):
disconnect()
lazy_connect()
while 1:
try:
article=queue.get(timeout=1)
except Empty:
break
save_article_result(article, queue)
queue.task_done()
多进程队列为空时抛的异常是 Queue.Empty。这里用“先关闭 MongoDB 连接,再使用 lazy 模式连接”的原因,是为了解决 MongoDB 使用多进程时,调用 getaddrinfo 可能会发生的死锁(http://bit.ly/1sHScFY )的问题。
lazy_connect 在 models.py 中的定义如下:
from config import DB_HOST, DB_PORT, DATABASE_NAME
def lazy_connect():
connect(DATABASE_NAME, host=DB_HOST, port=DB_PORT)
lazy_connect()
在 13.1 节,演示了通过第二个队列存放第一个队列的执行结果,最后获得抓取总数的方法。
本节我们再简化一些,不用创建第二个队列就能获得全部修改后的 Article 模型:
def save_article_result_with_queue2(in_queue, out_queue):
while 1:
try:
article=in_queue.get(timeout=1)
except Empty:
break
updated_article=save_article_result(article, in_queue)
out_queue.put(updated_article) # 把更新好的文章对象放入 out_queue 这个队列中
in_queue.task_done()
使用两个队列的多进程主程序如下:
def use_multiprocessing_with_queue2():
queue=multiprocessing.JoinableQueue()
num_consumers=multiprocessing.cpu_count()*2
results_queue=multiprocessing.Queue()
for article in Article.objects.all()[5:8]:
queue.put(article)
for _in range(num_consumers):
p=multiprocessing.Process(target=save_article_result_with_queue2,
args=(queue, results_queue))
p.start()
queue.join()
results=[]
while 1:
try:
updated_article=results_queue.get(timeout=1)
except Empty:
break
results.append(updated_article)
print len(results)
放入队列的对象只要是可 pickle 化的就可以,所以 Article 对象可以作为消息放入队列。
如果文章页面是动态注入的,需要使用 PhantomJS 之类的工具模拟页面打开的过程,再从加载完毕的页面文件中进行解析。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论