返回介绍

深入理解 RabbitMQ

发布于 2025-04-20 18:52:16 字数 8937 浏览 0 评论 0 收藏

RabbitMQ 是一个实现了 AMQP 协议标准的开源消息代理和队列服务器。和 Beanstalkd 不同的是,它是企业级消息系统,自带了集群、管理、插件系统等特性,在高可用性、可扩展性、易用性等方面做得很好,现在被互联网公司广泛应用。

我们先安装它:

> sudo apt-get install rabbitmq-server -yq

再安装 RabbitMQ 的 Python 客户端,最常用的客户端是 pika:

> pip install pika

AMQP

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个异步消息传递所使用的应用层协议规范。它的设计初衷是为了摆脱商业 MQ 高额费用和不同 MQ 供应商的接口不统一的问题,所以一开始就设计成开放标准,以解决企业复杂的消息队列需求问题。我们先了解几个概念。

1.消息(Message)。消息实际包含两部分内容:

  • 有效载荷(Payload),也就是要传输的数据,数据类型可以纯文本也可以是 JSON。
  • 标签(Label),它包含交换机的名字和可选的主题(topic)标记等,AMQP 仅仅描述了标签,而 RabbitMQ 决定了把这个消息发给哪个消费者。

2.发布者(Producer):也就是生产者,它创建消息并且设置标签。

3.消费者(Consumer):消费者连接到代理服务器上,接收消息的有效载荷(注意,消费者并不需要消息中的标签)。

AMQP 的工作流程如图 9.1 所示。

图 9.1 AMQP 的工作流程图

为了保证消息被正确取出并执行、消息投递失败后会重发,AMQP 模块包含了一个消息确认的概念:当一个消息从队列中投递给消费者后,消费者会通知消息代理(也就是常说的 Broker),这个通知可以是自动完成的,也可以由处理消息的应用来执行。当消息确认(Ack)被启用的时候,消息代理不会完全将消息从队列中删除,除非收到来自消费者的确认回执。

交换机拿到一个消息之后会将它路由给队列。它使用哪种路由算法是由交换机类型和被称作“绑定”(queue_bind)的规则所决定的。目前 RabbitMQ 提供了如下四种交换机。

1.直连交换机(direct exchange):根据消息携带的路由键(routing key)将消息投递给对应队列。将一个队列绑定到某个交换机的同时赋予该绑定一个路由键,当一个携带着路由键为 XXX 的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为 XXX 的队列。直连交换机用来处理消息的单播路由。

2.主题交换机(topic exchange):通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机通常用来实现消息的多播路由。发送到主题交换机的消息的路由键,必须是一个由“.”分隔的词语列表,这些词语应该和对应的业务相关联,词语的个数可以随意,但是不要超过 255 字节。绑定键支持通配符:“*”(星号)用来表示一个单词;“#”(井号)用来表示任意数量(零个或多个)单词。

3.扇型交换机(fanout exchange):将消息路由给绑定到它身上的所有队列,且不理会绑定的路由键。扇型交换机用来处理消息的广播路由。扇型交换机非常有用,因为它允许你对单条消息做不同的处理,Web 开发中一个操作可能要做多个连带工作,比如用户创建一篇新的日记,需要更新用户的创建日记数、清除相关缓存、给关注这个用户的其他用户推消息、日记进审核后台、日记进最新日记池等等,可以使用扇型交换机把一个消息分发给多个任务队列,执行不一样的工作。尤其是当业务改变时,使用扇型交换机直接为新的消费者添加声明并绑定进来就可以了,否则需要修改发送方的代码来添加接收方。所以,使用扇型交换机可以有效地解耦发布者和消费者。

4.头交换机(headers exchange):允许匹配 AMQP 的头而非路由键,其实使用起来和直接交换机差不多,但是性能却差很多,一般用不到这种类型。

下面通过一个完备的直接交换的例子,演示一下 RabbitMQ 的工作流程。首先看一下发布者的代码(amqp_producer.py):

import sys
    
import pika
    
# %2F 是被转义的“/”,这里使用了默认的虚拟主机和默认的用户及密码
parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection=pika.BlockingConnection(parameters) # connection 就是所谓的消息代理
channel=connection.channel() # 获得信道
# 声明交换机,指定交换类型为直接交换。最后两个参数表示想要持久化的交换机,其中
    durable 为 True 表示 RabbitMQ 在崩溃重启之后会重建队列和交换机
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
                        passive=False, durable=True, auto_delete=False)
