SpringWebFlux

SpringWebFlux使用

为什么需要WebFlux

传统阻塞式模型的瓶颈(Spring MVC Servlet)

​ SpringMVC + Tomcat NIO,NIO 模型主要是为了处理网络连接的接入,在将请求交给 Servlet 容器后,业务逻辑仍然使用线程池处理。

​ SpringMVC + Jetty & Undertow:从架构层面就深度使用 NIO,在处理连接和请求分发上更高效,尤其是在大量并发长连接(如 WebSocket)的场景下表现优异。但是本质上是分作2个线程池,一个线程池负责管理外部请求,一个线程池负责承接前一个线程池的任务,这个线程池内遇到阻塞IO时,依旧是阻塞等待的

​ 因此,传统MVC模式存在以下瓶颈:

同步阻塞I/O

​ 工作线程遇到HTTP/数据库查询时,同步阻塞等待

并发能力受限

​ 线程池的大小有限,当并发请求数超过线程池大小时,新请求必须等待,导致延迟增加和吞吐量下降。这在高并发场景下是致命的。

解决方式:异步+非阻塞

  • 目标: 用更少的资源(尤其是线程)处理更高的并发。
  • 如何实现: 当等待 I/O 操作(如数据库查询)完成时,不阻塞当前线程,而是立即释放它去服务其他请求。当 I/O 操作完成后,再通过回调机制通知你来处理结果。
  • 这就是 WebFlux 的核心价值: 它提供了一套完整的、非阻塞的、支持背压的 Web 栈,让我们能够轻松地编写异步和响应式应用。

什么是WebFlux

​ Spring WebFlux 是 Spring Framework 5.0 引入的一个全新的、响应式的 Web 框架。它完全支持响应式流规范,可以在 Netty, Undertow 和 Servlet 3.1+ 容器等服务器上运行。

响应式编程的核心特性(Reactive Program)

异步非阻塞 (Asynchronous & Non-blocking)

  • 传统阻塞:线程等待操作完成,期间不能做其他事情
  • 响应式非阻塞:发起操作后立即返回,操作完成后通过回调通知
1
2
3
4
5
6
7
// 传统阻塞方式
User user = userRepository.findById(id); // 线程在此等待
System.out.println(user.getName());

// 响应式非阻塞方式
Mono<User> userMono = userRepository.findById(id);
userMono.subscribe(user -> System.out.println(user.getName())); // 回调方式

数据流 (Data Streams)

​ 将数据视为流,而不是孤立的对象。流是一系列按时间顺序排列的事件序列,可以包含0个或者多个事件。这些事件可以是数据或者各种信号(完成、订阅、异常、完成信号等)

1
2
3
4
5
6
// 处理用户流,而不是单个用户
Flux<User> userStream = userRepository.findAll();
userStream
.filter(user -> user.getAge() > 18)
.map(User::getName)
.subscribe(name -> System.out.println("Adult: " + name));

背压机制 (Backpressure)

数据处理的速度由消费者决定(即便生产者生产速度大于消费者消费速度),防止生产者压垮消费者

1
2
3
4
5
6
7
8
9
10
Flux.range(1, 1000000)        // 生产者:100万个数据
.onBackpressureBuffer(100) // 缓冲区:最多缓存100个
.subscribe(
data -> {
// 慢速处理
Thread.sleep(10);
System.out.println(data);
},
error -> System.err.println("Error: " + error)
);

函数式和声明式(Declarative + Functional)

1
2
3
4
5
6
7
8
public Flux<String> processUsers(Flux<User> users) {
return users
.filter(user -> user.getStatus() == Status.ACTIVE)
.take(100) // 只取前100个
.delayElements(Duration.ofMillis(10)) // 每个元素延迟10ms
.map(user -> transformUser(user))
.onErrorResume(error -> fallbackUsers());
}

消息/事件驱动(Event Drive)

​ 通过消息驱动机制实现异步等效果,当目标事件(异常、结束等)发生时,回调特定钩子。

1
2
3
4
5
6
7
8
userRepository.findById(userId)
.map(this::processUser)
.onErrorResume(throwable -> {
// 错误恢复:返回默认用户
return Mono.just(getDefaultUser());
})
//.onErrorReturn(User.EMPTY) // 或者直接返回默认值
.doOnError(error -> log.error("Processing failed", error)); // 副作用记录

总结

​ 在Spring WebFlux中,这些特点使得应用程序能够以更少的资源(尤其是线程)处理更高的并发连接

