- 内容提要
- 译者序
- 前言
- 第 1 章 欢迎迈入云世界,Spring
- 第 2 章 使用 Spring Boot 构建微服务
- 第 3 章 使用 Spring Cloud 配置服务器控制配置
- 第 4 章 服务发现
- 第 5 章 使用 Spring Cloud 和 Netflix Hystrix 的客户端弹性模式
- 第 6 章 使用 Spring Cloud 和 Zuul 进行服务路由
- 第 7 章 保护微服务
- 第 8 章 使用 Spring Cloud Stream 的事件驱动架构
- 第 9 章 使用 Spring Cloud Sleuth 和 Zipkin 进行分布式跟踪
- 第 10 章 部署微服务
- 附录 A 在桌面运行云服务
- 附录 B OAuth2 授权类型
8.4 Spring Cloud Stream 用例:分布式缓存
到目前为止,我们拥有两个使用消息传递进行通信的服务,但是我们并没有真正处理消息。现在我们将要构建在本章前面讨论过的分布式缓存示例。我们将让许可证服务始终检查分布式的 Redis 缓存以获取与特定许可证相关联的组织数据。如果组织数据在缓存中存在,那么将从缓存中返回数据。否则,将调用组织服务,并将调用的结果缓存在一个 Redis 散列中。
在组织服务中更新数据时,组织服务将向 Kafka 发出一条消息。许可证服务将接收消息,并对 Redis 发出删除指令,以清除缓存。
云缓存与消息传递
使用 Redis 作为分布式缓存与云中的微服务开发密切相关。以我目前的雇主来为例,我们使用亚马逊 Web 服务(AWS)构建我们的解决方案,并且是亚马逊的 DynamoDB 的重度使用者。我们还使用亚马逊的 ElastiCache(Redis)增强如下功能。
- 提高查找常用数据的性能 ——通过使用缓存,我们显著提高了几个关键服务的性能。我们销售的产品中的所有表都是多租户的(在单个表中保存多个客户记录),这意味着它们可以非常大。由于缓存倾向于留住“大量”使用的数据,所以我们使用 Redis 和缓存来避免读取 DynamoDB,从而显著提高了性能。
- 减少持有数据的 DynamoDB 表上的负载(和成本) ——在 DynamoDB 中访问数据可能是一项昂贵的提议。应用程序发出的每一次读取都是一次收费事件。使用 Redis 服务器通过主键读取要比 DynamoDB 读取便宜得多。
- 增加弹性,以便在主数据存储(DynamoDB)存在性能问题时,服务能够优雅地降级 ——如果 AWS DynamoDB 出现问题(这确实偶尔发生),使用诸如 Redis 这样的缓存可以帮助服务优雅地降级。根据在缓存中保存的数据量,缓存解决方案可以帮助减少从访问数据存储中获取的错误的数量。
Redis 远远不止是一个缓存解决方案,但是如果开发人员需要一个分布式缓存,它可以充当这个角色。
8.4.1 使用 Redis 来缓存查找
现在先从设置许可证服务以使用 Redis 开始。幸运的是,Spring Data 已经简化了将 Redis 引入许可证服务中的工作。要在许可证服务中使用 Redis,需要做以下 4 件事情。
(1)配置许可证服务以包含 Spring Data Redis 依赖项。
(2)构造一个到 Redis 服务器的数据库连接。
(3)定义 Spring Data Redis 存储库,代码将使用它与一个 Redis 散列进行交互。
(4)使用 Redis 和许可证服务来存储和读取组织数据。
1.配置许可证服务以包含 Spring Data Redis 依赖项
需要做的第一件事就是将 spring-data-redis
、 jedis
以及 common-pools2
依赖项包含在许可证服务的 pom.xml 文件中。代码清单 8-7 展示了要包含的依赖项。
代码清单 8-7 添加 Spring Redis 依赖项
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.0</version>
</dependency>
2.构造一个到 Redis 服务器的数据库连接
既然已经在 Maven 中添加了依赖项,接下来就需要建立一个到 Redis 服务器的连接。Spring 使用开源项目 Jedis 与 Redis 服务器进行通信。要与特定的 Redis 实例进行通信,需要在 licensing-service/src/main/java/com/thoughtmechanix/licenses/Application.java 中的 Application
类中公开一个 JedisConnectionFactory
作为 Spring bean。一旦连接到 Redis,将使用该连接创建一个 Spring RedisTemplate
对象。我们很快会实现 Spring Data 存储库类,它们将使用 RedisTemplate
对象来执行查询,并将组织服务数据保存到 Redis 服务中。代码清单 8-8 展示了这段代码。
代码清单 8-8 确定许可证服务将如何与 Redis 进行通信
package com.thoughtmechanix.licenses;
// 为了简洁,省略了大部分 import 语句
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Sink.class)
public class Application {
@Autowired
private ServiceConfig serviceConfig;
// 为了简洁,省略了类中的其他方法
@Bean
public JedisConnectionFactory jedisConnectionFactory() { ⇽--- jedisConnectionFactory() 方法设置到 Redis 服务器的实际数据库连接
JedisConnectionFactory jedisConnFactory = new JedisConnectionFactory();
jedisConnFactory.setHostName( serviceConfig.getRedisServer() );
jedisConnFactory.setPort( serviceConfig.getRedisPort() );
return jedisConnFactory;
}
@Bean
public RedisTemplate<String, Object> redisTemplate() { ⇽--- redisTemplate() 方法创建一个 RedisTemplate,用于对 Redis 服务器执行操作
RedisTemplate<String, Object> template = new RedisTemplate<String,
→ Object>();
template.setConnectionFactory(jedisConnectionFactory());
return template;
}
}
建立许可证服务与 Redis 进行通信的基础工作已经完成。现在让我们来编写从 Redis 查询、添加、更新和删除数据的逻辑。
3.定义 Spring Data Redis 存储库
Redis 是一个键值数据存储,它的作用类似于一个大的、分布式的、内存中的 HashMap。在最简单的情况下,它存储数据并按键查找数据。Redis 没有任何复杂的查询语言来检索数据。它的简单性是它的优点,也是这么多项目采用它的原因之一。
因为我们使用 Spring Data 来访问 Redis 存储,所以需要定义一个存储库类。读者可能还记得在第 2 章中,Spring Data 使用用户定义的存储库类为 Java 类提供一个简单的机制来访问 Postgres 数据库,而无须开发人员编写低级的 SQL 查询。
对于许可证服务,我们将为 Redis 存储库定义两个文件。将要编写的第一个文件是一个 Java 接口,它将被注入任何需要访问 Redis 的许可证服务类中。这个 OrganizationRedisRepository
接口(在 licensing- service/src/main/java/com/thoughtmechanix/licenses/repository/OrganizationRedis Repository.java 中)如代码清单 8-9 所示。
代码清单 8-9 OrganizationRedisRepository
定义用于调用 Redis 的方法
package com.thoughtmechanix.licenses.repository;
import com.thoughtmechanix.licenses.model.Organization;
public interface OrganizationRedisRepository {
void saveOrganization(Organization org);
void updateOrganization(Organization org);
void deleteOrganization(String organizationId);
Organization findOrganization(String organizationId);
}
第二个文件是 OrganizationRedisRepository
接口的实现。这个接口的实现,即 licensing- service/src/main/java/com/thoughtmechanix/licenses/repository/OrganizationRedisRepositoryImpl.java 中的 OranizationRedisRepositoryImpl
类,使用了之前在代码清单 8-8 中定义的 RedisTemplate
来与 Redis 服务器进行交互,并对 Redis 服务器执行操作。代码清单 8-10 展示了所使用的代码。
代码清单 8-10 OrganizationRedisRepositoryImpl
实现
package com.thoughtmechanix.licenses.repository;
// 为了简洁,省略了大部分 import 语句
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
@Repository ⇽--- 这个 @Repository 注解告诉 Spring,这个类是一个与 Spring Data 一起使用的存储库类
public class OrganizationRedisRepositoryImpl implements
→ OrganizationRedisRepository {
private static final String HASH_NAME="organization"; ⇽--- 在 Redis 服务器中存储组织数据的散列的名称
private RedisTemplate<String, Organization> redisTemplate;
private HashOperations hashOperations; ⇽--- HashOperations 类包含一组用于在 Redis 服务器上执行数据操作的辅助方法
public OrganizationRedisRepositoryImpl(){
super();
}
@Autowired
private OrganizationRedisRepositoryImpl(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@PostConstruct
private void init() {
hashOperations = redisTemplate.opsForHash();
}
@Override
public void saveOrganization(Organization org) {
hashOperations.put(HASH_NAME, org.getId(), org); ⇽--- 与 Redis 的所有交互都将使用由键存储的单个 Organization 对象
}
@Override
public void updateOrganization(Organization org) {
hashOperations.put(HASH_NAME, org.getId(), org);
}
@Override
public void deleteOrganization(String organizationId) {
hashOperations.delete(HASH_NAME, organizationId);
}
@Override
public Organization findOrganization(String organizationId) {
return (Organization) hashOperations.get(HASH_NAME, organizationId);
}
}
OrganizationRedisRepositoryImpl
包含用于从 Redis 存储和检索数据的所有 CRUD(Create、Read、Update 和 Delete)逻辑。在代码清单 8-10 所示的代码中有两个关键问题需要注意。
- Redis 中的所有数据都是通过一个键存储和检索的。因为是存储从组织服务中检索到的数据,所以自然选择组织 ID 作为存储组织记录的键。
- 一个 Redis 服务器可以包含多个散列和数据结构。在针对 Redis 服务器的每个操作中,需要告诉 Redis 执行操作的数据结构的名字。在代码清单 8-10 中,使用的数据结构名称存储在
HASH_NAME
常量中,其值为“organization”。
4.使用 Redis 和许可证服务来存储和读取组织数据
在完成对 Redis 执行操作的代码之后,就可以修改许可证服务,以便每次许可证服务需要组织数据时,它会在调用组织服务之前检查 Redis 缓存。检查 Redis 的逻辑将出现在 licensing- service/src/main/java/com/thoughtmechanix/licenses/clients/OrganizationRestTemplateClient.java 中的 OrganizationRestTemplateClient
类中。这个类的代码如代码清单 8-11 所示。
代码清单 8-11 OrganizationRestTemplateClient
将实现缓存逻辑
package com.thoughtmechanix.licenses.clients;
// 为了简洁,省略了 import 语句
@Component
public class OrganizationRestTemplateClient {
@Autowired
RestTemplate restTemplate;
@Autowired
OrganizationRedisRepository orgRedisRepo; ⇽--- OrganizationRedisRepository 被自动装配到 Organization RestTemplateClient
private static final Logger logger =
→ LoggerFactory.getLogger(OrganizationRestTemplateClient.class);
private Organization checkRedisCache(String organizationId) { ⇽--- 尝试使用组织 ID 从 Redis 中检索 Organization 类
try {
return orgRedisRepo.findOrganization(organizationId);
}
catch (Exception ex){
logger.error("Error encountered while trying to
→ retrieve organization {} check Redis Cache.Exception {}",
→ organizationId, ex);
return null;
}
}
private void cacheOrganizationObject(Organization org) {
try {
orgRedisRepo.saveOrganization(org);
}
catch (Exception ex){
logger.error("Unable to cache organization {} in Redis.
→ exception {}", org.getId(), ex);
}
}
public Organization getOrganization(String organizationId){
logger.debug("In Licensing Service.getOrganization: {}",
→ UserContext.getCorrelationId());
Organization org = checkRedisCache(organizationId);
if (org!=null){ ⇽--- 如果无法从 Redis 中检索出数据,那么将调用组织服务从源数据库检索数据
logger.debug("I have successfully
→ retrieved an organization {} from the redis cache: {}",
→ organizationId, org);
return org;
}
logger.debug("Unable to locate organization from the redis cache: {}.",
→ organizationId);
ResponseEntity<Organization> restExchange = restTemplate.exchange(
→ "http://zuulservice/api/organization/v1/organizations/{organizationId}",
→ HttpMethod.GET,
→ null,
→ Organization.class,
→ organizationId);
/*将记录保存到缓存中*/
org = restExchange.getBody();
if (org!=null) { ⇽--- 将检索到的对象保存到缓存中
cacheOrganizationObject(org);
}
return org;
}
}
getOrganization()
方法是调用组织服务的地方。在进行实际的 REST 调用之前,尝试使用 checkRedisCache()
方法从 Redis 中检索与调用相关联的组织对象。如果该组织对象不在 Redis 中,则代码将返回一个 null
值。如果从 checkRedisCache()
方法返回一个 null
值,那么代码将调用组织服务的 REST 端点来检索所需的组织记录。如果组织服务返回一条组织记录,那么将使用 cacheOrganizationObject()
方法缓存返回的组织对象。
注意
在与缓存进行交互时,要特别注意异常处理。为了提高弹性,如果无法与 Redis 服务器通信,我们绝对不会让整个调用失败。相反,我们会记录异常,并让调用转到组织服务。在这个特定的用例中,缓存旨在帮助提高性能,而缓存服务器的缺失不应该影响调用的成功。
有了 Redis 缓存代码,接下来应该访问许可证服务(是的,目前只有两个服务,但是有很多基础设施),并查看代码清单 8-10 中的日志消息。如果读者连续访问以下许可证服务端点 http://localhost:5555/api/licensing/v1/organizations/e254f8c-c442-4ebe-a82a-
e2fc1d1ff78a/licenses/f3831f8c-c338-4ebe-a82a-e2fc1d1ff78a
两次,那么应该在日志中看到以下两个输出语句:
licensingservice_1
| 2016-10-26 09:10:18.455 DEBUG 28 --- [nio-8080-exec-
1] c.t.l.c.OrganizationRestTemplateClient : Unable to locate
organization from the redis cache: e254f8c-c442-4ebe-a82a-e2fc1d1ff78a.
licensingservice_1
| 2016-10-26 09:10:31.602 DEBUG 28 --- [nio-8080-exec-
2] c.t.l.c.OrganizationRestTemplateClient : I have successfully
retrieved an organization e254f8c-c442-4ebe-a82a-e2fc1d1ff78a from the
redis cache: com.thoughtmechanix.licenses.model.Organization@6d20d301
来自控制台的第一行显示,第一次调用尝试为组织访问许可证服务端点 e254f8c-c442-4ebe-a82a-e2fc1d1ff78a
。许可证服务首先检查了 Redis 缓存,但找不到要查找的组织记录。然后代码调用组织服务来检索数据。从控制台显示出来的第二行表明,在第二次访问许可证服务端点时,组织记录已被缓存了。
8.4.2 定义自定义通道
之前我们在许可证服务和组织服务之间构建了消息集成,以便使用默认的 output
和 input
通道,这些通道与 Source
和 Sink
接口一起打包在 Spring Cloud Stream 项目中。然而,如果想要为应用程序定义多个通道,或者想要定制通道的名称,那么开发人员可以定义自己的接口,并根据应用程序需要公开任意数量的输入和输出通道。
要在许可证服务里面创建名为 inboundOrgChanges
的自定义通道,可以在 licensing-service/ src/main/java/com/thoughtmechanix/licenses/events/CustomChannels.java 的 CustomChannels
接口中进行定义,如代码清单 8-12 所示。
代码清单 8-12 为许可证服务定义一个自定义 input
通道
package com.thoughtmechanix.licenses.events;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface CustomChannels {
@Input("inboundOrgChanges") ⇽--- @Input 是方法级别的注解,它定义了通道的名称
SubscribableChannel orgs(); ⇽--- 通过 @Input 注解公开的每个通道必须返回一个 SubscribableChannel 类
}
代码清单 8-12 中的关键信息是,对于要公开的每个自定义 input
通道,使用 @Input
注解标记一个返回 SubscribableChannel
类的方法。如果想要为发布的消息定义 outpu``t
通道,可以在将要调用的方法上使用 @OutputChannel
。在 output
通道的情况下,定义的方法将返回一个 MessageChannel
类而不是与 input
通道一起使用的 SubscribableChannel
类。
@OutputChannel("outboundOrg")
MessageChannel outboundOrg();
定义完自定义 input
通道之后,接下来就需要在许可证服务中修改两样东西来使用它。首先,需要修改许可证服务,以将自定义 input
通道名称映射到 Kafka 主题。代码清单 8-13 展示了这一点。
代码清单 8-13 修改许可证服务以使用自定义 input
通道
spring:
...
cloud:
...
stream:
bindings:
inboundOrgChanges: ⇽--- 将通道的名称从 input 更改为 inboundOrgChanges
destination: orgChangeTopic
content-type: application/json
group: licensingGroup
要使用自定义 input
通道,需要将定义的 CustomChannels
接口注入将要使用它来处理消息的类中。对于分布式缓存示例,我已经将处理传入消息的代码移到了 licensing-service 文件夹下的 OrganizationChangeHandler
类(在 licensing-service/src/main/java/com/-thoughtmechanix/ licenses/events/handlers/OrganizationChange Handler.java 中)。代码清单 8-14 展示了与定义的 inboundOrgChanges
通道一起使用的消息处理代码。
代码清单 8-14 在 OrganizationChangeHandler
中使用新的自定义通道
@EnableBinding(CustomChannels.class) ⇽--- 将 @EnableBindings 从 Application 类移到 OrganizationChangeHandler 类。这一次不使用 Sink.class,而是使用 CustomChannels 类作为参数进行传入
public class OrganizationChangeHandler {
@StreamListener("inboundOrgChanges") ⇽--- 使用 @StreamListener 注解传入通道名称 inboundOrgChanges 而不是使用 Sink.INPUT
public void loggerSink(OrganizationChangeModel orgChange) {
... //为了简洁,省略了其余的代码
}
}
}
8.4.3 将其全部汇集在一起:在收到消息时清除缓存
到目前为止,我们不需要对组织服务做任何事。该服务被设置为在组织被添加、更新或删除时发布一条消息。我们需要做的就是根据代码清单 8-14 构建出 OrganizationChange-Handler
类。代码清单 8-15 展示了这个类的完整实现。
代码清单 8-15 处理许可证服务中的组织更改
@EnableBinding(CustomChannels.class)
public class OrganizationChangeHandler {
@Autowired
private OrganizationRedisRepository organizationRedisRepository; ⇽--- 用于与 Redis 进行交互的 OrganizationRedis Repository 被注入 OrganizationChange Handler
private static final Logger logger =
→ LoggerFactory.getLogger(OrganizationChangeHandler.class);
@StreamListener("inboundOrgChanges")
public void loggerSink(OrganizationChangeModel orgChange) { ⇽--- 在收到消息时,检查与数据相关的操作,然后做出相应的反应
switch(orgChange.getAction()){
// 为了简洁,省略了其余的代码
case "UPDATE":
logger.debug("Received a UPDATE event
→ from the organization service for organization id {}",
→ orgChange.getOrganizationId()); ⇽--- 如果组织数据被更新或者删除,那么就通过 organizationRedisRepository 类从 Redis 中移除组织数据
organizationRedisRepository
→ .deleteOrganization(orgChange.getOrganizationId());
break;
case "DELETE":
logger.debug("Received a DELETE event
→ from the organization service for organization id {}",
→ orgChange.getOrganizationId()); ⇽--- 如果组织数据被更新或者删除,那么就通过 organizationRedisRepository 类从 Redis 中移除组织数据
organizationRedisRepository
→ .deleteOrganization(orgChange.getOrganizationId());
break;
default:
logger.error("Received an UNKNOWN event
→ from the organization service of type {}",
→ orgChange.getType());
break;
}
}
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论