if len(sys.argv) !=1:
   msg=sys.argv[1] # 使用命令行参数作为消息体
else:
   msg='hah'
    
# 创建一个消息,delivery_mode 为 2 表示让这个消息持久化,重启 RabbitMQ 也不会丢失。使
    用持久化需要考虑为此付出的性能成本,如果开启此功能,强烈建议把消息存储在 SSD
    上
props=pika.BasicProperties(content_type='text/plain', delivery_mode=2)
# basic_publish 表示发送路由键为 xxx_routing_key,消息体为 haha 的消息给 web_develop 这个
    交换机
channel.basic_publish('web_develop', 'xxx_routing_key', msg,
                     properties=props)
connection.close() # 关闭连接

上述例子使用了文本模式(text/plain),复杂的内容类型还可以选择 JSON 格式:applica-tion/json。信道是建立在 TCP 连接内的虚拟连接。不直接通过 TCP 连接发送 AMQP 命令,是因为对操作系统而言,创建和销毁 TCP 连接是非常昂贵的开销,而且 TCP 连接数量是有限,很容易出现性能瓶颈,一个 TCP 连接上的信道数量没有限制,性能也非常好。

再看一下消费者的代码(amqp_consumer.py):

import pika
    
# 处理接收到的消息的回调函数
# method_frame 携带了投递标记,header_frame 表示 AMQP 信息头的对象
# body 为消息实体
def on_message(channel, method_frame, header_frame, body):
    # 消息确认,确认之后才会删除消息并给消费者发送新的消息
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    print body
parameters=pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection=pika.BlockingConnection(parameters)
channel=connection.channel()
    
channel.exchange_declare(exchange='web_develop', exchange_type='direct',
                        passive=False, durable=True, auto_delete=False)
# 声明队列,如果没有就创建
channel.queue_declare(queue='standard', auto_delete=True)
# 通过路由键将队列和交换机绑定
channel.queue_bind(queue='standard', exchange='web_develop',
                  routing_key='xxx_routing_key')
    
channel.basic_consume(on_message, 'standard') # 订阅队列
    
try:
    channel.start_consuming() # 开始消费
except KeyboardInterrupt:
    channel.stop_consuming() # 停止消费
    
connection.close()

现在先启动消费者:

> python chapter9/section2/amqp_consumer.py

然后使用另外一个终端,启动发布者程序发送消息:

> python chapter9/section2/amqp_producer.py web

执行完之后就可以在消费者的终端上看到“web”了。

现在对发布者的例子加以改动,让它支持消息确认。支持的原理是确保 basic_publish 的返回值为 True(mqp_producer_with_confirm.py):

...
channel=connection.channel()
# 接收确认消息
channel.confirm_delivery()
if channel.basic_publish('web_develop', 'xxx_routing_key', msg,
                        properties=props):
   print 'Message publish was confirmed!'
else:
   print 'Message could not be confirmed!'
connection.close()

虚拟主机

RabbitMQ 服务器可以创建虚拟主机,它能拥有自己的队列、绑定和交换机,就像一个有自己的权限机制的迷你版 RabbitMQ。不同的虚拟主机之间完全隔离,还可以有效避免命名冲突的问题。上面的例子都基于默认的虚拟主机“/”,虚拟主机需要在连接的时候指定。肯定不能在生产环境使用 guest 这个默认的用户/密码。接下来我们来创建自己的用户和虚拟机,并给它赋予一定的权限。

管理 RabbitMQ 一般都是通过 rabbitmqctl 这个命令来完成的:

> sudo rabbitmqctl add_user dongwm 123456
> sudo rabbitmqctl add_vhost web_develop
> sudo rabbitmqctl set_permissions-p web_develop dongwm".*"".*"".*"

其中 set_permissions 后面的三个“.*”,分别是配置(队列和交换的创建和删除)、写(发布消息)、读(消费消息)的权限。

看一下目前存在的虚拟主机、队列和用户:

> sudo rabbitmqctl list_vhosts
Listing vhosts ...
/
web_develop
> sudo rabbitmqctl list_queues -p web_develop # 还没有声明,所以没有队列
Listing queues ...
> sudo rabbitmqctl list_users
Listing users ...
dongwm []
guest [administrator]

默认的用户 guest 应该在上线前删除,取消管理员权限或者改变密码。

插件系统