​ 特别适合I/O密集型应用(如微服务网关、实时通信等)。然而,对于CPU密集型应用,响应式编程的优势并不明显,因为CPU密集型任务会占用线程进行计算,而响应式编程的优势在于I/O等待期间释放线程。

从而能够用少量固定数量的线程处理高并发请求。

WebFlux的核心概念和编程模型

1、响应式流 (Reactive Streams)

Reactive Streams 是一个规范,定义了异步组件与背压之间的交互。它的核心接口只有四个:

Publisher 发布者

Subscriber 订阅者

Subscription 订阅关系,记录发布者和订阅者的关系

Processor 处理器

Spring WebFlux 基于 Reactor 库实现,而 Reactor 实现了响应式流规范

2、Reactor 核心类型:Mono 和 Flux

Flux: 代表一个包含 0 到 N 个元素的异步序列。类似于 List,但是非阻塞的。例如:从数据库返回多个对象、从 SSE 流接收多个事件。
Mono: 代表一个包含 0 或 1 个元素的异步序列。例如:根据ID查询一个对象、执行保存或删除操作(通常只返回成功/失败信息)。

3、两种编程模型

注解模型 (Annotation-based Model)

​ 与 Spring MVC 非常相似,易于上手。关键区别:控制器方法的返回值是 MonoFlux,而不是具体的对象或 List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RestController
@RequestMapping("/users")
public class UserController {

@Autowired
private UserRepository userRepository; // 假设这是一个响应式的 Repository

// 返回多个用户 - Flux
@GetMapping
public Flux<User> getAllUsers() {
return userRepository.findAll();
}

// 返回一个用户 - Mono
@GetMapping("/{id}")
public Mono<User> getUserById(@PathVariable String id) {
return userRepository.findById(id);
}

// 保存用户 - 接收一个Mono(请求体),返回一个Mono(保存后的用户)
@PostMapping
public Mono<User> createUser(@RequestBody Mono<User> userMono) {
return userMono.flatMap(userRepository::save);
}
}

函数式端点 (Functional Endpoints)

​ 一种更轻量级、函数式的编程模型。它通过将请求路由到 HandlerFunction 来处理。

  • 核心组件:
    • RouterFunction: 类似于 @RequestMapping,定义 URL 路由规则。
    • HandlerFunction: 类似于 @Controller 中的方法,包含实际的业务逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class UserRouter {

@Bean
public RouterFunction<ServerResponse> route(UserHandler userHandler) {
return RouterFunctions.route()
.GET("/fn/users", userHandler::getAllUsers)
.GET("/fn/users/{id}", userHandler::getUserById)
.POST("/fn/users", userHandler::createUser)
.build();
}
}

@Component
public class UserHandler {

@Autowired
private UserRepository userRepository;

public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(userRepository.findAll(), User.class);
}

public Mono<ServerResponse> getUserById(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build()); // 处理找不到的情况
}

public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<User> userMono = request.bodyToMono(User.class);
return userMono.flatMap(userRepository::save)
.flatMap(savedUser -> ServerResponse.ok().bodyValue(savedUser));
}
}

4、背压

​ 见上文。在 WebFlux 中: 通过 FluxMono 内部实现,当使用如 application/stream+json 媒体类型时,背压机制会自动生效,具体背压策略看服务端和客户端如何实现。

1
2
3
4
5
6
7
8
9
@GetMapping(value = "/controlled-data", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getControlledData() {
return Flux.interval(Duration.ofMillis(1))
.map(i -> "Data item " + i + " at " + Instant.now())
.onBackpressureBuffer(100); // 关键:设置缓冲区大小
// 或者使用其他背压策略:
// .onBackpressureDrop() // 丢弃超出的数据
// .onBackpressureLatest() // 只保留最新的数据
}

5、服务器推送 (Server-Sent Events, SSE)

WebFlux 非常适合实现服务器向客户端单向推送数据,客户端通过 EventSource API 连接到此端点,服务端会每隔一秒单向向客户端发送一个用户数据。

1
2
3
4
5
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.getAllUsers()
.delayElements(Duration.ofSeconds(1)); // 每秒发送一个用户
}

6、响应式数据库驱动

 **WebFlux 的真正威力需要与非阻塞的数据库驱动结合**,如果只定义了响应式接口,而底层的数据库驱动依旧是阻塞的,那只是伪响应式,本质上也是一个请求,阻塞占用一个工作线程。
