12.3 Spring WebFlux 实战
在第 11 章中通过一个促销活动的例子展示了 Spring Boot 的微服务开发过程。本节将采用 Spring WebFlux 框架重新改造一下促销活动的微服务项目。
在第 11 章中,microservice-promotion 项目是基于 Spring Boot 开发的,这里将使用 Spring WebFlux 框架进行项目改造,并进行完整的代码展示。
(1)在 pom.xml 文件中添加相关依赖,引入 spring-boot-starter-data-redis-reactive 包,具体依赖如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http:// www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https:// maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.9.RELEASE</version> <relativePath/> <!--lookup parent from repository --> </parent> <groupId>com.example.microservice.promotion</groupId> <artifactId>microservice-promotion</artifactId> <version>0.0.1-SNAPSHOT</version> <name>microservice-promotion</name> <description>microservice-promotion project for Spring Boot </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive </artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.5.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config </artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery </artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel </artifactId> <version>2.2.5.RELEASE</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.2.3</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
(2)修改 application.xml 配置文件,在其中配置数据库连接方式,代码如下:
server: port: 8081 spring: application: name: microservice-promotion
(3)由于集成了 Nacos 和 Sentinel 中间件,因此需要修改 bootstrap.xml 配置文件,代码如下:
spring: cloud: nacos: discovery: server-addr: 127.0.0.1:8848 ip: 127.0.0.1 port: 80 namespace: 40421527-56ff-410b-8ca8-e025aca9e946 group: default config: server-addr: 127.0.0.1:8848 file-extension: properties namespace: 40421527-56ff-410b-8ca8-e025aca9e946 group: default sentinel: enabled: true transport: dashboard: 127.0.0.1:8888 clientIp: 127.0.0.1 port: 8719 log: dir: /log/sentinel filter: enabled: false management: endpoint: metrics: enabled: true prometheus: enabled: true endpoints: web: base-path: / exposure: include: health,info,status,prometheus metrics: export: prometheus: enabled: true tags: application: ${spring.application.name} web: server: request: autotime: enabled: true percentiles-histogram: on percentiles: -0.9 -0.99 client: request: autotime: enabled: true percentiles-histogram: on percentiles: -0.9 -0.99
(4)本例使用 log4j2 日志架构,配置如下:
<?xml version="1.0" encoding="UTF-8"?> <Configuration status="WARN"> <properties> <property name="LOG_HOME">/log</property> </properties> <Appenders> <Console name="CONSOLE" target="SYSTEM_OUT" > <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%t] %c{1.} %msg%n"/> </Console> <RollingRandomAccessFile name="INFO_FILE" fileName= "${LOG_HOME}/info.log" filePattern="${LOG_HOME}/info-%d{HH}- %i.log" immediateFlush="true"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%traceId] %-5p %c{1.} %msg%n"/> <Policies> <TimeBasedTriggeringPolicy /> </Policies> <DefaultRolloverStrategy max="1"/> <Filters> <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="NEUTRAL"/> <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/> </Filters> </RollingRandomAccessFile> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="CONSOLE" /> <AppenderRef ref="INFO_FILE" /> </Root> </Loggers> </Configuration>
(5)将 Redis 配置信息集成到 Nacos 上,具体的 Redis 信息如下:
redis.promotion.host=127.0.0.1 redis.promotion.port=6379 redis.promotion.password=test redis.promotion.maxTotal=2000 redis.promotion.maxIdle=100 redis.promotion.minIdle=40 redis.promotion.maxWaitMillis=3000 redis.promotion.timeBetweenEvictionRunsMillis=30000 redis.promotion.commandTimeout=3000
(6)Redis 自动配置如下:
新建 RedisProperties.class 文件,代码如下:
package com.example.promotion.config; import lombok.Data; import org.springframework.boot.context.properties.Configuration Properties; @Data @ConfigurationProperties(prefix = "redis") public class RedisProperties { private RedisInfo promotion; @Data public static class RedisInfo{ protected int maxTotal = 2000; //最大连接数 protected int maxIdle = 100; //最大空闲数 protected int minIdle = 40; //最小空闲数 protected int maxWaitMillis = 3000; //最长等待时间 //空闲回收休眠时间 protected int timeBetweenEvictionRunsMillis = 30000; protected int commandTimeout = 3000; //命令执行超时时间 private String host; //Redis 地址 private int port; //Redis 端口 private String password; //Redis 密码 } }
新建 RedisAutoConfiguration.class 文件,代码如下:
package com.example.promotion.config; import java.time.Duration; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.boot.autoconfigure.condition.Conditional OnClass; import org.springframework.boot.autoconfigure.condition.Conditional OnProperty; import org.springframework.boot.context.properties.EnableConfiguration Properties; import org.springframework.cloud.context.config.annotation.Refresh Scope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisStandalone Configuration; import org.springframework.data.redis.connection.lettuce.Lettuce ClientConfiguration; import org.springframework.data.redis.connection.lettuce.Lettuce ConnectionFactory; import org.springframework.data.redis.connection.lettuce.Lettuce PoolingClientConfiguration; import org.springframework.data.redis.core. ReactiveStringRedis Template; @ConditionalOnClass(LettuceConnectionFactory.class) @Configuration @EnableConfigurationProperties(RedisProperties.class) @ConditionalOnProperty("redis.promotion.host") public class RedisAutoConfiguration { @Bean @RefreshScope public GenericObjectPoolConfig genericObjectPoolConfig(Redis Properties properties) { //通用线程池配置 GenericObjectPoolConfig genericObjectPoolConfig = new Generic ObjectPoolConfig(); //设置最大连接数 genericObjectPoolConfig.setMaxTotal(properties.getPromotion(). getMaxTotal()); //设置最大空闲数 genericObjectPoolConfig.setMaxIdle(properties.getPromotion(). getMaxIdle()); //设置最小空闲数 genericObjectPoolConfig.setMinIdle(properties.getPromotion(). getMinIdle()); //设置最长等待时间 genericObjectPoolConfig.setMaxWaitMillis(properties.get Promotion().getMaxWaitMillis()); //从连接池取出连接时检查有效性 genericObjectPoolConfig.setTestOnBorrow(true); //连接返回时检查有效性 genericObjectPoolConfig.setTestOnReturn(true); //空闲时检查有效性 genericObjectPoolConfig.setTestWhileIdle(true); //空闲回收休眠时间 genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis (properties.getPromotion().getTimeBetweenEvictionRunsMillis()); return genericObjectPoolConfig; } @Bean @RefreshScope public LettuceClientConfiguration lettuceClientConfiguration (RedisProperties properties, GenericObjectPoolConfig genericObject PoolConfig) { //Lettuce 客户端配置 LettucePoolingClientConfiguration build = LettucePooling ClientConfiguration.builder() .commandTimeout(Duration.ofMillis(properties.get Promotion().getCommandTimeout())) .shutdownTimeout(Duration.ZERO) .poolConfig(genericObjectPoolConfig) .build(); return build; } @Bean @RefreshScope public LettuceConnectionFactory lettuceConnectionFactory (RedisProperties properties, LettuceClientConfiguration lettuceClientConfiguration) { //Redis 配置 RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration(properties.getPromotion().getHost(), properties.getPromotion().getPort()); redisConfiguration.setPassword(properties.getPromotion(). getPassword()); //Lettuce 连接工厂 LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisConfiguration, lettuceClientConfiguration); return lettuceConnectionFactory; } @Bean(name = "redisTemplate") public ReactiveStringRedisTemplate reactiveStringRedisTemplate (LettuceConnectionFactory lettuceConnectionFactory) { //StringRedisTemplate 声明 return new ReactiveStringRedisTemplate(lettuceConnection Factory, RedisSerializationContext.string()); } }
(7)新建 Sentinel 切面配置,代码如下:
package com.example.promotion.config; import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResource Aspect; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class SentinelConfig { @Bean public SentinelResourceAspect sentinelResourceAspect() { //Sentinel 切面声明 return new SentinelResourceAspect(); } }
(8)新建 Model 层对象 PromotionEntity,代码如下:
package com.example.microservice.promotion.model; import java.io.Serializable; import lombok.Data; @Data public class PromotionEntity implements Serializable { private static final long serialVersionUID = 1L; //促销活动 id private Integer id; //促销活动名称 private String name; //促销活动开始时间 private Integer beginTime; //促销活动结束时间 private Integer endTime; //活动奖品 private String prize; }
(9)接口返回通用状态码及 Redis 主键操作 key 声明。新增 Constant.class 文件,代码如下:
package com.example.promotion.constants; public class Constant { //接口成功返回状态码 public static final String SUCCESS_CODE = "S00000"; //接口失败返回状态码 public static final String ERROR_CODE = "F00001"; //接口成功返回信息 public static final String SUCCESS_MSG = "success"; //促销活动 Redis 存储结构 key public static final String REDIS_PROMOTION_KEY = "promotion:{0}"; //活动奖品领取记录 public static final String REDIS_PRIZE_KEY = "promotion:{0}:{1}"; }
(10)PromotionPushController 接口代码如下:
package com.example.microservice.promotion.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.annotation.SentinelResource; import com.example.microservice.promotion.constants.Constant; import com.example.microservice.promotion.service.BlockHandlerService; import com.example.microservice.promotion.service.FallBackService; import com.example.microservice.promotion.service.PromotionPushService; import cn.hutool.json.JSONObject; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @Slf4j @RestController @RequestMapping("/api") public class PromotionPushController { @Autowired private PromotionPushService promotionPushService; //促销活动投放接口,/api/pushPromotion?id=xx @GetMapping("pushPromotion") @ResponseBody @SentinelResource(value = "pushPromotion", entryType = EntryType.IN, blockHandler = "promotionPushBlockHandle", blockHandlerClass = {BlockHandlerService.class}, defaultFallback = "fallback", fallback Class = {FallBackService.class}) public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) { Mono<ResponseEntity<JSONObject>> mono = Mono.empty(); try { //调用促销活动投放服务方法 return promotionPushService.pushPromotion(id); } catch (Exception e) { //记录错误日志 log.error("push promotion error!"); JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "push promotion error!"); return Mono.just(ResponseEntity.ok(jsonObject)); } } //领取奖品接口,/api/ getPrize?id=xx&device=xx @GetMapping("getPrize") @ResponseBody @SentinelResource(value = "getPrize", entryType = EntryType.IN, blockHandler = "prizeBlockHandle", blockHandlerClass = {BlockHandler Service.class}, defaultFallback = "fallback", fallbackClass = {FallBackService.class}) public Mono<ResponseEntity<JSONObject>> getPrize(Integer id, String device) { try { //调用领取奖品服务方法 return promotionPushService.getPrize(id, device); } catch (Exception e) { //记录错误日志 log.error("get prize error!"); JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "get prize error!"); return Mono.just(ResponseEntity.ok(jsonObject)); } } }
(11)PromotionPushService 代码如下:
package com.example.microservice.promotion.service; import java.text.MessageFormat; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.ReactiveHashOperations; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import com.example.microservice.promotion.constants.Constant; import com.example.microservice.promotion.model.PromotionEntity; import cn.hutool.json.JSONObject; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Service @Slf4j public class PromotionPushService { @Autowired private ReactiveStringRedisTemplate reactiveStringRedisTemplate; //促销活动投放方法 public Mono<ResponseEntity<JSONObject>> pushPromotion(Integer id) { //组装促销活动 Redis key String key = MessageFormat.format(Constant.REDIS_PROMOTION_ KEY, String.valueOf(id)); //采用 ReactiveStringRedisTemplate 查询促销活动信息 ReactiveHashOperations<String, String, String> reactiveHash Operations = reactiveStringRedisTemplate.opsForHash(); Flux<Entry<String, String>> flux = reactiveHashOperations. entries(key); Map<String, String> map = new HashMap<>(); flux.subscribe(entry -> { String k = entry.getKey(); String value = entry.getValue(); map.put(k, value); }); flux.blockLast(Duration.ofMillis(1000)); //先查询,最多阻塞 1s if (MapUtils.isNotEmpty(map)) { String name = (String) map.get("name"); String prize = (String) map.get("prize"); Integer beginTime = Integer.valueOf((String) map.get ("beginTime")); Integer endTime = Integer.valueOf((String) map.get ("endTime")); Integer currentTime = (int) (System.currentTimeMillis()/ 1000); //判断促销活动投放条件,如果在促销活动时间内,则投放 if (currentTime >= beginTime && currentTime <= endTime) { //组装 PromotionEntity 对象 PromotionEntity promotionEntity = new PromotionEntity(); promotionEntity.setBeginTime(beginTime); promotionEntity.setEndTime(endTime); promotionEntity.setId(id); promotionEntity.setName(name); promotionEntity.setPrize(prize); log.info("push promotion success"); JSONObject jsonObject = new JSONObject(promotionEntity); return Mono.just(ResponseEntity.ok(jsonObject)); } } JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "push promotion error!"); return Mono.just(ResponseEntity.ok(jsonObject)); } //领取奖品的方法 public Mono<ResponseEntity<JSONObject>> getPrize(Integer id, String device) { //组装领取奖品记录 Redis key String key = MessageFormat.format(Constant.REDIS_PRIZE_KEY, String.valueOf(id), device); //查询领取奖品记录 Mono<String> mono = reactiveStringRedisTemplate.opsForValue(). get(key); String value = mono.block(Duration.ofMillis(1000)); //领取奖品判断条件,如果领取过,则不再发放 if (StringUtils.isEmpty(value)) { String promotionKey = MessageFormat.format(Constant.REDIS_ PROMOTION_KEY, String.valueOf(id)); ReactiveHashOperations<String, String, String> reactive HashOperations = reactiveStringRedisTemplate.opsForHash(); Flux<Entry<String, String>> flux = reactiveHashOperations. entries(promotionKey); Map<String, String> map = new HashMap<>(); flux.subscribe(entry -> { String k = entry.getKey(); String v = entry.getValue(); if (StringUtils.equals("prize", k)) { map.put(k, v); } }); //先查询,最多阻塞 1s flux.blockLast(Duration.ofMillis(1000)); if (MapUtils.isNotEmpty(map)) { String prize = map.get("prize"); log.info("get prize success"); JSONObject jsonObject = new JSONObject(); jsonObject.put("奖品", prize); return Mono.just(ResponseEntity.ok(jsonObject)); } } JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "prize is exist!"); return Mono.just(ResponseEntity.ok(jsonObject)); } }
(12)限流代码如下:
package com.example.microservice.promotion.service; import org.springframework.http.ResponseEntity; import com.example.microservice.promotion.constants.Constant; import cn.hutool.json.JSONObject; import reactor.core.publisher.Mono; //限流通用类 public final class BlockHandlerService { public static Mono<ResponseEntity<JSONObject>> promotionPush BlockHandle(Integer id) { JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "pushPromotion blcok!"); return Mono.just(ResponseEntity.ok(jsonObject)); } public static Mono<ResponseEntity<JSONObject>> prizeBlockHandle (Integer id, String device) { JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "get prize blcok!"); return Mono.just(ResponseEntity.ok(jsonObject)); } }
(13)降级代码如下:
package com.example.microservice.promotion.service; import org.springframework.http.ResponseEntity; import com.example.microservice.promotion.constants.Constant; import cn.hutool.json.JSONObject; import reactor.core.publisher.Mono; //降级通用类 public final class FallBackService { public static Mono<ResponseEntity<JSONObject>> defaultFallBack (Throwable ex){ JSONObject jsonObject = new JSONObject(); jsonObject.put("code", Constant.ERROR_CODE); jsonObject.put("msg", "pushPromotion fallback!"); return Mono.just(ResponseEntity.ok(jsonObject)); } }
(14)MicroservicePromotionApplication 代码如下:
package com.example.microservice.promotion; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.redis.RedisAuto Configuration; import org.springframework.boot.autoconfigure.data.redis.Redis ReactiveAutoConfiguration; import org.springframework.boot.autoconfigure.data.redis.Redis RepositoriesAutoConfiguration; import org.springframework.cloud.client.discovery.EnableDiscovery Client; import org.springframework.context.annotation.EnableAspectJAutoProxy; @SpringBootApplication(exclude = {RedisAutoConfiguration.class, Redis RepositoriesAutoConfiguration.class, RedisReactiveAutoConfiguration. class}) @EnableAspectJAutoProxy //开启切面 @EnableDiscoveryClient //开启服务发现 public class MicroservicePromotionApplication { public static void main(String[] args) { SpringApplication.run(MicroservicePromotionApplication.class, args); } }
此时,启动 MicroservicePromotionApplication 主类即可访问促销活动接口 http://localhost:8081/api/pushPromotion?id=3,返回结果如下:
{ id: 3, name: "会员促销活动", beginTime: 1614822680, endTime: 1617176808, prize: "3 天免费会员" }
访问领取奖品接口 http://localhost/api/getPrize?id=3&device= 3af57d0545766ec9 40d2c32a6567cc06aed,返回结果如下:
{ 奖品: "3 天免费会员" }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论