RabbitMQ 提供了强大的插件系统,当你需要某些功能而 RabbitMQ 没有时,可以在网络上查找或者自己编写一个插件安装进来使用。使用插件可以实现如下功能:

  • 管理和监控 RabbitMQ。
  • 支持 AMQP 之外的协议。
  • 消息复制。
  • 新的路由算法和交换类型。

官方的插件列表页(https://www.rabbitmq.com/plugins.html )列出了多个插件,其中 RabbitMQ 团队维护的插件都放在支持的插件(Supported Plugins)列表中,这些插件会和 RabbitMQ 最新版保持一致;而实验性质的插件(Experimental Plugins)列表建议不要在生产环境中使用。

安装插件的方法非常简单:

> sudo rabbitmq-plugins enable rabbitmq_shovel

rabbitmq_shovel 插件就安装好了,Shovel 用于有多个数据中心的环境,能够定义 RabbitMQ 上的队列和另一个 RabbitMQ 上的交换机的复制关系,这样消息就可以被异步复制到其他的数据中心并被消费了。

对应的移除插件的命令如下:

> sudo rabbitmq-plugins disable rabbitmq_shovel

通过 Web 和 REST API 管理 RabbitMQ

有的人喜欢用命令行的方式管理,有的人喜欢在图形界面上管理,RabbitMQ 提供 Web 管理程序,但是这个功能是通过 rabbitmq_management 实现的,需要安装它:

sudo rabbitmq-plugins enable rabbitmq_management

Web 服务默认的端口是 15672,虚拟机没有添加这个端口的端口转发,所以需要修改配置,把端口换成 5000:

> cat/etc/rabbitmq/rabbitmq.config
[
{rabbitmq_management, [{listener, [{port, 5000}]}]}
].

这个配置采用的是 Erlang 的数据结构,看起来是一个包含了嵌套字典的数组,但是要注意结尾的点。重启 rabbitmq-server:

> sudo /etc/init.d/rabbitmq-server restart

登录 Web 界面之前还需要给用户添加权限,管理插件时有 5 种权限,如表 9.1 所示。

表 9.1 管理插件时的 5 种权限

权限含义
None也就是什么都不做,新创建的用户默认没有登录管理页面的权限
management查看用户有权限访问的虚拟主机的队列、交换机、绑定、通道和连接等
policymaker除了 management 的权限外,还能查看、创建和删除策略和参数
monitoring除了 management 的权限外,还能查看其他用户的通道和连接、列出全部虚拟主机等
administrator最高权限

使用 administrator 权限:

> sudo rabbitmqctl set_user_tags dongwm administrator

现在就可以使用刚才创建的 dongwm 这个用户登录管理页面 http://localhost:5000 了,登录之后的页面如图 9.2 所示。

图 9.2 登录之后的页面

这个 Web 界面支持如下操作:

  • 服务器数据和统计预览。如最近一段时间的队列情况、当前连接数、当前队列数、内存占用、RabbitMQ 版本、主机名等。
  • 导出/导入服务器配置。
  • 查看服务器连接。
  • 查看信道列表。
  • 查看交换机列表,添加新的交换机。
  • 查看队列,添加新的队列,修改队列绑定。
  • 查看用户列表,添加用户。
  • 查看虚拟主机,添加新的虚拟主机。
  • 列出策略,添加/更新策略。

rabbitmq_management 插件还提供了一个 HTTP 的 REST API 服务,可以使用 curl 或者 httpie 这样的工具执行下面的操作:

> http -a dongwm:123456 http://localhost:5000/api/nodes/rabbit@WEB
HTTP/1.1 200 OK
...
Server:MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)
    
{
   ...
   "log_file":"/var/log/rabbitmq/rabbit@WEB.log",
   "mem_alarm":false,
   "mem_limit":628290355,
   "mem_used":54851512,
   "mem_used_details":{
       "rate":-1353.6
   },
   ...
}

其他用法可以参考文档 http://localhost:5000/api。

还有一个非常有用的工具:rabbitmqadmin,它是一个使用 Python 编写的脚本,支持上述全部操作,我们可以方便地把它集成到运维监控和应用分析中。它需要通过 Web 页面下载:

> wget http://localhost:5000/cli/rabbitmqadmin
> chmod+x rabbitmqadmin

看一下效果:

故障转移

一般使用 HAProxy 做 RabbitMQ 集群的负载均衡工具,当集群节点出现故障时,由应用程序决定故障转移的方式,最简单的方案就是让消费者重试:

while 1:
    try:
        connection=pika.BlockingConnection(parameters)
    except:
        pass

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。