文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
使用 Beanstalkd
Beanstalkd 是消息队列的后起之秀,它是一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低 Web 应用的页面访问延迟。它支持过有 950 万用户的 Facebook Causes 应用,豆瓣也使用它作为消息队列。
Beanstalkd 有如下特点:
- 可持久化。Beanstalkd 运行使用内存,但也提供了持久性支持。在启动的时候使用-b 参数指定持久化目录,它会将所有的任务写入 binlog 文件。在发生断电等情况后,用同样的参数指定重启它,将恢复 binlog 中的内容。
- 支持任务优先级。值越小优先级越高。
- 任务超时重发。消费者必须在预设的 TTR (Time To Run)时间内发送 delete/re-lease/bury 改变任务状态,否则它会认为消息处理失败,把任务交给别的消费者节点执行。
- 支持任务预留。如果任务因为某些原因无法执行,消费者可以把任务置为 buried 状态保留这些任务。
- 支持分布式。客户端可以实现和 Memcached 一样的分布式。
- 灵活设置任务过期和 TTR 时间。
job 就是待异步执行的任务,也就是消息,是 Beanstalkd 中的基本单元。一个 job 通过生产者使用 put 命令时创建,然后被放在一个管道(tube)中。在整个生命周期中 job 可能有 4 个工作状态。
- ready:等待被取出并处理。
- reserved:如果 job 被消费者(worker)取出,将被此消费者预订,消费者将执行此 job。
- delayed:等待特定时间之后,状态再改为 ready 状态。
- buried:等待唤醒,通常在 job 处理失败时,会变成这个状态。
一个服务器有一个或者多个管道,管道用来存储统一类型的 job。每个管道由一个就绪队列与延迟队列组成。每个 job 所有的状态迁移在一个管道中完成。消费者监控/取消监控感兴趣的管道。当一个客户端连接上服务器时,客户端监控的管道默认为 default,如果客户端提交任务时没有使用 use 命令,那么这些 job 就存于名为 default 的管道中。管道都是按需求创建的,无论它们在什么时候被引用到。如果一个管道变为空也没有任何客户端引用,它将会被自动删除。
我们先安装它:
> git clone https://github.com/kr/beanstalkd > cd beanstalkd > sudo make install
adm 目录下包含一些管理服务的脚本。修改 beanstalkd.service:
[Unit] Description=Beanstalkd is a simple, fast work queue [Service] User=ubuntu ExecStart=/usr/local/bin/beanstalkd -b /opt/beanstalkd -p 11200
启动服务:
> sudo cp adm/systemd/beanstalkd.service /lib/systemd/system/ > sudo systemctl daemon-reload > sudo systemctl start beanstalkd
使用 Beanstalkc
Beanstalkd 借鉴了 Memcached 设计,它们的协议和使用方式的风格很像。本节使用 Beanstalkd 的 Python 客户端 Beanstalkc(http://bit.ly/28XuQYd )演示 Beanstalkd 的用法。
我们先安装它:
> pip install PyYAML beanstalkc # 使用 PyYAML 库可以让输出更直观
在交互模式下使用 Beanstalkc:
In : import beanstalkc In : beanstalk=beanstalkc.Connection(host='localhost', port=11200) In : beanstalk.tubes() # 列出全部管道 ['default'] In : beanstalk.use('web_app') # 切换到管道 web_app Out : 'web_app' In : beanstalk.watch('web_app') # 监控管道 web_app Out : 2 In : id=beanstalk.put('job_1', priority=21) # 放入一个任务,可以指定优先级 In : job=beanstalk.reserve() # 接收任务 In : job.body Out : 'job_1' In : job.stats() # 查看任务状态 Out: {'age':33, 'buries':0, 'delay':0, ... 'tube':'web_app'} In : beanstalk.stats_tube('web_app') # 查看管道状态 Out: {'cmd-delete':0, 'cmd-pause-tube':0, 'current-jobs-buried':0, 'current-jobs-delayed':0, 'current-jobs-ready':0, 'current-jobs-reserved':1, ... 'total-jobs':1} In : beanstalk.stats() # 查看链接状态 Out : {'binlog-current-index':3, ... 'cmd-reserve':3, 'cmd-reserve-with-timeout':0, 'cmd-stats':3, 'cmd-stats-job':7, 'cmd-stats-tube':3, ... 'version':'1.10+21+gb7b4a6a'} # 如果接收任务后没有修改任务状态,任务完成后应该删除任务,否则在 TTR 时间后会导致 任务超时重发 In : job.delete() In : beanstalk.close()
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论