一、什么是响应式编程
1.1 什么是WebFlux
WebFlux是从Spring Framework5.0以后开始引入的响应式web编程框架。与传统的Spring mvc不同WebFlux不需要Servlet API,在完全异步且无阻塞的通过Reactor项目实现Reactive Streams 规范。
WebFlux可以在有限资源下提高系统的吞吐量和伸缩性,这意味着在资源相同的情况下WebFlux可以处理更多的请求。
1.2 MVC和WebFlux的比较
(1) 工作方式:
mvc的工作流程:主线程接收到请求(request)-> ..... -> 返回数据. 整个过程是单线程阻塞的,在处理好数据后才返回数据,如果用户请求比较多,那么吞吐量就比较低。
(2)WebFlux:
WebFlux的工作流程:主线程得到请求-> 立即返回数据与函数的组合(Mono或Flux) -> 开启一个新Work线程准备数据 -> 执行业务操作 --> Work线程工作完成 ---> 返回数据。
区别 | Spring mvc | Spring WebFlux |
地址映射 | @Controller @RequestMapping 等 | Router Functions 提供函数式的API,用于创建Router Handler Filter |
数据流 | Servlet API | Reactive Streams: 一种支持背压的异步数据量标准。WebFlux默认使用的是Reactor。 |
容器 | Tomcat Jetty Undertow | Tomcat Jetty Netty Undertow |
IO 模型 |
同步的、阻塞的IO | 异步非阻塞的IO |
吞吐量 | 低 | 高 |
数据库 | Sql NoSql | 支持NoSql,不支持Sql |
请求和响应 | HttpServletRequest和HttpServletResponse | ServletResponse和ServletRequest |
业务处理性能 | 相同 | 相同 |
2. 认识Mono和Flux
2.1 什么是Mono和Flux
Mono和Flux是Reactor中的两个基本概念。
Mono和Flux都是事件发布者,为消费者提供订阅接口。当有事件发生时,Mono和Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程模型。
Mono和Flux用于处理异步数据流,它不是MVC中直接返回String或Object,而是将异步数据流包装成Mono或Flux对象。
2.2 Mono和Flux的区别
Flux可以发送多个item(例如:列表)。这些item可以经过若干算子(operators)后才被订阅。Mono只能发送一个item(例如:根据id查询)
Mono 主要用于返回单个数据,Flux用于返回多个数据。
二、开发WebFlux的流程:
2.1 配置WebFlux依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2.2 编写控制器:
mport org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
public class HelloController {
@GetMapping("/")
public Mono<String> hello(){
return Mono.just("Hello WebFlux Test dev!");
}
}
2.3 添加mongo依赖并创建实体类:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document
public class User {
@Id
private String id;
private String name;
private Integer age;
}
2.4 创建DAO
import com.example.webfluxdemo.entity.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
public interface UserMongoDao extends ReactiveMongoRepository<User,String> {
}
2.5 编写Handler:
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
@Component
public class HelloWorldHandler {
public Mono<ServerResponse> sayHello(ServerRequest serverRequest){
return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
.body(Mono.just("This is WebFlux Demo!"),String.class);
}
}
2.6 编写路由:
@Configuration
public class Router {
@Resource
private HelloWorldHandler helloWorldHandler;
@Bean
public RouterFunction<ServerResponse> getString(){
return route(GET("/hello"),req->helloWorldHandler.sayHello(req));
}
}
2.7 编写控制器:
mport com.example.webfluxdemo.dao.UserMongoDao;
import com.example.webfluxdemo.entity.User;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/user")
public class UserController {
@Resource
private UserMongoDao userMongoDao;
@GetMapping(value ="/list")
public Flux<User> findAll(){
return userMongoDao.findAll();
}
@PostMapping("")
public Mono<User> create( User user){
return this.userMongoDao.save(user);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> getUserById(@PathVariable("id") String id){
return this.userMongoDao.findById(id)
.map(getUser -> ResponseEntity.ok(getUser))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> delete(@PathVariable("id")String id){
return userMongoDao.findById(id)
.flatMap(existingUser ->
userMongoDao.delete(existingUser)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
)
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@PutMapping("/{id}")
public Mono update(@PathVariable("id")String id,User user){
return this.userMongoDao.findById(id)
.flatMap(existingUser-> {
existingUser.setName(user.getName());
existingUser.setAge(user.getAge());
return userMongoDao.save(user);
})
.map(update-> new ResponseEntity<>(update,HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
@GetMapping(value ="/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<User> getAll(){
return userMongoDao.findAll().delayElements(Duration.ofSeconds(1));
}
}