响应式数据库驱动支持的数据库: MongoDB (ReactiveMongoTemplate), Cassandra, Redis (ReactiveRedisTemplate), R2DBC (用于关系型数据库,如 PostgreSQL, MySQL)。

7、响应式的函数或操作

1、过滤与转换操作符

filter - 过滤
1
2
3
Flux.range(1, 10)
.filter(i -> i % 2 == 0) // 只保留偶数
.subscribe(System.out::println); // 2,4,6,8,10
map - 一对一转换
1
2
3
Flux.just("apple", "banana")
.map(String::toUpperCase) // 每个元素单独转换
.subscribe(System.out::println); // APPLE, BANANA

2、扁平化操作符

flatMap - 异步扁平化
操作 输入 → 输出 核心作用 适用场景
map T → R 一对一转换 简单数据类型转换
flatMap T → Stream 一对多转换 + 扁平化 嵌套集合、Optional处理、数据展开
1
2
3
4
5
6
7
8
9
Flux.just(1, 2, 3)
.flatMap(i ->
Mono.just(i * 10).delayElement(Duration.ofMillis(100))
) // 1→10, 2→20, 3→30(可能乱序)
.subscribe(System.out::println);
//过程解析
Flux<Integer>
map映射 Integer -> Mono<Integer>
flat聚合 Mono<Integer> -> Flux<Integer>
concatMap - 顺序扁平化
1
2
3
4
5
Flux.just(1, 2, 3)
.concatMap(i ->
Mono.just(i * 10).delayElement(Duration.ofMillis(100))
) // 保证顺序:1→10, 2→20, 3→30
.subscribe(System.out::println);
flatMapMany - Mono转Flux
1
2
3
Mono.just(Arrays.asList(1, 2, 3))
.flatMapMany(Flux::fromIterable) // Mono<List> → Flux<Integer>
.subscribe(System.out::println); // 1,2,3

3. 组合操作符

concat - 顺序连接
1
2
3
4
5
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);

Flux.concat(flux1, flux2) // 先消费flux1,再flux2
.subscribe(System.out::println); // 1,2,3,4,5,6
concatWith - 实例方法
1
2
3
Flux.just(1, 2, 3)
.concatWith(Flux.just(4, 5, 6))
.subscribe(System.out::println); // 1,2,3,4,5,6
merge - 并行合并
1
2
3
4
5
Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));

Flux.merge(flux1, flux2) // 按元素到达时间合并,可能乱序
.subscribe(System.out::println); // 4,1,5,2,6,3
mergeWith - 实例方法
1
2
3
Flux.just(1, 2, 3)
.mergeWith(Flux.just(4, 5, 6))
.subscribe(System.out::println); // 1,2,3,4,5,6
mergeSequential - 按订阅顺序合并
1
2
3
4
5
6
7
8
9
10
11
Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));

Flux.mergeSequential(flux1, flux2) // 先输出flux1全部,再flux2全部
.subscribe(System.out::println); // 1,2,3,4,5,6
// mergeSequential: 1 (100ms后)
// mergeSequential: 2 (200ms后)
// mergeSequential: 3 (300ms后)
// mergeSequential: 4 (300ms后) ← B1早已到达但被缓存,现在才输出
// mergeSequential: 5 (300ms后)
// mergeSequential: 6 (300ms后)
zip - 一对一组合
1
2
3
4
5
6
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<String> letters = Flux.just("A", "B", "C");

Flux.zip(numbers, letters)
.map(tuple -> tuple.getT1() + ":" + tuple.getT2())
.subscribe(System.out::println); // 1:A, 2:B, 3:C
zipWith - 实例方法
1
2
3
4
Flux.just(1, 2, 3)
.zipWith(Flux.just("A", "B", "C"))
.map(tuple -> tuple.getT1() + ":" + tuple.getT2())
.subscribe(System.out::println); // 1:A, 2:B, 3:C

4. 空值处理

defaultIfEmpty - 默认值
1
2
3
Flux.empty()
.defaultIfEmpty("No Data") // 如果流为空,发射默认值
.subscribe(System.out::println); // "No Data"
switchIfEmpty - 切换备用流
1
2
3
Flux.empty()
.switchIfEmpty(Flux.just("Fallback", "Data")) // 切换到备用流
.subscribe(System.out::println); // "Fallback", "Data"
5. transform - 操作符组合
1
2
3
4
5
6
7
Function<Flux<String>, Flux<String>> filterAndTransform = 
flux -> flux.filter(s -> s.length() > 3)
.map(String::toUpperCase);

