SpringWebFlux使用
为什么需要WebFlux
传统阻塞式模型的瓶颈(Spring MVC Servlet)
SpringMVC + Tomcat NIO,NIO 模型主要是为了处理网络连接的接入,在将请求交给 Servlet 容器后,业务逻辑仍然使用线程池处理。
SpringMVC + Jetty & Undertow:从架构层面就深度使用 NIO,在处理连接和请求分发上更高效,尤其是在大量并发长连接(如 WebSocket)的场景下表现优异。但是本质上是分作2个线程池,一个线程池负责管理外部请求,一个线程池负责承接前一个线程池的任务,这个线程池内遇到阻塞IO时,依旧是阻塞等待的。
因此,传统MVC模式存在以下瓶颈:
同步阻塞I/O
工作线程遇到HTTP/数据库查询时,同步阻塞等待
并发能力受限
线程池的大小有限,当并发请求数超过线程池大小时,新请求必须等待,导致延迟增加和吞吐量下降。这在高并发场景下是致命的。
解决方式:异步+非阻塞
- 目标: 用更少的资源(尤其是线程)处理更高的并发。
- 如何实现: 当等待 I/O 操作(如数据库查询)完成时,不阻塞当前线程,而是立即释放它去服务其他请求。当 I/O 操作完成后,再通过回调机制通知你来处理结果。
- 这就是 WebFlux 的核心价值: 它提供了一套完整的、非阻塞的、支持背压的 Web 栈,让我们能够轻松地编写异步和响应式应用。
什么是WebFlux
Spring WebFlux 是 Spring Framework 5.0 引入的一个全新的、响应式的 Web 框架。它完全支持响应式流规范,可以在 Netty, Undertow 和 Servlet 3.1+ 容器等服务器上运行。
响应式编程的核心特性(Reactive Program)
异步非阻塞 (Asynchronous & Non-blocking)
- 传统阻塞:线程等待操作完成,期间不能做其他事情
- 响应式非阻塞:发起操作后立即返回,操作完成后通过回调通知
1 2 3 4 5 6 7
| User user = userRepository.findById(id); System.out.println(user.getName());
Mono<User> userMono = userRepository.findById(id); userMono.subscribe(user -> System.out.println(user.getName()));
|
数据流 (Data Streams)
将数据视为流,而不是孤立的对象。流是一系列按时间顺序排列的事件序列,可以包含0个或者多个事件。这些事件可以是数据或者各种信号(完成、订阅、异常、完成信号等)
1 2 3 4 5 6
| Flux<User> userStream = userRepository.findAll(); userStream .filter(user -> user.getAge() > 18) .map(User::getName) .subscribe(name -> System.out.println("Adult: " + name));
|
背压机制 (Backpressure)
数据处理的速度由消费者决定(即便生产者生产速度大于消费者消费速度),防止生产者压垮消费者
1 2 3 4 5 6 7 8 9 10
| Flux.range(1, 1000000) .onBackpressureBuffer(100) .subscribe( data -> { Thread.sleep(10); System.out.println(data); }, error -> System.err.println("Error: " + error) );
|
函数式和声明式(Declarative + Functional)
1 2 3 4 5 6 7 8
| public Flux<String> processUsers(Flux<User> users) { return users .filter(user -> user.getStatus() == Status.ACTIVE) .take(100) .delayElements(Duration.ofMillis(10)) .map(user -> transformUser(user)) .onErrorResume(error -> fallbackUsers()); }
|
消息/事件驱动(Event Drive)
通过消息驱动机制实现异步等效果,当目标事件(异常、结束等)发生时,回调特定钩子。
1 2 3 4 5 6 7 8
| userRepository.findById(userId) .map(this::processUser) .onErrorResume(throwable -> { return Mono.just(getDefaultUser()); }) .doOnError(error -> log.error("Processing failed", error));
|
总结
在Spring WebFlux中,这些特点使得应用程序能够以更少的资源(尤其是线程)处理更高的并发连接。
特别适合I/O密集型应用(如微服务网关、实时通信等)。然而,对于CPU密集型应用,响应式编程的优势并不明显,因为CPU密集型任务会占用线程进行计算,而响应式编程的优势在于I/O等待期间释放线程。
从而能够用少量固定数量的线程处理高并发请求。
WebFlux的核心概念和编程模型
1、响应式流 (Reactive Streams)
Reactive Streams 是一个规范,定义了异步组件与背压之间的交互。它的核心接口只有四个:
Publisher 发布者
Subscriber 订阅者
Subscription 订阅关系,记录发布者和订阅者的关系
Processor 处理器
Spring WebFlux 基于 Reactor 库实现,而 Reactor 实现了响应式流规范
2、Reactor 核心类型:Mono 和 Flux
Flux: 代表一个包含 0 到 N 个元素的异步序列。类似于 List,但是非阻塞的。例如:从数据库返回多个对象、从 SSE 流接收多个事件。
Mono: 代表一个包含 0 或 1 个元素的异步序列。例如:根据ID查询一个对象、执行保存或删除操作(通常只返回成功/失败信息)。
3、两种编程模型
注解模型 (Annotation-based Model)
与 Spring MVC 非常相似,易于上手。关键区别:控制器方法的返回值是 Mono 或 Flux,而不是具体的对象或 List。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @RestController @RequestMapping("/users") public class UserController {
@Autowired private UserRepository userRepository;
@GetMapping public Flux<User> getAllUsers() { return userRepository.findAll(); }
@GetMapping("/{id}") public Mono<User> getUserById(@PathVariable String id) { return userRepository.findById(id); }
@PostMapping public Mono<User> createUser(@RequestBody Mono<User> userMono) { return userMono.flatMap(userRepository::save); } }
|
函数式端点 (Functional Endpoints)
一种更轻量级、函数式的编程模型。它通过将请求路由到 HandlerFunction 来处理。
- 核心组件:
-
RouterFunction: 类似于 @RequestMapping,定义 URL 路由规则。
-
HandlerFunction: 类似于 @Controller 中的方法,包含实际的业务逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Configuration public class UserRouter {
@Bean public RouterFunction<ServerResponse> route(UserHandler userHandler) { return RouterFunctions.route() .GET("/fn/users", userHandler::getAllUsers) .GET("/fn/users/{id}", userHandler::getUserById) .POST("/fn/users", userHandler::createUser) .build(); } }
@Component public class UserHandler {
@Autowired private UserRepository userRepository;
public Mono<ServerResponse> getAllUsers(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.APPLICATION_JSON) .body(userRepository.findAll(), User.class); }
public Mono<ServerResponse> getUserById(ServerRequest request) { String id = request.pathVariable("id"); return userRepository.findById(id) .flatMap(user -> ServerResponse.ok().bodyValue(user)) .switchIfEmpty(ServerResponse.notFound().build()); }
public Mono<ServerResponse> createUser(ServerRequest request) { Mono<User> userMono = request.bodyToMono(User.class); return userMono.flatMap(userRepository::save) .flatMap(savedUser -> ServerResponse.ok().bodyValue(savedUser)); } }
|
4、背压
见上文。在 WebFlux 中: 通过 Flux 和 Mono 内部实现,当使用如 application/stream+json 媒体类型时,背压机制会自动生效,具体背压策略看服务端和客户端如何实现。
1 2 3 4 5 6 7 8 9
| @GetMapping(value = "/controlled-data", produces = MediaType.APPLICATION_STREAM_JSON_VALUE) public Flux<String> getControlledData() { return Flux.interval(Duration.ofMillis(1)) .map(i -> "Data item " + i + " at " + Instant.now()) .onBackpressureBuffer(100); }
|
5、服务器推送 (Server-Sent Events, SSE)
WebFlux 非常适合实现服务器向客户端单向推送数据,客户端通过 EventSource API 连接到此端点,服务端会每隔一秒单向向客户端发送一个用户数据。
1 2 3 4 5
| @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<User> streamUsers() { return userService.getAllUsers() .delayElements(Duration.ofSeconds(1)); }
|
6、响应式数据库驱动
**WebFlux 的真正威力需要与非阻塞的数据库驱动结合**,如果只定义了响应式接口,而底层的数据库驱动依旧是阻塞的,那只是伪响应式,本质上也是一个请求,阻塞占用一个工作线程。
响应式数据库驱动支持的数据库: MongoDB (ReactiveMongoTemplate), Cassandra, Redis (ReactiveRedisTemplate), R2DBC (用于关系型数据库,如 PostgreSQL, MySQL)。
7、响应式的函数或操作
1、过滤与转换操作符
filter - 过滤
1 2 3
| Flux.range(1, 10) .filter(i -> i % 2 == 0) .subscribe(System.out::println);
|
map - 一对一转换
1 2 3
| Flux.just("apple", "banana") .map(String::toUpperCase) .subscribe(System.out::println);
|
2、扁平化操作符
flatMap - 异步扁平化
| 操作 |
输入 → 输出 |
核心作用 |
适用场景 |
| map |
T → R |
一对一转换 |
简单数据类型转换 |
| flatMap |
T → Stream |
一对多转换 + 扁平化 |
嵌套集合、Optional处理、数据展开 |
1 2 3 4 5 6 7 8 9
| Flux.just(1, 2, 3) .flatMap(i -> Mono.just(i * 10).delayElement(Duration.ofMillis(100)) ) .subscribe(System.out::println);
Flux<Integer> map映射 Integer -> Mono<Integer> flat聚合 Mono<Integer> -> Flux<Integer>
|
concatMap - 顺序扁平化
1 2 3 4 5
| Flux.just(1, 2, 3) .concatMap(i -> Mono.just(i * 10).delayElement(Duration.ofMillis(100)) ) .subscribe(System.out::println);
|
flatMapMany - Mono转Flux
1 2 3
| Mono.just(Arrays.asList(1, 2, 3)) .flatMapMany(Flux::fromIterable) .subscribe(System.out::println);
|
3. 组合操作符
concat - 顺序连接
1 2 3 4 5
| Flux<Integer> flux1 = Flux.just(1, 2, 3); Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux.concat(flux1, flux2) .subscribe(System.out::println);
|
concatWith - 实例方法
1 2 3
| Flux.just(1, 2, 3) .concatWith(Flux.just(4, 5, 6)) .subscribe(System.out::println);
|
merge - 并行合并
1 2 3 4 5
| Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100)); Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
Flux.merge(flux1, flux2) .subscribe(System.out::println);
|
mergeWith - 实例方法
1 2 3
| Flux.just(1, 2, 3) .mergeWith(Flux.just(4, 5, 6)) .subscribe(System.out::println);
|
mergeSequential - 按订阅顺序合并
1 2 3 4 5 6 7 8 9 10 11
| Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(100)); Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
Flux.mergeSequential(flux1, flux2) .subscribe(System.out::println);
|
zip - 一对一组合
1 2 3 4 5 6
| Flux<Integer> numbers = Flux.just(1, 2, 3); Flux<String> letters = Flux.just("A", "B", "C");
Flux.zip(numbers, letters) .map(tuple -> tuple.getT1() + ":" + tuple.getT2()) .subscribe(System.out::println);
|
zipWith - 实例方法
1 2 3 4
| Flux.just(1, 2, 3) .zipWith(Flux.just("A", "B", "C")) .map(tuple -> tuple.getT1() + ":" + tuple.getT2()) .subscribe(System.out::println);
|
4. 空值处理
defaultIfEmpty - 默认值
1 2 3
| Flux.empty() .defaultIfEmpty("No Data") .subscribe(System.out::println);
|
switchIfEmpty - 切换备用流
1 2 3
| Flux.empty() .switchIfEmpty(Flux.just("Fallback", "Data")) .subscribe(System.out::println);
|
1 2 3 4 5 6 7
| Function<Flux<String>, Flux<String>> filterAndTransform = flux -> flux.filter(s -> s.length() > 3) .map(String::toUpperCase);
Flux.just("one", "two", "three", "four") .transform(filterAndTransform) .subscribe(System.out::println);
|
6. 错误处理
onErrorReturn - 错误时返回默认值
1 2 3
| Flux.error(new RuntimeException("Boom!")) .onErrorReturn("Recovered") .subscribe(System.out::println);
|
onErrorResume - 错误时切换流
1 2 3
| Flux.error(new RuntimeException("Boom!")) .onErrorResume(error -> Flux.just("Fallback", "Data")) .subscribe(System.out::println);
|
onErrorMap - 错误转换
1 2 3 4 5 6
| Flux.error(new IOException("Network error")) .onErrorMap(original -> new BusinessException("Service unavailable", original)) .subscribe( System.out::println, error -> System.out.println(error.getClass()) );
|
doOnError - 错误时作用
1 2 3
| Flux.error(new RuntimeException("Boom!")) .doOnError(error -> log.error("Error occurred: {}", error.getMessage())) .subscribe();
|
7、超时与重试
retry - 重试机制
1 2 3 4 5 6
| Flux.error(new RuntimeException("Temporary failure")) .retry(3) .subscribe( System.out::println, error -> System.out.println("Final error: " + error) );
|
retryWhen - 条件重试
1 2 3
| Flux.error(new RuntimeException("Temporary failure")) .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))) .subscribe();
|
8、Sinks工具类
创建可编程数据源
1 2 3 4 5 6 7 8 9 10
| Sinks.One<String> sinkOne = Sinks.one(); Mono<String> mono = sinkOne.asMono(); sinkOne.emitValue("Hello", Sinks.EmitFailureHandler.FAIL_FAST);
Sinks.Many<String> sinkMany = Sinks.many().unicast().onBackpressureBuffer(); Flux<String> flux = sinkMany.asFlux(); sinkMany.tryEmitNext("Message 1"); sinkMany.tryEmitNext("Message 2");
|
单播 (Unicast)
1 2
| Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
|
多播 (Multicast)
1 2
| Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
|
重放 (Replay)
1 2
| Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
|
9、阻塞式API处理
阻塞操作包装
1 2 3 4 5 6
| Mono.fromCallable(() -> { return jdbcTemplate.queryForList("SELECT * FROM users"); }) .subscribeOn(Schedulers.boundedElastic()) .subscribe();
|
10、block - 同步阻塞获取
1 2 3 4
| List<User> users = userRepository.findAll() .collectList() .block(Duration.ofSeconds(5));
|
11、Context-API - 响应式中的ThreadLocal
在响应式编程中,尤其是使用 Reactor(如 Spring WebFlux)时,传统的 ThreadLocal 机制会失效,因为响应式编程是非阻塞的,并且操作可能在不同的线程上执行。这意味着,一个操作可能在一个线程上开始,在另一个线程上继续,因此无法通过 ThreadLocal 来传递上下文。
Reactor 提供了 Context 机制来替代 ThreadLocal,用于在响应式流中传递上下文数据。Context 是绑定到订阅(Subscription)上的,而不是线程。它随着数据流在操作符之间传递,因此可以在整个流处理过程中访问和修改上下文。
1 2 3 4 5 6 7 8 9 10 11 12
| Flux.range(1, 3) .flatMap(i -> Mono.deferContextual(ctx -> Mono.just("Value: " + i + ", Context: " + ctx.get("traceId")) ) ) .contextWrite(Context.of("traceId", "12345")) .subscribe(System.out::println);
|
12、ParallelFlux - 并行处理
1 2 3 4 5 6 7
| Flux.range(1, 100) .parallel(4) .runOn(Schedulers.parallel()) .map(i -> i * 2) .sequential() .subscribe(System.out::println); 实际业务场景组合
|