返回介绍

8.3 编写简单的消息生产者和消费者

发布于 2025-04-22 21:54:09 字数 13427 浏览 0 评论 0 收藏

现在我们已经了解完 Spring Cloud Stream 中的基本组件,接下来看一个简单的 Spring Cloud Stream 示例。对于第一个例子,我们将要从组织服务传递一条消息到许可证服务。在许可证服务中,唯一要做的事情就是将日志消息打印到控制台。

另外,在这个例子中,因为只有一个 Spring Cloud Stream 发射器(消息生成者)和接收器(消息消费者),所以我们将要采用 Spring Cloud 提供的一些便捷方式,让在组织服务中建立发射器以及在许可证服务中建立接收器变得更简单。

8.3.1 在组织服务中编写消息生产者

我们首先修改组织服务,以便每次添加、更新或删除组织数据时,组织服务将向 Kafka 主题(topic)发布一条消息,指示组织更改事件已经发生。图 8-4 突出显示了消息生产者,并构建在图 8-3 所示的通用 Spring Cloud Stream 架构之上。

图 8-4 当组织服务数据发生变化时,它会向 Kafka 发布消息

发布的消息将包括与更改事件相关联的组织 ID,还将包括发生的操作(添加、更新或删除)。

需要做的第一件事就是在组织服务的 Maven pom.xml 文件中设置 Maven 依赖项。pom.xml 文件可以在 organization-service 目录中找到。在 pom.xml 中,需要添加两个依赖项:一个用于核心 Spring Cloud Stream 库,另一个用于包含 Spring Cloud Stream Kafka 库。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

定义完 Maven 依赖项,就需要告诉应用程序它将绑定到 Spring Cloud Stream 消息代理。这可以通过使用 @EnableBinding 注解来标注组织服务的引导类 Application (在 organization- service/src/main/java/com/thoughtmechanix/organization/Application.java 中)来完成。代码清单 8-1 展示了组织服务的 Application 类的源代码。

代码清单 8-1 带注解的 Application

package com.thoughtmechanix.organization;

import com.thoughtmechanix.organization.utils.UserContextFilter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import javax.servlet.Filter;

@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class)  ⇽--- @EnableBinding 注解告诉 Spring Cloud Stream 将应用程序绑定到消息代理
public class Application {
    @Bean
    public Filter userContextFilter() {
        UserContextFilter userContextFilter = new UserContextFilter();
        return userContextFilter;
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

在代码清单 8-1 中, @EnableBinding 注解告诉 Spring Cloud Stream 希望将服务绑定到消息代理。 @EnableBinding 注解中的 Source.class 告诉 Spring Cloud Stream,该服务将通过在 Source 类上定义的一组通道与消息代理进行通信。记住,通道位于消息队列之上。Spring Cloud Stream 有一个默认的通道集,可以配置它们来与消息代理进行通信。

到目前为止,我们还没有告诉 Spring Cloud Stream 希望将组织服务绑定到什么消息代理。本章很快就会讲到这一点。现在,我们可以继续实现将要发布消息的代码。

消息发布的代码可以在 organization-service/src/com/thoughtmechanix/organization/events/source/ SimpleSourceBean.java 中找到。代码清单 8-2 展示了这个 SimpleSourceBean 类的代码。

代码清单 8-2 向消息代理发布消息

package com.thoughtmechanix.organization.events.source;

// 为了简洁,省略了 import 语句

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger =
    →  LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){  ⇽--- Spring Cloud Stream 将注入一个 Source 接口,以供服务使用
        this.source = source;
    }

    public void publishOrgChange(String action,String orgId){
        logger.debug("Sending Kafka message {}for Organization Id: {}",
        →  action, orgId);
        OrganizationChangeModel change = new OrganizationChangeModel(
        →  OrganizationChangeModel.class.getTypeName(),
        →  action,
        →  orgId,
        →  UserContext.getCorrelationId());  ⇽--- 要发布的消息是一个 Java POJO

        source
            .output()
            .send(MessageBuilder.withPayload(change).build());  ⇽--- 当准备发送消息时,使用 Source 类中定义的通道的 send() 方法
    }
}

