返回介绍

深入理解 Celery

发布于 2025-04-20 18:52:16 字数 11736 浏览 0 评论 0 收藏

Celery 的依赖

Celery 依赖了相关的三个库:billiard、librabbitmq 和 kombu,这些库都由 Celery 的作者开发和维护。

billiard

billiard 是一个基于 Python 2.7 标准库多进程模块 multiprocessing 而改进的库,它主要用来提高性能和稳定性,也可以兼容还没有 multiprocessing 模块的早期 Python 版本。Celery 消费者的多进程模式称为 prefork。

librabbitmq

librabbitmq 是一个 C 语言实现的 Python 客户端。虽然之前我们介绍 RabbitMQ 的时候使用了 pika,而 Celery 也实现了 py-amqp 这个 Python 客户端,但是基于性能原因,请务必使用 librabbitmq,只有当 librabbitmq 不可用的时候才使用 py-amqp(和 librabbitmq 的接口一致)。我们来把之前的 pika 的例子移植成使用 librabbitmq。先看发布者(librabbitmq_producer.py):

import sys
    
from librabbitmq import Connection
    
connection=Connection(host='localhost', userid='dongwm',
                     password='123456', virtual_host='web_develop')
channel=connection.channel()
     
channel.exchange_declare('web_develop', 'direct',
                        passive=False, durable=True, auto_delete=False)
if len(sys.argv) !=1:
   msg=sys.argv[1]
    
else:
   msg='hah'
     
channel.basic_publish(msg, 'web_develop', 'xxx_routing_key')
     
connection.close()
     
再看消费者(librabbitmq_consumer.py):
     
from librabbitmq import Connection
     
connection=Connection(host='localhost', userid='dongwm',
                      password='123456', virtual_host='web_develop')
     
channel=connection.channel()
     
     
def on_message(message):
    print("Body:'%s', Properties:'%s', DeliveryInfo:'%s'"%(
        message.body, message.properties, message.delivery_info))
    message.ack()
     
     
channel.exchange_declare('web_develop', 'direct',
                        passive=False, durable=True, auto_delete=False)
     
channel.queue_declare('standard', auto_delete=True)
channel.queue_bind('standard', 'web_develop', 'xxx_routing_key')
channel.basic_consume('standard', callback=on_message)
     
     
try:
    while True:
        connection.drain_events()
except KeyboardInterrupt:
    exit(1)

kombu

kombu 是 Celery 自带的用来收发消息的库,它提供了符合 Python 语言习惯的、使用 AMQP 协议的高级接口。OpenStack 等项目都在使用它,原因如下:

  • 支持非常多的消息代理。
  • 支持对有效负荷数据的自动编码、序列化和压缩。
  • 优雅地处理连接和通道错误。
  • 提供多消息代理的一致性接口和异常处理方式,切换消息代理的代价很小。

把 pika 的例子替换为 kombu 的版本。先看发布者(kombu_producer.py):

import sys
    
from kombu import Connection, Exchange, Queue, Producer
    
web_exchange=Exchange('web_develop', 'direct', durable=True)
standard_queue=Queue('standard', exchange=web_exchange,
                    routing_key='web.develop')
URI='librabbitmq://dongwm:123456@localhost:5672/web_develop'
    
if len(sys.argv) !=1:
   msg=sys.argv[1]
else:
   msg='hah'
    
with Connection(URI) as connection:
    producer=Producer(connection)
    producer.publish(
        msg, exchange=web_exchange, declare=[standard_queue],
        routing_key='web.develop',
        serializer='json', compression='zlib')

然后是消费者(kombu_consumer.py):

from kombu import Connection, Exchange, Queue, Consumer
from kombu.async import Hub
    
web_exchange=Exchange('web_develop', 'direct', durable=True)
standard_queue=Queue('standard', exchange=web_exchange,
                    routing_key='web.develop')
    
URI='librabbitmq://dongwm:123456@localhost:5672/web_develop'
hub=Hub()
    
    
def on_message(body, message):
    print("Body:'%s', Properties:'%s', DeliveryInfo:'%s'"%(
        body, message.properties, message.delivery_info))
    message.ack()
    
    
with Connection(URI) as connection:
    connection.register_with_event_loop(hub)
with Consumer(connection, standard_queue, callbacks=[on_message]):
    try:
        hub.run_forever()
    except KeyboardInterrupt:
        exit(1)

任务调用

之前我们调用任务都使用 delay 方法,格式如下:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

delay 其实是 apply_async 的别名,还可以使用如下方法调用:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1':'x', 'kwarg2':'y'})

二者的区别在于参数的表达方式,而且 apply_async 支持更多的参数。apply_async 支持的参数如下所述。

1.countdown:等待一段时间再执行。

add.apply_async((2, 2), countdown=5) # 5 秒后再执行

