深入理解 RabbitMQ
RabbitMQ 是一个实现了 AMQP 协议标准的开源消息代理和队列服务器。和 Beanstalkd 不同的是,它是企业级消息系统,自带了集群、管理、插件系统等特性,在高可用性、可扩展性、易用性等方面做得很好,现在被互联网公司广泛应用。
我们先安装它:
> sudo apt-get install rabbitmq-server -yq
再安装 RabbitMQ 的 Python 客户端,最常用的客户端是 pika:
> pip install pika
AMQP
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个异步消息传递所使用的应用层协议规范。它的设计初衷是为了摆脱商业 MQ 高额费用和不同 MQ 供应商的接口不统一的问题,所以一开始就设计成开放标准,以解决企业复杂的消息队列需求问题。我们先了解几个概念。
1.消息(Message)。消息实际包含两部分内容:
- 有效载荷(Payload),也就是要传输的数据,数据类型可以纯文本也可以是 JSON。
- 标签(Label),它包含交换机的名字和可选的主题(topic)标记等,AMQP 仅仅描述了标签,而 RabbitMQ 决定了把这个消息发给哪个消费者。
2.发布者(Producer):也就是生产者,它创建消息并且设置标签。
3.消费者(Consumer):消费者连接到代理服务器上,接收消息的有效载荷(注意,消费者并不需要消息中的标签)。
AMQP 的工作流程如图 9.1 所示。