在代码清单 8-2 中,我们将 Spring Cloud Source 类注入代码中。记住,所有与特定消息主题的通信都是通过称为通道的 Spring Cloud Stream 结构来实现的。通道由一个 Java 接口类表示。在代码清单 8-2 中,我们使用的是 Source 接口。 Source 是 Spring Cloud 定义的一个接口,它公开了一个名为 output() 的方法。当服务只需要发布到单个通道时, Source 接口是一个很方便的接口。 output() 方法返回一个 MessageChannel 类型的类。 MessageChannel 代表了如何将消息发送给消息代理。本章稍后将介绍如何使用自定义接口来公开多个消息传递通道。

消息的实际发布发生在 publishOrgChange() 方法中。此方法构建一个 Java POJO,名为 OrganizationChangeModel 。本章不会展示 OrganizationChangeModel 的代码,因为这个类只是一个包含 3 个数据元素的 POJO。

  • 动作(action) ——这是触发事件的动作。我在消息中包含了这个动作,以便让消息消费者在处理事件的过程中有更多的上下文。
  • 组织 ID(organization ID) ——这是与事件关联的组织 ID。
  • 关联 ID(correlation ID) ——这是触发事件的服务调用的关联 ID。应该始终在事件中包含关联 ID,因为它对跟踪和调试流经服务的消息流有极大的帮助。

当准备好发布消息时,可使用从 source.output() 返回的 MessageChannelsend() 方法:

source.output().send(MessageBuilder.withPayload(change).build());

send() 方法接收一个 Spring Message 类。我们使用一个名为 MessageBuilder 的 Spring 辅助类来接收 OrganizationChangeModel 类的内容,并将它转换为 Spring Message 类。

这就是发送消息所需的所有代码。然而,到目前为止,这一切都感觉有点儿像魔术,因为我们还没有看到如何将组织服务绑定到一个特定的消息队列,更不用说实际的消息代理。上述的这一切都是通过配置来完成的。代码清单 8-3 展示了这一配置,它将服务的 Spring Cloud Stream Source 映射到 Kafka 消息代理以及 Kafka 中的消息主题。此配置信息可以位于服务的 application.yml 文件中,也可以位于服务的 Spring Cloud Config 条目中。

代码清单 8-3 用于发布消息的 Spring Cloud Stream 配置

spring:
  application:
    name: organizationservice
  #为了简洁,省略了其余配置
    stream:  ⇽--- stream.bindings 是所需配置的开始,用于服务将消息发布到 Spring Cloud Stream 消息代理
      bindings:
        output:  ⇽--- output 是通道的名称,映射到在代码清单 8-2 中看到的 source.output() 通道
            destination: orgChangeTopic  ⇽--- 这是要写入消息的消息队列(或主题)的名称
            content-type: application/json  ⇽--- content-type 向 Spring Cloud Stream 提供了将要发送和接收什么类型的消息的提示(在本例中是 JSON)
        kafka:  ⇽--- stream.bindings.kafka 属性告诉 Spring,将使用 Kafka 作为服务中的消息总线(可以使用 RabbitMQ 作为替代)
          binder:
            zkNodes: localhost  ⇽--- Zknodes 和 brokers 属性告诉 Spring Cloud Stream,Kafka 和 ZooKeeper 的网络位置
            brokers: localhost

代码清单 8-3 中的配置看起来很密集,但很简单。代码清单 8-3 中的配置属性 spring.stream.bindings.output 将代码清单 8-2 中的 source.output() 通道映射到要与之通信的消息代理上的主题 orgChangeTopic 。它还告诉 Spring Cloud Stream,发送到此主题的消息应该被序列化为 JSON。Spring Cloud Stream 可以以多种格式序列化消息,包括 JSON、XML 以及 Apache 基金会的 Avro 格式。

代码清单 8-3 中的配置属性 spring.stream.bindings.kafka 告诉 Spring Cloud Stream,将服务绑定到 Kafka。子属性告诉 Spring Cloud Stream,Kafka 消息代理和运行着 Kafka 的 Apache ZooKeeper 服务器的网络地址。

我们已经编写完通过 Spring Cloud Stream 发布消息的代码,并通过配置来告诉 Spring Cloud Stream 它将使用 Kafka 作为消息代理,那么接下来让我们来看看,组织服务中消息的发布实际发生在哪里。这项工作将在 organization-service/src/main/java/com/thoughtmechanix/organization/ services/OrganizationService.java 中的 OranizationServer 类完成。代码清单 8-4 展示了这个类的代码。

代码清单 8-4 在组织服务中发布消息

