返回介绍

12.1 配置管理实现原理

发布于 2025-04-26 13:26:37 字数 5749 浏览 0 评论 0 收藏

在第 8 章的实例中,我们知道,配置管理的在线更新功能使用事件总线,即 spring-cloud-bus 来发布状态变化,并且使用分布式消息来发布更新事件,而分布式消息最终使用了 RabbitMQ 来实现消息收发。

12.1.1 在线更新流程

使用配置管理,实现在线更新一般遵循下列步骤:

1)更新 Git 仓库的配置文件。

2)以 POST 指令触发更新请求。

3)配置管理服务器从 Git 仓库中读取配置文件,并将配置文件分发给各个客户端,同时在 RabbitMQ 中发布一个更新消息。

4)客户端订阅 RabbitMQ 消息,收到消息后执行更新。

在使用配置管理的演示实例中,使用如下 POST 指令来触发在线更新:

curl –




X POST http://localhost:8888/bus/refresh

接收这个更新指令的实现方法如代码清单 12-1 所示,其中的 publish 将会发布一个更新事件,调用 RabbitMQ 进行消息发布,然后由客户端收到消息后执行更新。代码中定义了请求更新的链接 refresh,并可使用 destination 来指定更新目标。

代码清单 12-1 接收更新指令的源代码

package org.springframework.cloud.bus.endpoint;
......
public class RefreshBusEndpoint extends AbstractBusEndpoint {
    public RefreshBusEndpoint(ApplicationEventPublisher context, String id, Bus
Endpoint delegate) {
        super(context, id, delegate);
    }
    @RequestMapping(
        value = {"refresh"},
        method = {RequestMethod.POST}
    )
    @ResponseBody
    public void refresh(@RequestParam(
    value = "destination",
    required = false
) String destination) {
        this.publish(new RefreshRemoteApplicationEvent(this, this.getInstance
Id(), destination));
    }
}

12.1.2 更新消息的分发原理

配置管理服务器中的消息分发是从 spring-cloud-bus 中调用 spring-cloud-stream 组件来实现的,而 spring-cloud-stream 使用 RabbitMQ 实现了分布式消息的分发。

RabbitMQ 的消息服务一般需要创建一个交换机 Exchange 和一个队列 Queue,然后将交换机和队列进行绑定。而在设计配置服务器时并没有做这方面的工作,所做的工作仅仅是配置引用 spring-cloud-bus 的依赖和设置连接 RabbitMQ 服务器的参数而已。这个工作其实已经由 spring-cloud-stream 帮我们实现了。

从 RabbitMessageChannelBinder 的源代码中可以看到这部分的实现原理,代码清单 12-2 是一个消息发布方的队列绑定的实现。其中 exchangeName 是一个交换机的名字,baseQueueName 是一个队列的名字,并且从代码中也可以看出它使用了 TopicExchange 交换机,这是 RabbitMQ 中 4 种交换机(Fanout、Direct、Topic、Header)的其中一种,并且也可以看出代码中使用 setRoutingKey 将交换机和队列做了绑定。

代码清单 12-2 RabbitMessageChannelBinder 的源代码

package org.springframework.cloud.stream.binder.rabbit;
......
public class RabbitMessageChannelBinder extends MessageChannelBinderSupport implements DisposableBean {
    private AmqpOutboundEndpoint buildOutboundEndpoint(String name, RabbitMessage
ChannelBinder.RabbitPropertiesAccessor properties, RabbitTemplate rabbitTemplate) {
        String prefix = properties.getPrefix(this.defaultPrefix);
        String exchangeName = applyPrefix(prefix, name);
        String partitionKeyExtractorClass = properties.getPartitionKeyExtractor
Class();
        Expression partitionKeyExpression = properties.getPartitionKeyExpression();
        TopicExchange exchange = new TopicExchange(exchangeName);
        this.declareExchange(exchangeName, exchange);
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(rabbitTemplate);
        endpoint.setExchangeName(exchange.getName());
        String baseQueueName = exchangeName + ".default";
        if(partitionKeyExpression == null && !StringUtils.hasText(partitionKey
ExtractorClass)) {
            Queue var15 = new Queue(baseQueueName, true, false, false, this.
queueArgs(properties, baseQueueName));
            this.declareQueue(baseQueueName, var15);
            this.autoBindDLQ(baseQueueName, baseQueueName, properties);
            endpoint.setRoutingKey(name);
            org.springframework.amqp.core.Binding var16 = BindingBuilder.bind
(var15).to(exchange).with(name);
            this.declareBinding(baseQueueName, var16);
        } else {
            endpoint.setExpressionRoutingKey(EXPRESSION_PARSER.parseExpression
(this.buildPartitionRoutingExpression(name)));
            for(int i = 0; i < properties.getNextModuleCount(); ++i) {
                String partitionSuffix = "-" + i;
                String partitionQueueName = baseQueueName + partitionSuffix;
                Queue queue = new Queue(partitionQueueName, true, false, false,
 this.queueArgs(properties, partitionQueueName));
                this.declareQueue(queue.getName(), queue);
                this.autoBindDLQ(baseQueueName, baseQueueName + partitionSuffix,
 properties);
                this.declareBinding(queue.getName(), BindingBuilder.bind(queue).
to(exchange).with(name + partitionSuffix));
            }
        }
        this.configureOutboundHandler(endpoint, properties);
        return endpoint;
    }
    ......
}

现在我们更加明白,为什么使用 Spring Boot 可以那么简单,就是因为一些复杂的配置和方法都已经由 Spring Boot 及其所调用的一些组件实现了。至于在使用 RabbitMQ 中进行消息发布的实现,最终是由 RabbitTemplate 执行 doSend,将消息发布到 RabbitMQ 服务器上,如代码清单 12-3 所示。

代码清单 12-3 消息发布源代码

package org.springframework.amqp.rabbit.core;
......
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, 
RabbitOperations, MessageListener, ListenerContainerAware, Listener {
    public void send(Message message) throws AmqpException {
        this.send(this.exchange, this.routingKey, message);
    }
    public void send(final String exchange, final String routingKey, final Message
 message, final CorrelationData correlationData) throws AmqpException {
        this.execute(new ChannelCallback() {
            public Object doInRabbit(Channel channel) throws Exception {
                RabbitTemplate.this.doSend(channel, exchange, routingKey, 
message,RabbitTemplate.this.returnCallback != null && ((Boolean)RabbitTemplate.
this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, 
message, Boolean.class)).booleanValue(), correlationData);
                return null;
            }
        }, this.obtainTargetConnectionFactoryIfNecessary(this.sendConnection
FactorySelectorExpression, message));
    }
    ......
}

使用配置管理服务的客户端都订阅了 RabbitMQ 服务器的消息,当收到更新消息时,即从配置管理服务器中取得更新文件,然后在本地上执行更新配置的流程。

有关消息的发布和订阅的实现方法,最后通过一个简单的实例,使用 spring-cloud-stream,更加形象地说明这种分布式消息的发布和接收的原理。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。