Flux.just("one", "two", "three", "four")
.transform(filterAndTransform) // 应用组合操作
.subscribe(System.out::println); // "THREE", "FOUR"

6. 错误处理

onErrorReturn - 错误时返回默认值
1
2
3
Flux.error(new RuntimeException("Boom!"))
.onErrorReturn("Recovered")
.subscribe(System.out::println); // "Recovered"
onErrorResume - 错误时切换流
1
2
3
Flux.error(new RuntimeException("Boom!"))
.onErrorResume(error -> Flux.just("Fallback", "Data"))
.subscribe(System.out::println); // "Fallback", "Data"
onErrorMap - 错误转换
1
2
3
4
5
6
Flux.error(new IOException("Network error"))
.onErrorMap(original -> new BusinessException("Service unavailable", original))
.subscribe(
System.out::println,
error -> System.out.println(error.getClass()) // BusinessException
);
doOnError - 错误时作用
1
2
3
Flux.error(new RuntimeException("Boom!"))
.doOnError(error -> log.error("Error occurred: {}", error.getMessage()))
.subscribe();

7、超时与重试

retry - 重试机制
1
2
3
4
5
6
Flux.error(new RuntimeException("Temporary failure"))
.retry(3) // 重试3次
.subscribe(
System.out::println,
error -> System.out.println("Final error: " + error)
);
retryWhen - 条件重试
1
2
3
Flux.error(new RuntimeException("Temporary failure"))
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))) // 固定延迟重试
.subscribe();

8、Sinks工具类

创建可编程数据源
1
2
3
4
5
6
7
8
9
10
// 单值Sink
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);

// 多值Sink
Sinks.Many<String> sinkMany = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> flux = sinkMany.asFlux();
sinkMany.tryEmitNext("Message 1");
sinkMany.tryEmitNext("Message 2");
单播 (Unicast)
1
2
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
// 只能有一个订阅者,多个订阅会报错
多播 (Multicast)
1
2
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
// 多个订阅者共享后续数据(不包含历史数据)
重放 (Replay)
1
2
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
// 新订阅者会收到所有历史数据

9、阻塞式API处理

阻塞操作包装
1
2
3
4
5
6
Mono.fromCallable(() -> {
// 阻塞的JDBC调用
return jdbcTemplate.queryForList("SELECT * FROM users");
})
.subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
.subscribe();

10、block - 同步阻塞获取

1
2
3
4
// 测试或边缘场景使用,避免在生产环境主线程使用
List<User> users = userRepository.findAll()
.collectList()
.block(Duration.ofSeconds(5)); // 同步等待,最多5秒

11、Context-API - 响应式中的ThreadLocal

​ 在响应式编程中,尤其是使用 Reactor(如 Spring WebFlux)时,传统的 ThreadLocal 机制会失效,因为响应式编程是非阻塞的,并且操作可能在不同的线程上执行。这意味着,一个操作可能在一个线程上开始,在另一个线程上继续,因此无法通过 ThreadLocal 来传递上下文。

​ Reactor 提供了 Context 机制来替代 ThreadLocal,用于在响应式流中传递上下文数据。Context 是绑定到订阅(Subscription)上的,而不是线程。它随着数据流在操作符之间传递,因此可以在整个流处理过程中访问和修改上下文。

1
2
3
4
5
6
7
8
9
10
11
12
//Context 从下往上传播(从订阅点向上游传播),如果存在覆盖时,所有读到的都以最上层最后一次写的为准
Flux.range(1, 3)
.flatMap(i ->
Mono.deferContextual(ctx ->
Mono.just("Value: " + i + ", Context: " + ctx.get("traceId"))
)
)
.contextWrite(Context.of("traceId", "12345")) // 写入上下文
.subscribe(System.out::println);
// 输出: Value: 1, Context: 12345
// Value: 2, Context: 12345
// Value: 3, Context: 12345

12、ParallelFlux - 并行处理

1
2
3
4
5
6
7
Flux.range(1, 100)
.parallel(4) // 分成4个并行轨道
.runOn(Schedulers.parallel()) // 在并行调度器执行
.map(i -> i * 2) // 并行处理
.sequential() // 转换回普通Flux
.subscribe(System.out::println);
实际业务场景组合