package com.thoughtmechanix.organization.services;

// 为了简洁,省略了 import 语句
@Service
public class OrganizationService {
    @Autowired
    private OrganizationRepository orgRepository;

    @Autowired  ⇽--- Spring 的自动装配用于将 SimpleSourceBean 注入组织服务中
    SimpleSourceBean simpleSourceBean;

    // 为了简洁,省略了类的其余部分
    public void saveOrg(Organization org){
        org.setId( UUID.randomUUID().toString());

        orgRepository.save(org);
        simpleSourceBean.publishOrgChange("SAVE", org.getId());  ⇽--- 对服务中修改组织数据的每一个方法,调用 simpleSourceBean. publishOrgChange()
    }
}

应该在消息中放置什么数据

我从团队中听到的一个最常见的问题是,当他们第一次开始消息之旅时,应该在消息中放置多少数据。我的答案是,这取决于你的应用程序。正如读者可能注意到的,在我的所有示例中,我只返回已更改的组织记录的组织 ID。我从来没有把数据更改的副本放在消息中。在我的例子中(以及我在电话通信领域中遇到的许多问题),执行的业务逻辑对数据的变化非常敏感。我使用基于系统事件的消息来告诉其他服务,数据状态已经发生了变化,但是我总是强制其他服务重新到主服务器(拥有数据的服务)上来检索数据的新副本。这种方法在执行时间方面是昂贵的,但它也保证我始终拥有最新的数据副本。在从源系统读取数据之后,所使用的数据依然可能会发生变化,但这比在队列中盲目地消费信息的可能性要小得多。

要仔细考虑要传递多少数据。开发人员迟早会遇到这样一种情况:传递的数据已经过时了。这些数据可能是陈旧的,因为出现某种问题导致它在消息队列待了太长时间,或者之前包含数据的消息失败了,并且消息中传入的数据现在处于不一致的状态(因为应用程序依赖于消息的状态,而不是底层数据存储中的实际状态)。如果要在消息中传递状态,还要确保在消息中包含日期时间戳或版本号,以便使用数据的服务可以检查传递的数据,并确保它不会比服务已拥有的数据副本更旧(记住,数据可以不按顺序进行检索)。

8.3.2 在许可证服务中编写消息消费者

到目前为止,我们已经修改了组织服务,以便在组织服务更改组织数据时向 Kafka 发布消息。任何对组织数据感兴趣的服务,都可以在不需要由组织服务显式调用的情况下作出反应。这还意味着开发人员可以轻松地添加新的功能,可以让它们监听消息队列中的消息来对组织服务中的更改作出反应。现在让我们换一个角度,看看服务如何使用 Spring Cloud Stream 来消费消息。

对于本示例,我们将使用许可证服务消费组织服务发布的消息。图 8-5 展示了将许可证服务融入图 8-3 所示的 Spring Cloud Stream 架构中的什么地方。

图 8-5 当一条消息进入 Kafka 的 orgChangeTopic 时,许可证服务将作出响应

首先,还是需要将 Spring Cloud Stream 依赖项添加到许可证服务的 pom.xml 文件中。该 pom.xml 文件可以在本书源代码的 licensing-service 目录中找到。与之前看到的 organization-service pom.xml 文件类似,需要添加以下两个依赖项。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

接下来,需要告诉许可证服务,它需要使用 Spring Cloud Stream 绑定到消息代理。像组织服务一样,我们将使用 @EnableBinding 注解来标注许可证服务引导类 Application (在 licensing-service/src/main/java/com/thoughtmechanix/licenses/Application.java 中)。许可证服务和组织服务之间的区别在于传递给 @EnableBinding 注解的值,如代码清单 8-5 所示。

代码清单 8-5 使用 Spring Cloud Stream 消费消息

package com.thoughtmechanix.licenses;

// 为了简洁,省略了 import 语句
@EnableBinding(Sink.class)  ⇽--- @EnableBinding 注解告诉服务使用 Sink 接口中定义的通道来监听传入的消息
public class Application {
    // 为了简洁,移除剩余代码
    @StreamListener(Sink.INPUT)  ⇽--- 每次收到来自 input 通道的消息时,Spring Cloud Stream 将执行此方法
    public void loggerSink(
        OrganizationChangeModel orgChange) {
            logger.debug("Received an event for organization id {}" ,
            →  orgChange.getOrganizationId());
    }
}