2.eta:ETA 是 estimated time of arrival 的缩写,也就是定义了任务的开始时间。countdown 相当于特殊的单位为秒的 eta。

add.apply_async((2, 2), eta=now+timedelta(seconds=10)) # 时间到了 now 加 10 秒,任务才开始

3.expires:设置超时时间。

add.apply_async((2, 2), expires=60) # 60 秒之后过期

4.retry:定义如果任务失败是否重试。

add.apply_async((2, 2), retry=False) # 失败了不会重试

5.retry_policy:重试策略。可以设置如表 9.3 所示的几个键。

表 9.3 可以设置的键及其含义

含义
max_retries最大重试次数,默认为 3 次
interval_start重试等待的间隔秒数,默认为 0,表示直接重试不等待
interval_step每次重试让重试间隔增加的秒数,比如第 1 次重试间隔 0 秒,第 2 次重试的间隔就是 0.2 秒……可以是数字或者浮点数,默认值是 0.2
interval_max重试间隔最大的秒数。也就是通过 interval_step 增大到多少秒之后就不再增加了。可以是数字或者浮点数,默认值是 0.2

使用 apply_async 也可以自定义发布者、交换机、路由键、队列、优先级(目前只支持 Redis 和 Beanstalk)、序列方案和压缩方法:

add.apply_async((2, 2), compression='zlib',
               serializer='json', queue='priority.high',
               routing_key='web.add', priority=0, exchange='web_exchange')

信号系统

Celery 支持 7 种信号类型:

1.任务信号。这个类型的信号最常用到,任务相关的信号有 8 种,如表 9.4 所示。

表 9.4 任务相关的信号及其含义

信号含义
before_task_publish任务发布前
after_task_publish任务发布后
task_prerun任务执行前
task_postrun任务执行后
task_retry任务重试时
task_success任务完成时
task_failure任务失败时
task_revoked任务被撤销或者终止时

2.应用信号。

3.Worker 信号。

4.Beat 信号。

5.Eventlet 信号。

6.日志信号。

7.命令信号。

不同的信号参数格式不同,具体的格式可以参考官方文档(http://docs.celeryproject.org/en/latest/userguide/signals.html )。现在将 9.3 节的 proj 复制到 chapter9/section4 目录下,给 proj/celery.py 添加如下一段订阅 after_task_publish 信号的代码:

from celery.signals import after_task_publish
     
@after_task_publish.connect
def task_sent_handler(sender=None, body=None,**kwargs):
    print 'after_task_publish:task_id:{body[id]};sender:{sender}'.format(
        body=body, sender=sender)

现在执行 add 任务后会打印类似如下的信息:

after_task_publish:task_id:89c089d7-bcb5-4d7f-8281-f159bb8c1afa;sender:proj.tasks.
    add

信号可以帮助我们了解任务执行情况,分析任务运行的瓶颈。

Worker 管理

之前通过如下命令启动 Worker:

celery -A proj worker -l info

这种方式不怎么好管理,因为每次关闭时都需要使用 Ctrl+C。Celery 提供 multi 这个子命令来管理。我们先使用 Daemon 的方式启动 Worker 进程:

> celery multi start web -A proj -l info--pidfile=/tmp/celery_%n.pid --logfile=/tmp/
     celery_%n.log

web 是对项目启动的标识,之后的处理都使用这个标识来管理。--pidfile 和--logfile 使用了%n,这是对节点的格式化用法,支持的格式化参数如表 9.5 所示。

表 9.5 格式化参数及其含义

参数含义
%n只包含主机名
%h包含域名的主机名
%d只包含域名
%iPrefork 类型的进程索引,如果是主进程,则为 0
%I带分隔符的 Prefork 类型的进程索引。假设主进程是 worker1,那么进程池的第一个进程则为 worker1-1

现在可以使用如下命令管理:

> celery multi show web # 查看 web 启动时的命令
/home/ubuntu/web_develop/venv/bin/python-m celery worker--detach-n web@WEB--pidfile
     =web.pid--logfile=web.log--executable=/home/ubuntu/web_develop/venv/bin/python
> celery multi names web # 获取 web 的节点名字
web@WEB
> celery multi stop web # 停止 web 进程
celery multi v3.1.18 (Cipater)
>web@WEB:DOWN
> celery multi restart web # 重新启动 web
celery multi v3.1.18 (Cipater)
>web@WEB:DOWN
>Restarting node web@WEB:OK
> celery multi kill web # 杀掉 web 进程
celery multi v3.1.18 (Cipater)
Killing node web@WEB (23675)

监控和管理 Celery

Celery 提供了一些方便监控和管理的命令,常用的命令有如下 4 个。

1.shell:celery 也提供了一个 Python 交互环境,内置了 Celery 应用实例和全部已注册的任务,支持默认的 Python 解释器、IPython 和 BPython。

> celery shell-A proj
In [1]:celery
Out[1]:<Celery proj:0x7fad4aef8290>
In [2]:add.delay(1, 2) # 可以直接使用而不需要 import

2.result:通过 task_id 在命令行获得执行结果。

> celery -A proj result eba7fc0b-3c16-4337-990c-d53a4c176a49
3

3.inspect active:列出当前正在执行的任务。

> celery-A proj inspect active

4.inspect stats:列出 Worker 的统计数据,常用来查看配置是否正确以及系统的使用情况。

> celery-A proj inspect stats

Celery 还可以撤销任务:

In : rs=add.delay(1, 2)
In : rs.revoke() # 只是撤销,如果任务已经在执行则撤销无效
In : rs.task_id
Out : 'd24a83e8-e985-48ef-adab-e779441e5557'
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557') # 通过 task_id 撤销
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557', terminate=True) # 撤销
     正在执行的任务,默认使用 TERM 信号
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557', terminate=True, signal
     ='SIGKILL') # 撤销正在执行的任务,使用 KILL 信号
