深入理解 Celery
Celery 的依赖
Celery 依赖了相关的三个库:billiard、librabbitmq 和 kombu,这些库都由 Celery 的作者开发和维护。
billiard
billiard 是一个基于 Python 2.7 标准库多进程模块 multiprocessing 而改进的库,它主要用来提高性能和稳定性,也可以兼容还没有 multiprocessing 模块的早期 Python 版本。Celery 消费者的多进程模式称为 prefork。
librabbitmq
librabbitmq 是一个 C 语言实现的 Python 客户端。虽然之前我们介绍 RabbitMQ 的时候使用了 pika,而 Celery 也实现了 py-amqp 这个 Python 客户端,但是基于性能原因,请务必使用 librabbitmq,只有当 librabbitmq 不可用的时候才使用 py-amqp(和 librabbitmq 的接口一致)。我们来把之前的 pika 的例子移植成使用 librabbitmq。先看发布者(librabbitmq_producer.py):
import sys
from librabbitmq import Connection
connection=Connection(host='localhost', userid='dongwm',
password='123456', virtual_host='web_develop')
channel=connection.channel()
channel.exchange_declare('web_develop', 'direct',
passive=False, durable=True, auto_delete=False)
if len(sys.argv) !=1:
msg=sys.argv[1]
else:
msg='hah'
channel.basic_publish(msg, 'web_develop', 'xxx_routing_key')
connection.close()
再看消费者(librabbitmq_consumer.py):
from librabbitmq import Connection
connection=Connection(host='localhost', userid='dongwm',
password='123456', virtual_host='web_develop')
channel=connection.channel()
def on_message(message):
print("Body:'%s', Properties:'%s', DeliveryInfo:'%s'"%(
message.body, message.properties, message.delivery_info))
message.ack()
channel.exchange_declare('web_develop', 'direct',
passive=False, durable=True, auto_delete=False)
channel.queue_declare('standard', auto_delete=True)
channel.queue_bind('standard', 'web_develop', 'xxx_routing_key')
channel.basic_consume('standard', callback=on_message)
try:
while True:
connection.drain_events()
except KeyboardInterrupt:
exit(1)
kombu
kombu 是 Celery 自带的用来收发消息的库,它提供了符合 Python 语言习惯的、使用 AMQP 协议的高级接口。OpenStack 等项目都在使用它,原因如下:
- 支持非常多的消息代理。
- 支持对有效负荷数据的自动编码、序列化和压缩。
- 优雅地处理连接和通道错误。
- 提供多消息代理的一致性接口和异常处理方式,切换消息代理的代价很小。
把 pika 的例子替换为 kombu 的版本。先看发布者(kombu_producer.py):
import sys
from kombu import Connection, Exchange, Queue, Producer
web_exchange=Exchange('web_develop', 'direct', durable=True)
standard_queue=Queue('standard', exchange=web_exchange,
routing_key='web.develop')
URI='librabbitmq://dongwm:123456@localhost:5672/web_develop'
if len(sys.argv) !=1:
msg=sys.argv[1]
else:
msg='hah'
with Connection(URI) as connection:
producer=Producer(connection)
producer.publish(
msg, exchange=web_exchange, declare=[standard_queue],
routing_key='web.develop',
serializer='json', compression='zlib')
然后是消费者(kombu_consumer.py):
from kombu import Connection, Exchange, Queue, Consumer
from kombu.async import Hub
web_exchange=Exchange('web_develop', 'direct', durable=True)
standard_queue=Queue('standard', exchange=web_exchange,
routing_key='web.develop')
URI='librabbitmq://dongwm:123456@localhost:5672/web_develop'
hub=Hub()
def on_message(body, message):
print("Body:'%s', Properties:'%s', DeliveryInfo:'%s'"%(
body, message.properties, message.delivery_info))
message.ack()
with Connection(URI) as connection:
connection.register_with_event_loop(hub)
with Consumer(connection, standard_queue, callbacks=[on_message]):
try:
hub.run_forever()
except KeyboardInterrupt:
exit(1)
任务调用
之前我们调用任务都使用 delay 方法,格式如下:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
delay 其实是 apply_async 的别名,还可以使用如下方法调用:
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1':'x', 'kwarg2':'y'})
二者的区别在于参数的表达方式,而且 apply_async 支持更多的参数。apply_async 支持的参数如下所述。
1.countdown:等待一段时间再执行。
add.apply_async((2, 2), countdown=5) # 5 秒后再执行
2.eta:ETA 是 estimated time of arrival 的缩写,也就是定义了任务的开始时间。countdown 相当于特殊的单位为秒的 eta。
add.apply_async((2, 2), eta=now+timedelta(seconds=10)) # 时间到了 now 加 10 秒,任务才开始
3.expires:设置超时时间。
add.apply_async((2, 2), expires=60) # 60 秒之后过期
4.retry:定义如果任务失败是否重试。
add.apply_async((2, 2), retry=False) # 失败了不会重试
5.retry_policy:重试策略。可以设置如表 9.3 所示的几个键。
表 9.3 可以设置的键及其含义
| 键 | 含义 |
| max_retries | 最大重试次数,默认为 3 次 |
| interval_start | 重试等待的间隔秒数,默认为 0,表示直接重试不等待 |
| interval_step | 每次重试让重试间隔增加的秒数,比如第 1 次重试间隔 0 秒,第 2 次重试的间隔就是 0.2 秒……可以是数字或者浮点数,默认值是 0.2 |
| interval_max | 重试间隔最大的秒数。也就是通过 interval_step 增大到多少秒之后就不再增加了。可以是数字或者浮点数,默认值是 0.2 |
使用 apply_async 也可以自定义发布者、交换机、路由键、队列、优先级(目前只支持 Redis 和 Beanstalk)、序列方案和压缩方法:
add.apply_async((2, 2), compression='zlib',
serializer='json', queue='priority.high',
routing_key='web.add', priority=0, exchange='web_exchange')
信号系统
Celery 支持 7 种信号类型:
1.任务信号。这个类型的信号最常用到,任务相关的信号有 8 种,如表 9.4 所示。
表 9.4 任务相关的信号及其含义
| 信号 | 含义 |
| before_task_publish | 任务发布前 |
| after_task_publish | 任务发布后 |
| task_prerun | 任务执行前 |
| task_postrun | 任务执行后 |
| task_retry | 任务重试时 |
| task_success | 任务完成时 |
| task_failure | 任务失败时 |
| task_revoked | 任务被撤销或者终止时 |
2.应用信号。
3.Worker 信号。
4.Beat 信号。
5.Eventlet 信号。
6.日志信号。
7.命令信号。
不同的信号参数格式不同,具体的格式可以参考官方文档(http://docs.celeryproject.org/en/latest/userguide/signals.html )。现在将 9.3 节的 proj 复制到 chapter9/section4 目录下,给 proj/celery.py 添加如下一段订阅 after_task_publish 信号的代码:
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, body=None,**kwargs):
print 'after_task_publish:task_id:{body[id]};sender:{sender}'.format(
body=body, sender=sender)
现在执行 add 任务后会打印类似如下的信息:
after_task_publish:task_id:89c089d7-bcb5-4d7f-8281-f159bb8c1afa;sender:proj.tasks.
add
信号可以帮助我们了解任务执行情况,分析任务运行的瓶颈。
Worker 管理
之前通过如下命令启动 Worker:
celery -A proj worker -l info
这种方式不怎么好管理,因为每次关闭时都需要使用 Ctrl+C。Celery 提供 multi 这个子命令来管理。我们先使用 Daemon 的方式启动 Worker 进程:
> celery multi start web -A proj -l info--pidfile=/tmp/celery_%n.pid --logfile=/tmp/
celery_%n.log
web 是对项目启动的标识,之后的处理都使用这个标识来管理。--pidfile 和--logfile 使用了%n,这是对节点的格式化用法,支持的格式化参数如表 9.5 所示。
表 9.5 格式化参数及其含义
| 参数 | 含义 |
| %n | 只包含主机名 |
| %h | 包含域名的主机名 |
| %d | 只包含域名 |
| %i | Prefork 类型的进程索引,如果是主进程,则为 0 |
| %I | 带分隔符的 Prefork 类型的进程索引。假设主进程是 worker1,那么进程池的第一个进程则为 worker1-1 |
现在可以使用如下命令管理:
> celery multi show web # 查看 web 启动时的命令
/home/ubuntu/web_develop/venv/bin/python-m celery worker--detach-n web@WEB--pidfile
=web.pid--logfile=web.log--executable=/home/ubuntu/web_develop/venv/bin/python
> celery multi names web # 获取 web 的节点名字
web@WEB
> celery multi stop web # 停止 web 进程
celery multi v3.1.18 (Cipater)
>web@WEB:DOWN
> celery multi restart web # 重新启动 web
celery multi v3.1.18 (Cipater)
>web@WEB:DOWN
>Restarting node web@WEB:OK
> celery multi kill web # 杀掉 web 进程
celery multi v3.1.18 (Cipater)
Killing node web@WEB (23675)
监控和管理 Celery
Celery 提供了一些方便监控和管理的命令,常用的命令有如下 4 个。
1.shell:celery 也提供了一个 Python 交互环境,内置了 Celery 应用实例和全部已注册的任务,支持默认的 Python 解释器、IPython 和 BPython。
> celery shell-A proj In [1]:celery Out[1]:<Celery proj:0x7fad4aef8290> In [2]:add.delay(1, 2) # 可以直接使用而不需要 import
2.result:通过 task_id 在命令行获得执行结果。
> celery -A proj result eba7fc0b-3c16-4337-990c-d53a4c176a49 3
3.inspect active:列出当前正在执行的任务。
> celery-A proj inspect active
4.inspect stats:列出 Worker 的统计数据,常用来查看配置是否正确以及系统的使用情况。
> celery-A proj inspect stats
Celery 还可以撤销任务:
In : rs=add.delay(1, 2)
In : rs.revoke() # 只是撤销,如果任务已经在执行则撤销无效
In : rs.task_id
Out : 'd24a83e8-e985-48ef-adab-e779441e5557'
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557') # 通过 task_id 撤销
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557', terminate=True) # 撤销
正在执行的任务,默认使用 TERM 信号
In : app.control.revoke('d24a83e8-e985-48ef-adab-e779441e5557', terminate=True, signal
='SIGKILL') # 撤销正在执行的任务,使用 KILL 信号
In : app.control.revoke([
....:'d9078da5-9915-40a0-bfa1-392c7bde42ed'
....:'d24a83e8-e985-48ef-adab-e779441e5557'
....:'40fa44d1-d1b4-4e3d-9e9d-2cd8f6cfd676']) # 可以同时撤销多个任务
Worker 进程在内存中保存了撤销的任务,一重启就会丢失,如果需要这个历史记录持久化,可以添加--statedb 参数启动 Worker:
> celery -A proj worker -l info--statedb=/tmp/worker.state
Celery 官方推荐实时的 Web 监控工具 Flower(https://github.com/mher/flower ),它实现了如下特性:
- 可以看到任务历史、任务具体的参数、开始时间等。
- 提供图表和统计数据。
- 实现全面的远程控制功能,包括但不限于撤销/终止任务、关闭和重启 Worker 进程、查看正在运行的任务等。
- 提供一个 HTTP API,方便集成到运维系统中。
同时,Celery 也自带了一个事件监控工具显示任务历史等信息,可以用来检查任务和跟踪错误:
> celery-A projb events
需要注意是,要把设置 CELERY_SEND_TASK_SENT_EVENT 为 True 才可以获取事件。
事件监控工具默认每秒都会刷新终端,从消息代理以 celeryev 开头的队列中找到执行任务的历史记录,可以使用键盘按键查看任务信息、错误堆栈、执行结果和撤销任务等。
子任务
我们可以把任务通过签名的方法传给其他任务,成为一个子任务:
In : from celery import signature
In : task=signature('tasks.add', args=(2, 2), countdown=10)
In : task
Out : tasks.add(2, 2) # 通过签名生成了任务
In : task.apply_async()
Out : <AsyncResult:05fbccf3-092f-405f-8e16-28de911804f2>
还可以通过如下方式创建子任务:
In : from proj.tasks import add In : task=add.subtask((2, 2), countdown=10) # 可以使用 In : task.apply_async() Out : <AsyncResult:a402fe1d-ed11-48f8-b163-88d7f65ab0e3>
add.subtask 同样可以使用快捷方式 add.s(2, 2, countdown=10)。
子任务实现偏函数(Partial)的方式非常有用,这种方式可以让任务在传递过程中才传入参数。
In : partial=add.s(2) In : partial.apply_async((4,)) Out : <AsyncResult:b3aaa2ce-eddb-44eb-80fb-48ce5e36da00>
子任务支持如下 5 种原语来实现工作流。原语表示由若干条指令组成的,用于完成一定功能的过程。
1.chain:调用链。前面的执行结果作为参数传给后面的任务,直到全部完成。
In : from celery import chain
In : res=chain(add.s(2, 2), add.s(4), add.s(8))()
In : res.get()
chain 也可以使用管道(|):
In : (add.s(2, 2)|add.s(4)|add.s(8))().get()
Out : 16
2.group:一次创建多个(一组)任务。
In : from celery import group In : res=group(add.s(i, i) for i in range(10))() In : res.get() Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
3.chord:等待任务全部完成时添加一个回调任务。
In : res=chord((add.s(i, i) for i in range(10)), add.s(['a']))()
In : res.get() # 执行完前面的循环,把结果拼成一个列表之后,再对这个列表添
加'a'
Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, u'a']
4.map/starmap:每个参数都作为任务的参数执行一遍,map 的参数只有一个,starmap 支持的参数有多个。
In : ~add.starmap(zip(range(10), range(10)))
Out : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
它相当于:
@app.task
def temp():
return [add(i, i) for i in range(10)]
5.chunks:将任务分块。
In : res=add.chunks(zip(range(50), range(50)), 10)() In : res.get() Out : [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], [80, 82, 84, 86, 88, 90, 92, 94, 96, 98]]
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论