因为许可证服务是消息的消费者,所以将会把值 Sink.class 传递给 @EnableBinding 注解。这告诉 Spring Cloud Stream 使用默认的 Spring Sink 接口。与 8.3.1 节中描述的 Spring Cloud Steam Source 接口类似,Spring Cloud Stream 在 Sink 接口上公开了一个默认的通道,名为 input ,它用于监听通道上的传入消息。

定义了想要通过 @EnableBinding 注解来监听消息之后,就可以编写代码来处理来自 input 通道的消息。为此,要使用 Spring Cloud Stream 的 @StreamListener 注解。

@StreamListener 注解告诉 Spring Cloud Stream,每次从 input 通道接收消息,就会执行 loggerSink() 方法。Spring Cloud Stream 将自动把从通道中传出的消息反序列化为一个名为 OrganizationChangeModel 的 Java POJO。

同样,消息代理的主题到 input 通道的实际映射是在许可证服务的配置中完成的。对于许可证服务,其配置如代码清单 8-6 所示,可以在许可证服务的 licensing-service/src/main/resources/ application.yml 文件中找到。

代码清单 8-6 将许可证服务映射到 Kafka 中的消息主题

spring:
  application:
    name: licensingservice
    …  #为了简洁,省略了其余的配置
  cloud:
    stream:
      bindings:
        input:  ⇽--- spring.cloud.stream.bindings.input 属性将 input 通道映射到 orgChangeTopic 队列
          destination: orgChangeTopic
          content-type: application/json
          group: licensingGroup  ⇽--- 该 group 属性用于保证服务只处理一次
        binder:
          zkNodes: localhost
          brokers: localhost

代码清单 8-6 中的配置类似于组织服务的配置。然而,上述配置有两个关键的不同之处。首先,现在有一个名为 input 的通道定义在 spring.cloud.stream.bindings 属性下。这个值映射到代码清单 8-5 中代码里定义的 Sink.INPUT 通道,它的属性将 input 通道映射到 orgChangeTopic 。其次,我们看到这里引入了一个名为 spring.cloud.stream.bindings.input.group 的新属性。 group 属性定义将要消费消息的消费者组的名称。

消费者组的概念是这样的:开发人员可能拥有多个服务,每个服务都有多个实例侦听同一个消息队列,但是只需要服务实例组中的一个服务实例来消费和处理消息。 group 属性标识服务所属的消费者组。只要服务实例具有相同的组名,Spring Cloud Stream 和底层消息代理将保证,只有消息的一个副本会被属于该组的服务实例所使用。对于许可证服务, group 属性值将会是 licensingGroup

图 8-6 阐述了如何使用消费者组来强制跨多个服务消费的消息只被消费一次。

图 8-6 消费者组保证消息只会被一组服务实例处理一次

8.3.3 在实际操作中查看消息服务

现在,每当添加、更新或删除记录时,组织服务就将向 orgChangeTopic 发布消息,并且许可证服务从同一主题接收消息。通过更新组织服务记录并观察控制台,可以看到来自许可证服务的相应日志消息,以此来查看这段代码的实际操作。

要更新组织服务记录,我们将在组织服务上发送 PUT 请求来更新组织的联系电话号码。将要用来执行更新的端点是 http://localhost:5555/api/organization/v1/organizations/e254f8c-c442-4ebe- a82a-e2fc1d1ff78a,要发送到端点的 PUT 调用的请求体是:

{
    "contactEmail": "mark.balster@custcrmco.com",
    "contactName": "Mark Balster",
    "contactPhone": "823-555-2222",
    "id": "e254f8c-c442-4ebe-a82a-e2fc1d1ff78a",
    "name": "customer-crm-co"
}

图 8-7 展示了这个 PUT 调用返回的输出。

图 8-7 使用组织服务更新联系电话号码

一旦组织服务调用完成,就应该在运行服务的控制台窗口中看到图 8-8 所示的输出结果。

图 8-8 控制台将显示组织服务发送的消息,以及接下来被许可证服务接收的消息

现在已经有两个通过消息传递相互通信的服务。Spring Cloud Stream 充当了这些服务的中间人。从消息传递的角度来看,这些服务对彼此一无所知。它们使用消息传递代理来作为中介,并使用 Spring Cloud Stream 作为消息传递代理的抽象层进行通信。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。