- 前言
- 第一部分 基础应用开发
- 第 1 章 Spring Boot 入门
- 第 2 章 在 Spring Boot 中使用数据库
- 第 3 章 Spring Boot 界面设计
- 第 4 章 提高数据库访问性能
- 第 5 章 Spring Boot 安全设计
- 第二部分 分布式应用开发
- 第 6 章 Spring Boot SSO
- 第 7 章 使用分布式文件系统
- 第 8 章 云应用开发
- 第 9 章 构建高性能的服务平台
- 第三部分 核心技术源代码分析
- 第 10 章 Spring Boot 自动配置实现原理
- 第 11 章 Spring Boot 数据访问实现原理
- 第 12 章 微服务核心技术实现原理
- 附录 A 安装 Neo4j
- 附录 B 安装 MongoDB
- 附录 C 安装 Redis
- 附录 D 安装 RabbitMQ
- 结束语
12.4 分布式消息实现原理演示
使用 RabbitMQ 实现分布式消息分发,在分布式系统中具有很大的用途,为各个应用之间传递消息和数据提供了很大的方便,并且其松散耦合的结构和异步处理的机制不会影响系统的性能。
使用 spring-cloud-stream 可以非常简单地使用 RabbitMQ 的异步消息,Spring Cloud 的配置管理中的分布式消息分发也是通过调用 spring-cloud-stream 组件来实现的。下面将创建一个消息生产者和一个消息消费者来演示消息分发的实现原理。
12.4.1 消息生产者
消息生产者的实现如代码清单 12-12 所示,这里主要创建了一个 POST 接口“/send”,以接收传入的 Map 对象作为参数,使用 MessageChannel 的 send 方法,将消息发布到 RabbitMQ 的消息队列上。使用 Map 对象的目的,是保证消息生产者和消息消费者之间,可以使用相同的对象来存取消息的内容,这就要求双方必须事先约定 Map 对象的字段。
代码清单 12-12 消息生产者主程序
@EnableBinding(Source.class) @RestController @SpringBootApplication public class SenderApplication { @Autowired @Output(Source.OUTPUT) private MessageChannel channel; @RequestMapping(method = RequestMethod.POST, path = "/send") public void write (@RequestBody Map<String, Object> msg){ channel.send(MessageBuilder.withPayload(msg).build()); } public static void main(String[] args) { SpringApplication.run(SenderApplication.class, args); } }
代码清单 12-13 是消息生产者这个工程的配置文件,其中 stream 配置的目的地为 cloud-stream,客户端订阅时必须使用相同的目的地,注意这里绑定的方式是 output,rabbitmq 是连接消息服务器的一些参数配置。
代码清单 12-13 消息生产者工程配置文件
server: port: 80 spring: cloud: stream: bindings: output: destination: cloud-stream rabbitmq: addresses: amqp:// 192.168.1.216:5672 username: alan password: alan
12.4.2 消息消费者
消息消费者的实现如代码清单 12-14 所示,这里从 SubscribableChannel(即 Sink.INPUT)订阅了通道的消息,它相当于一个监听器,当订阅的通道上有消息发布时,就将消息取回来,然后简单地在控制台上打印出来。这里使用的 Map 对象,跟消息生产者约定了两个使用字段,即 msg 和 name。
代码清单 12-14 消息消费者主程序
@EnableBinding(Sink.class) @IntegrationComponentScan @MessageEndpoint @SpringBootApplication public class ReceiverApplication { @ServiceActivator(inputChannel=Sink.INPUT) public void accept(Map<String, Object> msg){ System.out.println(msg.get("msg").toString() + ":" + msg.get("name")); } public static void main(String[] args) { SpringApplication.run(ReceiverApplication.class, args); } }
代码清单 12-15 是消息消费者工程的配置文件,它与消息生产者有相同的目的地,不同的是它的绑定方式是 input,其中 rabbitmq 的配置是相同的,只要连上 RabbitMQ 消息服务器即可。
代码清单 12-15 消息消费者工程配置文件
server: port: 81 spring: cloud: stream: bindings: input: destination: cloud-stream group: group1 consumer: durableSubscription: true rabbitmq: addresses: amqp:// 192.168.1.216:5672 username: alan password: alan
这样,当两个工程都启动之后,输入下列 POST 指令,就可以将消息从消息生产者中发布出去。
curl -l -H "Content-type:application/json" -X POST -d'{"msg":"Hello","name":"RabbitMQ"}' http://localhost/send
其中消息结构中的 msg 和 name 就是约定好的字段。这时,在消息消费者的控制台上可以看到接收到了如下信息:
Hello:RabbitMQ
从上面的演示可以看出,使用 spring-cloud-stream 来实现分布式消息的分发和接收都是非常简单的。上面实例工程的完整代码可以从 GitHub 中检出: https://github.com/chenfromsz/spring-cloud-stream-demo.git .
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论