使用 Celery
Celery 是一个专注于实时处理和任务调度的分布式任务队列。所谓任务就是消息,消息中的有效载荷中包含要执行任务需要的全部数据。
使用 Celery 的常见场景如下:
- Web 应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给 Celery 去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
- 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery 可以帮助我们快速在不同的机器设定不同种任务。
- 其他可以异步执行的任务。为了充分提高网站性能,对于请求和响应之外的那些不要求必须同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。
Celery 还提供了如下的特性:
- 方便地查看定时任务的执行情况,比如执行是否成功、当前状态、执行任务花费的时间等。
- 可以使用功能齐备的管理后台或者命令行添加、更新、删除任务。
- 方便把任务和配置管理相关联。
- 可选多进程、Eventlet 和 Gevent 三种模式并发执行。
- 提供错误处理机制。
- 提供多种任务原语,方便实现任务分组、拆分和调用链。
- 支持多种消息代理和存储后端。
Celery 的架构
Celery 包含如下组件。
- Celery Beat:任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
- Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
- Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
- Producer:调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
- Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
Celery 的架构图如图 9.3 所示。
图 9.3 Celery 的架构图
选择消息代理
Celery 目前支持 RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper 等作为消息代理,但适用于生产环境的只有 RabbitMQ 和 Redis,至于其他的方式,一是支持有限,二是可能得不到更好的技术支持。
Celery 官方推荐的是 RabbitMQ,Celery 的作者 Ask Solem Hoel 最初在 VMware 就是为 Rab-bitMQ 工作的,Celery 最初的设计就是基于 RabbitMQ,所以使用 RabbitMQ 会非常稳定,成功案例很多。如果使用 Redis,则需要能接受发生突然断电之类的问题造成 Redis 突然终止后的数据丢失等后果。
Celery 序列化
在客户端和消费者之间传输数据需要序列化和反序列化,Celery 支持如表 9.2 所示的序列化方案。
表 9.2 Celery 支持的序列化方案
方案 | 说明 |
pickle | pickle 是 Python 标准库中的一个模块,支持 Python 内置的数据结构,但是它是 Python 的专有协议。从 Celery 3.2 开始,由于安全性等原因 Celery 将拒绝 pickle 这个方案 |
json | json 支持多种语言,可用于跨语言方案 |
yaml | yaml 的表达能力更强,支持的数据类型比 json 多,但是 Python 客户端的性能不如 JSON |
msgpack | msgpack 是一个二进制的类 json 的序列化方案,但是比 json 的数据结构更小、更快 |
安装配置 Celery
为了提供更高的性能,我们选择如下方案:
- 选择 RabbitMQ 作为消息代理。
- RabbitMQ 的 Python 客户端选择 librabbitmq 这个 C 库。
- 选择 Msgpack 做序列化。
- 选择 Redis 做结果存储。
下面先安装它们。Celery 提供 bundles 的方式,也就是安装 Celery 的同时可以一起安装多种依赖:
> pip install"celery[librabbitmq,redis,msgpack]"
bundles 的原理是在 setup.py 的 setup 函数中添加 extras_require。
从一个简单的例子开始
先演示一个简单的项目让 Celery 运行起来。项目的目录结构如下:
先看一下主程序 celery.py:
from__future__import absolute_import from celery import Celery app=Celery('proj', include=['proj.tasks']) app.config_from_object('proj.celeryconfig') if__name__=='__main__': app.start()
解析一下这个程序:
- “from__future__import absolute_import”是拒绝隐式引入,因为 celery.py 的名字和 celery 的包名冲突,需要使用这条语句让程序正确地运行。
- app 是 Celery 类的实例,创建的时候添加了 proj.tasks 这个模块,也就是包含了 proj/tasks.py 这个文件。
- 把 Celery 配置存放进 proj/celeryconfig.py 文件,使用 app.config_from_object 加载配置。
看一下存放任务函数的文件 tasks.py:
from__future__import absolute_import from proj.celery import app @app.task def add(x, y): return x+y
tasks.py 只有一个任务函数 add,让它生效的最直接的方法就是添加 app.task 这个装饰器。
看一下我们的配置文件 celeryconfig.py:
BROKER_URL='amqp://dongwm:123456@localhost:5672/web_develop' # 使用 RabbitMQ 作为消息 代理 CELERY_RESULT_BACKEND='redis://localhost:6379/0' # 把任务结果存在了 Redis CELERY_TASK_SERIALIZER='msgpack' # 任务序列化和反序列化使用 msgpack 方案 CELERY_RESULT_SERIALIZER='json' # 读取任务结果一般性能要求不高,所以使用了可读 性更好的 JSON CELERY_TASK_RESULT_EXPIRES=60*60*24 # 任务过期时间,不建议直接写 86400,应该让 这样的 magic 数字表述更明显 CELERY_ACCEPT_CONTENT=['json', 'msgpack'] # 指定接受的内容类型
这个例子中没有任务调度相关的内容,所以只需要启动消费者:
> cd ~/web_develop/chapter9/section3 > celery -A proj worker -l info
-A 参数默认会寻找 proj.celery 这个模块,其实使用 celery 作为模块文件名字不怎么合理。可以使用其他名字。举个例子,假如是 proj/app.py,可以使用如下命令启动:
> celery -A proj.app worker -l info
如果看到如下的启动信息,就说明 worker 服务运行起来了:
--------------celery@WEB v3.1.18 (Cipater) ----****----- ---*****--Linux-4.4.0-22-generic-x86_64-with-Ubuntu-16.04-xenial --*-****--- -**----------[config] -**----------.>app: proj:0x7f444c1b01d0 -**----------.>transport: amqp://dongwm:**@localhost:5672/web_develop -**----------.>results: redis://localhost:6379/0 -***---*---.>concurrency: 1 (prefork) --*******---- ---*****-----[queues] --------------.>celery exchange=celery(direct) key=celery [tasks] . proj.tasks.add [2016-06-03 13:22:50,368:INFO/MainProcess] Connected to amqp://dongwm:**@localhost :5672/web_develop [2016-06-03 13:22:50,381:INFO/MainProcess] mingle:searching for neighbors [2016-06-03 13:22:51,392:INFO/MainProcess] mingle:all alone [2016-06-03 13:22:51,629:WARNING/MainProcess] celery@WEB ready.
上述信息提供了一些有帮助的内容,如消息代理和存储结果的地址、并发数量、任务列表、交换类型等。在对 Celery 不熟悉的时候可以通过如上信息判断设置和修改是否已生效。
现在开启另外一个终端,用 IPython 调用 add 函数:
In : from proj.tasks import add In : r=add.delay(1, 3) In : r Out : <AsyncResult:93288a00-94ee-4727-b815-53dc3474cf3f> In : r.result Out : 4 In : r.status Out : u'SUCCESS' In : r.successful() Out : True In : r.backend Out : <celery.backends.redis.RedisBackend at 0x7fb2529500d0> # 保存在 Redis 中
可以看到 worker 的终端上显示执行了任务:
[2016-06-03 13:34:40,749:INFO/MainProcess] Received task:proj.tasks.add[93288a00-94ee -4727-b815-53dc3474cf3f] [2016-06-03 13:34:40,755:INFO/MainProcess] Task proj.tasks.add[93288a00-94ee-4727-b815 -53dc3474cf3f] succeeded in 0.00511166098295s:4
通过 IPython 触发的任务就完成了。任务的结果都需要根据上面提到的 task_id 获得,我们还可以用如下两种方式随时找到这个结果:
task_id='93288a00-94ee-4727-b815-53dc3474cf3f' In : add.AsyncResult(task_id).get() Out : 4 或者: In : from celery.result import AsyncResult In : AsyncResult(task_id).get() Out : 4
指定队列
Celery 非常容易设置和运行,通常它会使用默认的名为 celery 的队列(可以通过 CEL-ERY_DEFAULT_QUEUE 修改)用来存放任务。我们可以使用优先级不同的队列来确保高优先级的任务不需要等待就得到响应。
基于 proj 目录下的源码,我们创建一个 projq 目录,并对 projq/celeryconfig.py 添加如下配置:
from kombu import Queue CELERY_QUEUES=( # 定义任务队列 Queue('default', routing_key='task.#'), # 路由键以“task.”开头的消息都进 default 队列 Queue('web_tasks', routing_key='web.#'), # 路由键以“web.”开头的消息都进 web_tasks 队列 ) CELERY_DEFAULT_EXCHANGE='tasks' # 默认的交换机名字为 tasks CELERY_DEFAULT_EXCHANGE_TYPE='topic' # 默认的交换类型是 topic CELERY_DEFAULT_ROUTING_KEY='task.default' # 默认的路由键是 task.default,这个路由 键符合上面的 default 队列 CELERY_ROUTES={ 'projq.tasks.add':{ # tasks.add 的消息会进入 web_tasks 队列 'queue':'web_tasks', 'routing_key':'web.add', } }
现在用指定队列的方式启动消费者进程:
> celery -A projq worker -Q web_tasks -l info
上述 worker 只会执行 web_tasks 中的任务,我们可以合理安排消费者数量,让 web_tasks 中任务的优先级更高。
使用任务调度
之前的例子都是由发布者触发的,本节展示一下使用 Celery 的 Beat 进程自动生成任务。基于 proj 目录下的源码,创建一个 projb 目录,对 projb/celeryconfig.py 添加如下配置:
CELERYBEAT_SCHEDULE={ 'add':{ 'task':'projb.tasks.add', 'schedule':timedelta(seconds=10), 'args':(16, 16) } }
CELERYBEAT_SCHEDULE 中指定了 tasks.add 这个任务每 10 秒跑一次,执行的时候的参数是 16 和 16。
启动 Beat 程序:
> celery beat -A projb
然后启动 Worker 进程:
> celery -A projb worker -l info
之后可以看到每 10 秒都会自动执行一次 tasks.add。
Beat 和 Worker 进程可以一并启动:
> celery -B -A projb worker -l info
使用 Django 可以通过 django-celery 实现在管理后台创建、删除、更新任务,是因为它使用了自定义的调度类 djcelery.schedulers.DatabaseScheduler,我们可以参考它实现 Flask 或者其他 Web 框架的管理后台来完成同样的功能。使用自定义调度类还可以实现动态添加任务。
任务绑定、记录日志和重试
任务绑定、记录日志和重试是 Celery 常用的 3 个高级属性。现在修改 proj/tasks.py 文件,添加 div 函数用于演示:
from celery.utils.log import get_task_logger logger=get_task_logger(__name__) @app.task(bind=True) def div(self, x, y): logger.info(('Executing task id{0.id}, args:{0.args!r}' 'kwargs:{0.kwargs!r}').format(self.request)) try: result=x/y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) return result
当使用 bind=True 后,函数的参数发生变化,多出了参数 self(第一个参数),相当于把 div 变成了一个已绑定的方法,通过 self 可以获得任务的上下文。
在 IPython 中调用 div:
In : from proj.tasks import div In : r=div.delay(2, 1)
可以看到如下执行信息:
[2016-06-03 15:50:31,853:INFO/Worker-1] proj.tasks.div[1da82fb8-20de-4d5a-9b48-045 da6db0cda]:Executing task id 1da82fb8-20de-4d5a-9b48-045da6db0cda, args:[2, 1] kwargs:{}
换成能造成异常的参数:
In : r=div.delay(2, 0)
可以发现每 5 秒就会重试一次,一共重试 3 次(默认重复 3 次),然后抛出异常。
在 Flask 应用中使用 Celery
在 Web 应用中,用户请求页面发布任务,交由 Celery 后端处理。一种是把任务发布然后请求继续进行,响应不需要获知任务的执行情况;另外一种是需要实时把任务的执行过程反馈到用户的浏览器上。本节将演示这两种任务的处理方式。
Socket.IO 是一个支持 WebSocket 协议,面向实时 Web 应用的 JavaScript 库。实现了浏览器与服务器之间的双向通信。它有两个部分:
- 在浏览器中运行的客户端库。这个库由 Socket.IO 官方提供。
- Python 实现的服务端库。
为了简化代码,我们使用 Flask-SocketIO 这个扩展,首先安装它:
> pip install Flask-SocketIO
Celery 默认使用多进程的方式运行 Worker 进程,这个例子将使用 eventlet 的方法运行 Worker 以及 SocketIO。首先需要引入依赖包和设置(celery_socketio.py):
from flask import Flask, render_template from flask_socketio import SocketIO from celery import Celery import eventlet eventlet.monkey_patch() app=Flask(__name__) here=os.path.abspath(os.path.dirname(__file__)) # 获取文件的绝对目录路径 app.config.from_pyfile(os.path.join(here, 'proj/celeryconfig.py')) SOCKETIO_REDIS_URL=app.config['CELERY_RESULT_BACKEND'] socketio=SocketIO( app, async_mode='eventlet', message_queue=SOCKETIO_REDIS_URL) # 使用 Redis 存储 SocketIO 的消息队列 celery=Celery(app.name) celery.conf.update(app.config)
接下来编写 Celery 任务:
@celery.task def background_task(): socketio.emit( # emit 可以理解为向浏览器发送数据 'my response',{'data':'Task starting ...'}, namespace='/task') time.sleep(10) socketio.emit( 'my response',{'data':'Task complete!'}, namespace='/task') @celery.task def async_task(): print 'Async!' time.sleep(5)
然后编写视图函数:
@app.route('/') def index(): return render_template('chapter9/section3/index.html') @app.route('/async') def async(): async_task.delay() # 访问/async 会异步地发布一个 async_task 任务,然后马上返回' Task complete!' return 'Task complete!' @app.route('/task') def start_background_task(): background_task.delay() return 'Started'
最后,我们设置启动应用的方式,和之前的 Flask 用法略有不同:
if__name__=='__main__': socketio.run(app, host='0.0.0.0', port=9000, debug=True)
创建一个简单的模板 index.html,它带有很少的样式,用 jQuery 操作 DOM。为了减少依赖, socket.io.js 引用了外部 CDN 的地址:
<script type="text/javascript"src="//cdn.socket.io/socket.io-1.4.5.js"></script>
先看一下模板的主体:
<body> <h3>Logging</h3> <p id="log"></p> <button id="background">Execute</button> </body>
再看一下 JavaScript 事件部分:
<script type="text/javascript"> $(document).ready(function(){ namespace='/task'; socket=io.connect('http://'+document.domain+':'+location.port+ namespace); socket.on('my response', function(msg){ console.log('Received:'+msg.data); $('#log').append('Received:'+msg.data+'<br>'); }); $('#background').on('click', function(){ $.get("{{url_for('start_background_task')}}"); }); }); </script>
上面这段代码表示当页面加载完成之后,我们会创建 Websocket 链接,并且给 id 为 background 的 DOM 添加一个只要点击就会访问视图函数名为 start_background_task(也就是访问/task)的事件。
现在运行 Flask 应用和 Celery:
> cd ~/web_develop/chapter9/section3 > python celery_socketio.py > celery -A celery_socketio.celery -P eventlet worker -l info
访问“http://localhost:9000/async”可以感受到这个请求是直接返回的,sleep 5 秒的操作并没有影响到页面响应时间。
访问首页“http://localhost:9000/”,单击 Execute 按钮,可以看到红色框里面的内容会更新,执行完毕后如图 9.4 所示。
图 9.4 红色框里面的内容会更新
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论