返回介绍

使用 Beanstalkd

发布于 2025-04-20 18:52:16 字数 2839 浏览 0 评论 0 收藏

Beanstalkd 是消息队列的后起之秀,它是一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低 Web 应用的页面访问延迟。它支持过有 950 万用户的 Facebook Causes 应用,豆瓣也使用它作为消息队列。

Beanstalkd 有如下特点:

  • 可持久化。Beanstalkd 运行使用内存,但也提供了持久性支持。在启动的时候使用-b 参数指定持久化目录,它会将所有的任务写入 binlog 文件。在发生断电等情况后,用同样的参数指定重启它,将恢复 binlog 中的内容。
  • 支持任务优先级。值越小优先级越高。
  • 任务超时重发。消费者必须在预设的 TTR (Time To Run)时间内发送 delete/re-lease/bury 改变任务状态,否则它会认为消息处理失败,把任务交给别的消费者节点执行。
  • 支持任务预留。如果任务因为某些原因无法执行,消费者可以把任务置为 buried 状态保留这些任务。
  • 支持分布式。客户端可以实现和 Memcached 一样的分布式。
  • 灵活设置任务过期和 TTR 时间。

job 就是待异步执行的任务,也就是消息,是 Beanstalkd 中的基本单元。一个 job 通过生产者使用 put 命令时创建,然后被放在一个管道(tube)中。在整个生命周期中 job 可能有 4 个工作状态。

  • ready:等待被取出并处理。
  • reserved:如果 job 被消费者(worker)取出,将被此消费者预订,消费者将执行此 job。
  • delayed:等待特定时间之后,状态再改为 ready 状态。
  • buried:等待唤醒,通常在 job 处理失败时,会变成这个状态。

一个服务器有一个或者多个管道,管道用来存储统一类型的 job。每个管道由一个就绪队列与延迟队列组成。每个 job 所有的状态迁移在一个管道中完成。消费者监控/取消监控感兴趣的管道。当一个客户端连接上服务器时,客户端监控的管道默认为 default,如果客户端提交任务时没有使用 use 命令,那么这些 job 就存于名为 default 的管道中。管道都是按需求创建的,无论它们在什么时候被引用到。如果一个管道变为空也没有任何客户端引用,它将会被自动删除。

我们先安装它:

> git clone https://github.com/kr/beanstalkd
> cd beanstalkd
> sudo make install

adm 目录下包含一些管理服务的脚本。修改 beanstalkd.service:

[Unit]
Description=Beanstalkd is a simple, fast work queue
    
[Service]
User=ubuntu
ExecStart=/usr/local/bin/beanstalkd -b /opt/beanstalkd -p 11200

启动服务:

> sudo cp adm/systemd/beanstalkd.service /lib/systemd/system/
> sudo systemctl daemon-reload
> sudo systemctl start beanstalkd

使用 Beanstalkc

Beanstalkd 借鉴了 Memcached 设计,它们的协议和使用方式的风格很像。本节使用 Beanstalkd 的 Python 客户端 Beanstalkc(http://bit.ly/28XuQYd )演示 Beanstalkd 的用法。

我们先安装它:

> pip install PyYAML beanstalkc # 使用 PyYAML 库可以让输出更直观

在交互模式下使用 Beanstalkc:

In : import beanstalkc
In : beanstalk=beanstalkc.Connection(host='localhost', port=11200)
In : beanstalk.tubes() # 列出全部管道
['default']
In : beanstalk.use('web_app') # 切换到管道 web_app
Out : 'web_app'
In : beanstalk.watch('web_app') # 监控管道 web_app
Out : 2
In : id=beanstalk.put('job_1', priority=21) # 放入一个任务,可以指定优先级
In : job=beanstalk.reserve() # 接收任务
In : job.body
Out : 'job_1'
In : job.stats() # 查看任务状态
Out:
{'age':33,
 'buries':0,
 'delay':0,
 ...
 'tube':'web_app'}
In : beanstalk.stats_tube('web_app') # 查看管道状态
Out:
{'cmd-delete':0,
 'cmd-pause-tube':0,
 'current-jobs-buried':0,
 'current-jobs-delayed':0,
 'current-jobs-ready':0,
 'current-jobs-reserved':1,
 ...
 'total-jobs':1}
In : beanstalk.stats() # 查看链接状态
Out : 
{'binlog-current-index':3,
 ...
 'cmd-reserve':3,
 'cmd-reserve-with-timeout':0,
 'cmd-stats':3,
 'cmd-stats-job':7,
 'cmd-stats-tube':3,
 ...
 'version':'1.10+21+gb7b4a6a'}
# 如果接收任务后没有修改任务状态,任务完成后应该删除任务,否则在 TTR 时间后会导致
    任务超时重发
In : job.delete()
In : beanstalk.close()

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。