返回介绍

使用多进程

发布于 2025-04-20 18:52:18 字数 9119 浏览 0 评论 0 收藏

使用进程代替线程可以有效避开 GIL,因为每个进程都拥有自己的 Python 解释器实例,也就不受 GIL 的限制了。计算密集型的任务通常应该使用多进程方式,也就是使用 multiprocessing 模块。multiprocessing 模块允许程序充分利用多处理器,并可以跨平台使用。I/O 密集型的任务瓶颈主要在网络延迟,所以使用多线程或者多进程都可以。

我们来实现解析微信文章页面的功能。首先从 Article 集合中找一篇微信公众号文章:

> db.article.find({'publisher':ObjectId("57317619bc1f551139de5b16")},{'title':1, '
     url':1, '_id':0}).pretty()
{
     "title":"Python Web 开发的十个框架",
     "url":"http://mp.weixin.qq.com/s?src=3&timestamp=1462859281&ver=1&
        signature=2qvUQsQ6Tzf13kTij4VZ4cULEA7t1XgK8B6Ny*FKurB*HMwoUzEFYP
        CQyBEOg0XbGraIQxSJAoc-F8SGifgLH8t2JnKMF6B9tNcV87C6llfPcyfR98qbtQ
        3J3KegBlGNo0sQYcGVKYRe9CMj6Pw5BOkjjlGC-tExRvECLLDThQs="
}

url 的值就是微信页面地址,微信公众号文章的页面上除了正文和配图,还展示了评论列表、阅读数和点赞数,这些内容不是 HTML 直接渲染的,而是通过 API 接口返回的,在 Chrome Developer Tools 中可以看到一个来至 http://mp.weixin.qq.com/mp/getcomment 的 API 请求,其中 src、timestamp、signature 和 ver 的参数值和 url 的是一样的,其他额外增加的参数除了空就是 0,只需要根据 url 构造一下就好了:

import urlparse
import urllib
 
COMMENT_JS_URL='http://mp.weixin.qq.com/mp/getcomment'
  
 
def gen_js_url(url):
    query_dct=urlparse.parse_qs(urlparse.urlsplit(url).query)
    query_dct={k:v[0] for k, v in query_dct.items()}
    query_dct.update({'uin':'', 'key':'', 'pass_ticket':'', 'wxtoken':'',
                      'devicetype':'', 'clientversion':0, 'x5':0})
    return '{}?{}'.format(COMMENT_JS_URL, urllib.urlencode(query_dct))

爬取页面的意义就是抓取核心内容,然后在其他地方以某种方式展示出来,文章一般都会有配图,我们这里使用到“占位”这种技巧,在将来展示的时候再替换掉。举个例子,原来的一段页面文本是这样的:

<p>Flask</p>
<p><img src="http://some_picture_path"></p>
<p><span style="font-size:15px;">Flask 是一个轻量级的 Web 应用框架,使用 Python 编写
    ...</span></p>

被抽取内容之后就会变成:

Flask
<图片 1>
Flask 是一个轻量级的 Web 应用框架,使用 Python 编写...

<图片 1>和图片源地址的关系存在模型中。

对 Article 扩充如下字段:

class Article(BaseModel):
    ...
    content=StringField()
    pictures=DictField()
    comments=ListField(ReferenceField(Comment))
    like_num=IntField()
    read_num=IntField()

其中 pictures 字段就存放了<图片 1>和图片源地址的对应关系。

评论也是一个独立的集合:

class Comment(BaseModel):
    nick_name=StringField(max_length=120)
    content=StringField(max_length=120, required=True)
    like_num=IntField()
    comment_id=IntField()
    article=ReferenceField('Article', dbref=True)
 
    meta={
        'collection':'comment',
        'indexes':[
            {'fields':['article', 'comment_id'],
            'unique':True}
        ]
    }

    @classmethod
    def get_or_create(cls, article, comment_id,**kwargs):
        comments=cls.objects.filter(article=article,
                                    comment_id=comment_id)
    if comments:
        return comments[0]
    comment=cls(article=article, comment_id=comment_id,**kwargs)
    comment.save()
    return comment

在 Article 中的 comments 字段使用了 ListField(ReferenceField(Comment)),Comment 中的 article 字段使用的却是 ReferenceField('Article', dbref=True),这是因为 Comment 是先定义的,那个时候程序还没有从上到下地获取到 Article 这个模型,所以需要使用字符串的 Article,然后以设置 dbref 为 True 的方式实现互相引用。

给 Comment 添加的 get_or_create 方法是为了传入字段返回 comment 对象,如没有对象则先创建再返回。

这次用到的 fetch 函数需要改了,因为之前都是抓取单个页面,现在是用一个会话连续抓取两个页面:

from simplejson.scanner import JSONDecodeError
 
 
def fetch(url):
    s=requests.Session()
    s.headers.update({'user-agent':get_user_agent()})
    proxies={
        'http':Proxy.get_random()['address'],
    }
    html_text=s.get(url, timeout=TIMEOUT, proxies=proxies).text
    js_url=gen_js_url(url)
    try:
        js_data=s.get(js_url, timeout=TIMEOUT, proxies=proxies).json()
    except JSONDecodeError:
        raise RequestException()
    return html_text, js_data

在获取 API 接口的时候特意捕获 JSONDecodeError 的错误,是因为 API 页面也有防爬策略。修改异常的错误类型是为了保证在外面调用 fetch 的时候只需要捕获 RequestException 一种异常就好了。

requests 使用了 simplejson,simplejson 其实就是标准库 json(Python 2.6 时加入)的外部开发版本,它更新很频繁且兼容 Python 2.5。使用上面的 API 返回内容测试一下:

In : %timeit-n 1000 json.dumps(api_result)
1000 loops, best of 3:29.7 □s per loop
In : %timeit-n 1000 simplejson.dumps(api_result)
1000 loops, best of 3:44.6 □s per loop
In : dumped=simplejson.dumps(api_result)

In : %timeit-n 1000 simplejson.loads(dumped)
1000 loops, best of 3:44.1 □s per loop
In : %timeit-n 1000 json.loads(dumped)
1000 loops, best of 3:86.1 □s per loop

可以看到 json 的 dump 操作更快,simplejson 的 loads 操作更快。

引用 json 最推荐采用如下的方式:

try:
    import simplejson as json
except ImportError:
    import json

获得文章评论列表的函数如下:

def get_comments(js_data, article):
    comments=[]
    for comment in js_data['comment']:
        comment_id=comment['id']
        content=comment['content']
        create_at=datetime.fromtimestamp(float(comment['create_time']))
        nick_name=comment['nick_name']
        like_num=comment['like_num']
        comment=Comment.get_or_create(
            article, comment_id, content=content, create_at=create_at,
            nick_name=nick_name, like_num=like_num)
        comments.append(comment)
    return comments

更新文章的函数如下:

def update_article(article, html_text, js_data):
    soup=BeautifulSoup(html_text, 'lxml')
    p_contents=soup.find(class_='rich_media_content').find_all('p')
    content=[]
    pictures={}
    picture_count=1
    for p_content in p_contents:
        img=p_content.find('img')
        if img is None:
            content.append(p_content.text.encode('utf-8'))
        else:
            tag='<图片{}>'.format(picture_count)
            content.append(tag)
            pictures[tag]=img['data-src']
            picture_count+=1
        article.content='\n'.join(content)
        article.pictures=pictures
        article.comments=get_comments(js_data, article)
        article.like_num=js_data['like_num']
        article.read_num=js_data['read_num']
        article.save()
        return article

按照 13.2 节的思路,我们先创建保存文章结果的函数:

def save_article_result(article, queue=None, retry=0):
    url=article.article_url
 
    try:
        html_text, js_data=fetch(url)
    except RequestException:
        retry+=1
        if retry > 5:
            queue.put(url)
            return
        return save_article_result(article, queue, retry)
    return update_article(article, html_text, js_data)

其中参数 article 就是一个 Article 对象。

现在前期准备都完成了,接下来使用多进程和队列来更新 article 集合中存在的文章。多进程方式不再使用 Queue 模块,而是换成 multiprocessing.Queue 或者 multiprocess-ing.JoinableQueue。JoinableQueue 有 join 和 task_done 两个方法,所以适合存放统一的待执行任务,供并发进程获取。而 Queue 可以用来收集任务执行的结果:

import multiprocessing
 

def use_multiprocessing_with_queue():
    queue=multiprocessing.JoinableQueue()
    num_consumers=multiprocessing.cpu_count()*2

    for article in Article.objects.all():
        queue.put(article)
 
    for_in range(num_consumers):
        p=multiprocessing.Process(target=save_article_result_with_queue,
                                 args=(queue,))
        p.start()
    queue.join()

通常计算密集型的程序使用的线程和进程数量一般与 CPU 核数一致,I/O 密集型的程序使用的线程和进程数量一般是 CPU 核数的 2 倍。也可以根据网络情况,使用 3 倍甚至更多的倍数,在实际工作中通过测试获取最佳值。

看一下消费者进程需要执行的函数:

from Queue import Empty

from mongoengine.connection import disconnect

from models import Article, lazy_connect
 

def save_article_result_with_queue(queue):
    disconnect()
    lazy_connect()
    while 1:
        try:
            article=queue.get(timeout=1)
        except Empty:
            break
        save_article_result(article, queue)
        queue.task_done()

多进程队列为空时抛的异常是 Queue.Empty。这里用“先关闭 MongoDB 连接,再使用 lazy 模式连接”的原因,是为了解决 MongoDB 使用多进程时,调用 getaddrinfo 可能会发生的死锁(http://bit.ly/1sHScFY )的问题。

lazy_connect 在 models.py 中的定义如下:
 
from config import DB_HOST, DB_PORT, DATABASE_NAME
 
 
def lazy_connect():
    connect(DATABASE_NAME, host=DB_HOST, port=DB_PORT)
lazy_connect()

在 13.1 节,演示了通过第二个队列存放第一个队列的执行结果,最后获得抓取总数的方法。

本节我们再简化一些,不用创建第二个队列就能获得全部修改后的 Article 模型:

def save_article_result_with_queue2(in_queue, out_queue):
    while 1:
        try:
            article=in_queue.get(timeout=1)
        except Empty:
            break
        updated_article=save_article_result(article, in_queue)
        out_queue.put(updated_article) # 把更新好的文章对象放入 out_queue 这个队列中
        in_queue.task_done()

使用两个队列的多进程主程序如下:

def use_multiprocessing_with_queue2():
    queue=multiprocessing.JoinableQueue()
    num_consumers=multiprocessing.cpu_count()*2
    results_queue=multiprocessing.Queue()
 
    for article in Article.objects.all()[5:8]:
        queue.put(article)
 
    for _in range(num_consumers):
        p=multiprocessing.Process(target=save_article_result_with_queue2,
                                 args=(queue, results_queue))
        p.start()

    queue.join()

    results=[]
 
    while 1:
        try:
            updated_article=results_queue.get(timeout=1)
        except Empty:
           break
        results.append(updated_article)
    print len(results)

放入队列的对象只要是可 pickle 化的就可以,所以 Article 对象可以作为消息放入队列。

如果文章页面是动态注入的,需要使用 PhantomJS 之类的工具模拟页面打开的过程,再从加载完毕的页面文件中进行解析。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。