In : app.control.revoke([
....:'d9078da5-9915-40a0-bfa1-392c7bde42ed'
....:'d24a83e8-e985-48ef-adab-e779441e5557'
....:'40fa44d1-d1b4-4e3d-9e9d-2cd8f6cfd676']) # 可以同时撤销多个任务

Worker 进程在内存中保存了撤销的任务,一重启就会丢失,如果需要这个历史记录持久化,可以添加--statedb 参数启动 Worker:

> celery -A proj worker -l info--statedb=/tmp/worker.state

Celery 官方推荐实时的 Web 监控工具 Flower(https://github.com/mher/flower ),它实现了如下特性:

  • 可以看到任务历史、任务具体的参数、开始时间等。
  • 提供图表和统计数据。
  • 实现全面的远程控制功能,包括但不限于撤销/终止任务、关闭和重启 Worker 进程、查看正在运行的任务等。
  • 提供一个 HTTP API,方便集成到运维系统中。

同时,Celery 也自带了一个事件监控工具显示任务历史等信息,可以用来检查任务和跟踪错误:

> celery-A projb events

需要注意是,要把设置 CELERY_SEND_TASK_SENT_EVENT 为 True 才可以获取事件。

事件监控工具默认每秒都会刷新终端,从消息代理以 celeryev 开头的队列中找到执行任务的历史记录,可以使用键盘按键查看任务信息、错误堆栈、执行结果和撤销任务等。

子任务

我们可以把任务通过签名的方法传给其他任务,成为一个子任务:

In : from celery import signature
In : task=signature('tasks.add', args=(2, 2), countdown=10)
In : task
Out : tasks.add(2, 2) # 通过签名生成了任务
In : task.apply_async()
Out : <AsyncResult:05fbccf3-092f-405f-8e16-28de911804f2>

还可以通过如下方式创建子任务:

In : from proj.tasks import add
In : task=add.subtask((2, 2), countdown=10) # 可以使用
In : task.apply_async()
Out : <AsyncResult:a402fe1d-ed11-48f8-b163-88d7f65ab0e3>

add.subtask 同样可以使用快捷方式 add.s(2, 2, countdown=10)。

子任务实现偏函数(Partial)的方式非常有用,这种方式可以让任务在传递过程中才传入参数。

In : partial=add.s(2)
In : partial.apply_async((4,))
Out : <AsyncResult:b3aaa2ce-eddb-44eb-80fb-48ce5e36da00>

子任务支持如下 5 种原语来实现工作流。原语表示由若干条指令组成的,用于完成一定功能的过程。

1.chain:调用链。前面的执行结果作为参数传给后面的任务,直到全部完成。

In : from celery import chain
In : res=chain(add.s(2, 2), add.s(4), add.s(8))()
In : res.get()
    
chain 也可以使用管道(|):
    
In : (add.s(2, 2)|add.s(4)|add.s(8))().get()
Out : 16

2.group:一次创建多个(一组)任务。

In : from celery import group
In : res=group(add.s(i, i) for i in range(10))()
In : res.get()
Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

3.chord:等待任务全部完成时添加一个回调任务。

In : res=chord((add.s(i, i) for i in range(10)), add.s(['a']))()
In : res.get() # 执行完前面的循环,把结果拼成一个列表之后,再对这个列表添
     加'a'
Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, u'a']

4.map/starmap:每个参数都作为任务的参数执行一遍,map 的参数只有一个,starmap 支持的参数有多个。

In : ~add.starmap(zip(range(10), range(10)))
Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    
它相当于:
    
@app.task
def temp():
    return [add(i, i) for i in range(10)]

5.chunks:将任务分块。

In : res=add.chunks(zip(range(50), range(50)), 10)()
In : res.get()
Out : 
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98]]

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。