图 9.1 AMQP 的工作流程图
为了保证消息被正确取出并执行、消息投递失败后会重发,AMQP 模块包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(也就是常说的 Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认(Ack)被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。
交换机拿到一个消息之后会将它路由给队列。它使用哪种路由算法是由交换机类型和被称作“绑定”(queue_bind)的规则所决定的。目前 RabbitMQ 提供了如下四种交换机。
1.直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为 XXX 的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为 XXX 的队列。直连交换机用来处理消息的单播路由。
2.主题交换机(topic exchange):通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个由“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过 255 字节。绑定键支持通配符:“*”(星号)用来表示一个单词;“#”(井号)用来表示任意数量(零个或多个)单词。
3.扇型交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇型交换机用来处理消息的广播路由。扇型交换机非常有用,因为它允许你对单条消息做不同的处理,Web 开发中一个操作可能要做多个连带工作,比如用户创建一篇新的日记,需要更新用户的创建日记数、清除相关缓存、给关注这个用户的其他用户推消息、日记进审核后台、日记进最新日记池等等,可以使用扇型交换机把一个消息分发给多个任务队列,执行不一样的工作。尤其是当业务改变时,使用扇型交换机直接为新的消费者添加声明并绑定进来就可以了,否则需要修改发送方的代码来添加接收方。所以,使用扇型交换机可以有效地解耦发布者和消费者。
4.头交换机(headers exchange):允许匹配 AMQP 的头而非路由键,其实使用起来和直接交换机差不多,但是性能却差很多,一般用不到这种类型。
下面通过一个完备的直接交换的例子,演示一下 RabbitMQ 的工作流程。首先看一下发布者的代码(amqp_producer.py):
import sys
import pika
# %2F 是被转义的“/”,这里使用了默认的虚拟主机和默认的用户及密码
parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection=pika.BlockingConnection(parameters) # connection 就是所谓的消息代理
channel=connection.channel() # 获得信道
# 声明交换机,指定交换类型为直接交换。最后两个参数表示想要持久化的交换机,其中
durable 为 True 表示 RabbitMQ 在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
passive=False, durable=True, auto_delete=False)
if len(sys.argv) !=1:
msg=sys.argv[1] # 使用命令行参数作为消息体
else:
msg='hah'
# 创建一个消息,delivery_mode 为 2 表示让这个消息持久化,重启 RabbitMQ 也不会丢失。使
用持久化需要考虑为此付出的性能成本,如果开启此功能,强烈建议把消息存储在 SSD
上
props=pika.BasicProperties(content_type='text/plain', delivery_mode=2)
# basic_publish 表示发送路由键为 xxx_routing_key,消息体为 haha 的消息给 web_develop 这个
交换机
channel.basic_publish('web_develop', 'xxx_routing_key', msg,
properties=props)
connection.close() # 关闭连接
上述例子使用了文本模式(text/plain),复杂的内容类型还可以选择 JSON 格式:applica-tion/json。信道是建立在 TCP 连接内的虚拟连接。不直接通过 TCP 连接发送 AMQP 命令,是因为对操作系统而言,创建和销毁 TCP 连接是非常昂贵的开销,而且 TCP 连接数量是有限,很容易出现性能瓶颈,一个 TCP 连接上的信道数量没有限制,性能也非常好。
再看一下消费者的代码(amqp_consumer.py):
import pika
# 处理接收到的消息的回调函数
# method_frame 携带了投递标记,header_frame 表示 AMQP 信息头的对象
# body 为消息实体
def on_message(channel, method_frame, header_frame, body):
# 消息确认,确认之后才会删除消息并给消费者发送新的消息
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
print body
parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection=pika.BlockingConnection(parameters)
channel=connection.channel()
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
passive=False, durable=True, auto_delete=False)
# 声明队列,如果没有就创建
channel.queue_declare(queue='standard', auto_delete=True)
# 通过路由键将队列和交换机绑定
channel.queue_bind(queue='standard', exchange='web_develop',
routing_key='xxx_routing_key')
channel.basic_consume(on_message, 'standard') # 订阅队列
try:
channel.start_consuming() # 开始消费
except KeyboardInterrupt:
channel.stop_consuming() # 停止消费
connection.close()
现在先启动消费者:
> python chapter9/section2/amqp_consumer.py
然后使用另外一个终端,启动发布者程序发送消息:
> python chapter9/section2/amqp_producer.py web
执行完之后就可以在消费者的终端上看到“web”了。
现在对发布者的例子加以改动,让它支持消息确认。支持的原理是确保 basic_publish 的返回值为 True(mqp_producer_with_confirm.py):
...
channel=connection.channel()
# 接收确认消息
channel.confirm_delivery()
if channel.basic_publish('web_develop', 'xxx_routing_key', msg,
properties=props):
print 'Message publish was confirmed!'
else:
print 'Message could not be confirmed!'
connection.close()
虚拟主机
RabbitMQ 服务器可以创建虚拟主机,它能拥有自己的队列、绑定和交换机,就像一个有自己的权限机制的迷你版 RabbitMQ。不同的虚拟主机之间完全隔离,还可以有效避免命名冲突的问题。上面的例子都基于默认的虚拟主机“/”,虚拟主机需要在连接的时候指定。肯定不能在生产环境使用 guest 这个默认的用户/密码。接下来我们来创建自己的用户和虚拟机,并给它赋予一定的权限。
管理 RabbitMQ 一般都是通过 rabbitmqctl 这个命令来完成的:
> sudo rabbitmqctl add_user dongwm 123456 > sudo rabbitmqctl add_vhost web_develop > sudo rabbitmqctl set_permissions-p web_develop dongwm".*"".*"".*"
其中 set_permissions 后面的三个“.*”,分别是配置(队列和交换的创建和删除)、写(发布消息)、读(消费消息)的权限。
看一下目前存在的虚拟主机、队列和用户:
> sudo rabbitmqctl list_vhosts Listing vhosts ... / web_develop > sudo rabbitmqctl list_queues -p web_develop # 还没有声明,所以没有队列 Listing queues ... > sudo rabbitmqctl list_users Listing users ... dongwm [] guest [administrator]
默认的用户 guest 应该在上线前删除,取消管理员权限或者改变密码。
插件系统
RabbitMQ 提供了强大的插件系统,当你需要某些功能而 RabbitMQ 没有时,可以在网络上查找或者自己编写一个插件安装进来使用。使用插件可以实现如下功能:
- 管理和监控 RabbitMQ。
- 支持 AMQP 之外的协议。
- 消息复制。
- 新的路由算法和交换类型。
官方的插件列表页(https://www.rabbitmq.com/plugins.html )列出了多个插件,其中 RabbitMQ 团队维护的插件都放在支持的插件(Supported Plugins)列表中,这些插件会和 RabbitMQ 最新版保持一致;而实验性质的插件(Experimental Plugins)列表建议不要在生产环境中使用。
安装插件的方法非常简单:
> sudo rabbitmq-plugins enable rabbitmq_shovel
rabbitmq_shovel 插件就安装好了,Shovel 用于有多个数据中心的环境,能够定义 RabbitMQ 上的队列和另一个 RabbitMQ 上的交换机的复制关系,这样消息就可以被异步复制到其他的数据中心并被消费了。
对应的移除插件的命令如下:
> sudo rabbitmq-plugins disable rabbitmq_shovel
通过 Web 和 REST API 管理 RabbitMQ
有的人喜欢用命令行的方式管理,有的人喜欢在图形界面上管理,RabbitMQ 提供 Web 管理程序,但是这个功能是通过 rabbitmq_management 实现的,需要安装它:
sudo rabbitmq-plugins enable rabbitmq_management
Web 服务默认的端口是 15672,虚拟机没有添加这个端口的端口转发,所以需要修改配置,把端口换成 5000:
> cat/etc/rabbitmq/rabbitmq.config
[
{rabbitmq_management, [{listener, [{port, 5000}]}]}
].
这个配置采用的是 Erlang 的数据结构,看起来是一个包含了嵌套字典的数组,但是要注意结尾的点。重启 rabbitmq-server:
> sudo /etc/init.d/rabbitmq-server restart
登录 Web 界面之前还需要给用户添加权限,管理插件时有 5 种权限,如表 9.1 所示。
表 9.1 管理插件时的 5 种权限
| 权限 | 含义 |
| None | 也就是什么都不做,新创建的用户默认没有登录管理页面的权限 |
| management | 查看用户有权限访问的虚拟主机的队列、交换机、绑定、通道和连接等 |
| policymaker | 除了 management 的权限外,还能查看、创建和删除策略和参数 |
| monitoring | 除了 management 的权限外,还能查看其他用户的通道和连接、列出全部虚拟主机等 |
| administrator | 最高权限 |
使用 administrator 权限:
> sudo rabbitmqctl set_user_tags dongwm administrator
现在就可以使用刚才创建的 dongwm 这个用户登录管理页面 http://localhost:5000 了,登录之后的页面如图 9.2 所示。

图 9.2 登录之后的页面
这个 Web 界面支持如下操作:
- 服务器数据和统计预览。如最近一段时间的队列情况、当前连接数、当前队列数、内存占用、RabbitMQ 版本、主机名等。
- 导出/导入服务器配置。
- 查看服务器连接。
- 查看信道列表。
- 查看交换机列表,添加新的交换机。
- 查看队列,添加新的队列,修改队列绑定。
- 查看用户列表,添加用户。
- 查看虚拟主机,添加新的虚拟主机。
- 列出策略,添加/更新策略。
rabbitmq_management 插件还提供了一个 HTTP 的 REST API 服务,可以使用 curl 或者 httpie 这样的工具执行下面的操作:
> http -a dongwm:123456 http://localhost:5000/api/nodes/rabbit@WEB
HTTP/1.1 200 OK
...
Server:MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)
{
...
"log_file":"/var/log/rabbitmq/rabbit@WEB.log",
"mem_alarm":false,
"mem_limit":628290355,
"mem_used":54851512,
"mem_used_details":{
"rate":-1353.6
},
...
}
其他用法可以参考文档 http://localhost:5000/api。
还有一个非常有用的工具:rabbitmqadmin,它是一个使用 Python 编写的脚本,支持上述全部操作,我们可以方便地把它集成到运维监控和应用分析中。它需要通过 Web 页面下载:
> wget http://localhost:5000/cli/rabbitmqadmin > chmod+x rabbitmqadmin
看一下效果:

故障转移
一般使用 HAProxy 做 RabbitMQ 集群的负载均衡工具,当集群节点出现故障时,由应用程序决定故障转移的方式,最简单的方案就是让消费者重试:
while 1:
try:
connection=pika.BlockingConnection(parameters)
except:
pass
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论