返回介绍

进阶篇:Celery 最佳实践

发布于 2025-04-20 18:52:16 字数 3696 浏览 0 评论 0 收藏

使用自动扩展

多进程和 Gevent 模式的 Worker 支持自动扩展,通过--autoscale 参数就可以实现:

> celery -A proj worker -l info--autoscale=6,3

--autoscale 参数接受 2 个数字。在上面的句子中,“--autoscale=6,3”表示进程池平时保持 3 个进程,最大并发进程数可以达到 6 个。这里的取值和你的服务器 CPU 个数、任务闲忙程度、可用内存等指标有关。笔者个人的习惯是将最大并发数设置为 CPU 个数的 2 倍。理论上 Worker 进程数与 CPU 个数接近即可,但是也不尽然,它还与你的任务类型有关。比如笔者曾经遇到过的一个产品线的 Worker 数就比 CPU 数高很多,原因是每天上千万的任务中有大量的小任务,也有耗时比较久的任务,而且添加的任务会有瞬时高峰,比如每小时的整点、偶数小时的整点等。这很容易造成多个消费者在处理耗时的任务,让整体的及时性受到影响,而使用自动扩展可以比较好地解决这个问题。

善用远程 Debug

Celery 支持远程使用 pdb 调试任务,非常方便。

先添加一个任务:

from celery.contrib import rdb
     
@app.task
def sub(x, y):
    result=x-y
    rdb.set_trace() # 设置断点
    return result

重启 Worker 进程之后,使用 IPython 发布任务:

In : from proj.tasks import sub
In : sub(2, 1)
Remote Debugger:6899:Please telnet into 127.0.0.1 6899.
    
Type`exit`in session to continue.
    
Remote Debugger:6899:Waiting for client...

这个时候我们开启一个新的终端,使用 telnet 连接到 6899 端口:

> telnet localhost 6899
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
> /home/ubuntu/web_develop/chapter9/section4/proj/tasks.py(27)sub()
-> return result
(Pdb) result
1
(Pdb) x
2
(Pdb) continue
Connection closed by foreign host.

通过这样的方式,就不用在本地搭建 Celery 环境模拟任务失败的环境了。

合理安排任务周期

如果项目中有很多调度模式的任务,就要合理安排这些定时任务的执行时间。很多人会偷懒,随便选择一些整点和凌晨期间作为任务执行时间。但是需要注意如下问题:

1.需要和系统管理员和数据库管理员确认,确保你选择的时间段里面没有一些不合时宜的时间点。因为通常在凌晨,尤其是过了 12 点之后也是大量定时任务运行的时间,比如数据备份、生成前一天的数据报表这样的任务,如果选在凌晨执行你的任务,既可能让你的任务失败,也可能由于对数据库的访问压力让你和别人的任务花费更多的时间。

2.选择非整点的任务执行周期。根据任务特性,可以把任务执行时间打散,尽量减少每小时里前 45 分钟 Celery 集群非常忙碌,后 15 分钟集群大部分空闲的情况。

3.合理安排数据库、文件系统写任务。笔者的方式是把占用时间的数据库和文件系统写操作等任务安排在访问低峰期,并把写任务拆分后在分散的时间执行。

合理使用队列和优先级

不要把任务都放在默认队列。合理安排任务的优先级和队列,让应该及时完成的任务不会因为某些原因被阻塞而影响用户体验。其次是合理使用 apply_async 方法临时性地切换队列和优先级,提高整体的响应。

如果能明确知晓某些任务耗时很长,某些任务耗时很短,至少可以按照任务花费的时间来把任务安排到不同的队列。

保证业务逻辑的事务性

Celery 虽然提供错误重试机制,但是并没有提供任务的事务性。如果任务一部分执行成功之后失败,执行成功的那部分是没有回滚方法的,所以一开始就要在实现逻辑的时候对重试机制有明确的理解。

关闭你不想要的功能

如果你对任务的执行结果没有兴趣,可以关闭它:

@app.task(ignore_result=True)
def mytask(…):
    doit()

如果项目不需要限速,可以设置“CELERY_DISABLE_RATE_LIMITS=True”直接全局地把这个特性关闭。

使用阅后即焚模式

在使用队列的时候,默认使用持久化方式以确保任务被执行,如果你的任务不需要持久化,可以使用阅后即焚(transient)模式:

from kombu import Queue
Queue('transient', routing_key='transient',
     delivery_mode=1)

善用 Prefetch 模式

Worker 进程默认每次从消息代理获取 CELERYD_PREFETCH_MULTIPLIER 设置的任务数,假如你的任务都很细小,可以修改 CELERYD_PREFETCH_MULTIPLIER 的值,每次获取更多的任务数。这个值也是根据任务的执行情况决定的。

善用工作流

如果一个任务有调用链,下一步任务需要等待上一步的结果,就不应该使用同步子任务。举一个抓取某电商网站的爬虫例子,它一般分为以下几步:

1.获取要抓取的页面。

2.抓取对应页面。

3.解析页面数据。

4.把需要的数据存储到数据库和缓存中。

如果设计成下面的方式就是错误的:

@app.task
def page_crawler():
    url=get_url.delay().get()
    page=fetch_page.delay(url).get()
    info=parse_page.delay(page).get()
    store_page_info.delay(info)
    
@app.task
def get_url():
    return PageInfo.objects.filter_by(need_upate=True).first()
    
@app.task
def fetch_page(url):
    return requests.get(url)
    
@app.task
def parse_page(page):
    return myparser.parse_document(page)
     
@app.task
def store_page_info(info):
    return PageInfo.objects.create(info)

应该使用如下方式:

def page_crawler():
    # get_url -> fetch_page -> parse_page -> store_page
    chain=get_url.s() | fetch_page.s() | parse_page.s() | store_page_info.s()
    chain()

控制任务的粒度不要太细。举个例子,假设现在每小时都要通过某网站的开放平台获取对应的百万级条目的数据。如果为每个条目都创建一个任务显然是不合理的,应该更多地利用开放平台接口的限制。比如接口可以批量操作,一次最多取 30 个条目的数据,那么可以考虑把单个任务定为处理 30 个条目的数据。

在生成任务的时候应该充分利用 group/chain/chunks 这些原语。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。