- 前言
- 第一部分 基础应用开发
- 第 1 章 Spring Boot 入门
- 第 2 章 在 Spring Boot 中使用数据库
- 第 3 章 Spring Boot 界面设计
- 第 4 章 提高数据库访问性能
- 第 5 章 Spring Boot 安全设计
- 第二部分 分布式应用开发
- 第 6 章 Spring Boot SSO
- 第 7 章 使用分布式文件系统
- 第 8 章 云应用开发
- 第 9 章 构建高性能的服务平台
- 第三部分 核心技术源代码分析
- 第 10 章 Spring Boot 自动配置实现原理
- 第 11 章 Spring Boot 数据访问实现原理
- 第 12 章 微服务核心技术实现原理
- 附录 A 安装 Neo4j
- 附录 B 安装 MongoDB
- 附录 C 安装 Redis
- 附录 D 安装 RabbitMQ
- 结束语
12.1 配置管理实现原理
在第 8 章的实例中,我们知道,配置管理的在线更新功能使用事件总线,即 spring-cloud-bus 来发布状态变化,并且使用分布式消息来发布更新事件,而分布式消息最终使用了 RabbitMQ 来实现消息收发。
12.1.1 在线更新流程
使用配置管理,实现在线更新一般遵循下列步骤:
1)更新 Git 仓库的配置文件。
2)以 POST 指令触发更新请求。
3)配置管理服务器从 Git 仓库中读取配置文件,并将配置文件分发给各个客户端,同时在 RabbitMQ 中发布一个更新消息。
4)客户端订阅 RabbitMQ 消息,收到消息后执行更新。
在使用配置管理的演示实例中,使用如下 POST 指令来触发在线更新:
curl – X POST http://localhost:8888/bus/refresh
接收这个更新指令的实现方法如代码清单 12-1 所示,其中的 publish 将会发布一个更新事件,调用 RabbitMQ 进行消息发布,然后由客户端收到消息后执行更新。代码中定义了请求更新的链接 refresh,并可使用 destination 来指定更新目标。
代码清单 12-1 接收更新指令的源代码
package org.springframework.cloud.bus.endpoint; ...... public class RefreshBusEndpoint extends AbstractBusEndpoint { public RefreshBusEndpoint(ApplicationEventPublisher context, String id, Bus Endpoint delegate) { super(context, id, delegate); } @RequestMapping( value = {"refresh"}, method = {RequestMethod.POST} ) @ResponseBody public void refresh(@RequestParam( value = "destination", required = false ) String destination) { this.publish(new RefreshRemoteApplicationEvent(this, this.getInstance Id(), destination)); } }
12.1.2 更新消息的分发原理
配置管理服务器中的消息分发是从 spring-cloud-bus 中调用 spring-cloud-stream 组件来实现的,而 spring-cloud-stream 使用 RabbitMQ 实现了分布式消息的分发。
RabbitMQ 的消息服务一般需要创建一个交换机 Exchange 和一个队列 Queue,然后将交换机和队列进行绑定。而在设计配置服务器时并没有做这方面的工作,所做的工作仅仅是配置引用 spring-cloud-bus 的依赖和设置连接 RabbitMQ 服务器的参数而已。这个工作其实已经由 spring-cloud-stream 帮我们实现了。
从 RabbitMessageChannelBinder 的源代码中可以看到这部分的实现原理,代码清单 12-2 是一个消息发布方的队列绑定的实现。其中 exchangeName 是一个交换机的名字,baseQueueName 是一个队列的名字,并且从代码中也可以看出它使用了 TopicExchange 交换机,这是 RabbitMQ 中 4 种交换机(Fanout、Direct、Topic、Header)的其中一种,并且也可以看出代码中使用 setRoutingKey 将交换机和队列做了绑定。
代码清单 12-2 RabbitMessageChannelBinder 的源代码
package org.springframework.cloud.stream.binder.rabbit; ...... public class RabbitMessageChannelBinder extends MessageChannelBinderSupport implements DisposableBean { private AmqpOutboundEndpoint buildOutboundEndpoint(String name, RabbitMessage ChannelBinder.RabbitPropertiesAccessor properties, RabbitTemplate rabbitTemplate) { String prefix = properties.getPrefix(this.defaultPrefix); String exchangeName = applyPrefix(prefix, name); String partitionKeyExtractorClass = properties.getPartitionKeyExtractor Class(); Expression partitionKeyExpression = properties.getPartitionKeyExpression(); TopicExchange exchange = new TopicExchange(exchangeName); this.declareExchange(exchangeName, exchange); AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(rabbitTemplate); endpoint.setExchangeName(exchange.getName()); String baseQueueName = exchangeName + ".default"; if(partitionKeyExpression == null && !StringUtils.hasText(partitionKey ExtractorClass)) { Queue var15 = new Queue(baseQueueName, true, false, false, this. queueArgs(properties, baseQueueName)); this.declareQueue(baseQueueName, var15); this.autoBindDLQ(baseQueueName, baseQueueName, properties); endpoint.setRoutingKey(name); org.springframework.amqp.core.Binding var16 = BindingBuilder.bind (var15).to(exchange).with(name); this.declareBinding(baseQueueName, var16); } else { endpoint.setExpressionRoutingKey(EXPRESSION_PARSER.parseExpression (this.buildPartitionRoutingExpression(name))); for(int i = 0; i < properties.getNextModuleCount(); ++i) { String partitionSuffix = "-" + i; String partitionQueueName = baseQueueName + partitionSuffix; Queue queue = new Queue(partitionQueueName, true, false, false, this.queueArgs(properties, partitionQueueName)); this.declareQueue(queue.getName(), queue); this.autoBindDLQ(baseQueueName, baseQueueName + partitionSuffix, properties); this.declareBinding(queue.getName(), BindingBuilder.bind(queue). to(exchange).with(name + partitionSuffix)); } } this.configureOutboundHandler(endpoint, properties); return endpoint; } ...... }
现在我们更加明白,为什么使用 Spring Boot 可以那么简单,就是因为一些复杂的配置和方法都已经由 Spring Boot 及其所调用的一些组件实现了。至于在使用 RabbitMQ 中进行消息发布的实现,最终是由 RabbitTemplate 执行 doSend,将消息发布到 RabbitMQ 服务器上,如代码清单 12-3 所示。
代码清单 12-3 消息发布源代码
package org.springframework.amqp.rabbit.core; ...... public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, Listener { public void send(Message message) throws AmqpException { this.send(this.exchange, this.routingKey, message); } public void send(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData) throws AmqpException { this.execute(new ChannelCallback() { public Object doInRabbit(Channel channel) throws Exception { RabbitTemplate.this.doSend(channel, exchange, routingKey, message,RabbitTemplate.this.returnCallback != null && ((Boolean)RabbitTemplate. this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)).booleanValue(), correlationData); return null; } }, this.obtainTargetConnectionFactoryIfNecessary(this.sendConnection FactorySelectorExpression, message)); } ...... }
使用配置管理服务的客户端都订阅了 RabbitMQ 服务器的消息,当收到更新消息时,即从配置管理服务器中取得更新文件,然后在本地上执行更新配置的流程。
有关消息的发布和订阅的实现方法,最后通过一个简单的实例,使用 spring-cloud-stream,更加形象地说明这种分布式消息的发布和接收的原理。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论