Springboot 开发 Web Flux

时间:2022-10-04 20:54:42

一、什么是响应式编程

  1.1 什么是WebFlux

        WebFlux是从Spring Framework5.0以后开始引入的响应式web编程框架。与传统的Spring mvc不同WebFlux不需要Servlet API,在完全异步且无阻塞的通过Reactor项目实现Reactive Streams 规范。

       WebFlux可以在有限资源下提高系统的吞吐量和伸缩性,这意味着在资源相同的情况下WebFlux可以处理更多的请求。

1.2 MVC和WebFlux的比较

     (1) 工作方式:

       mvc的工作流程:主线程接收到请求(request)->  ..... -> 返回数据. 整个过程是单线程阻塞的,在处理好数据后才返回数据,如果用户请求比较多,那么吞吐量就比较低。

      (2)WebFlux:

       WebFlux的工作流程:主线程得到请求-> 立即返回数据与函数的组合(Mono或Flux) -> 开启一个新Work线程准备数据  ->  执行业务操作 --> Work线程工作完成 ---> 返回数据。

spring mvc和 web flux的区别
区别                        Spring mvc Spring WebFlux
地址映射 @Controller @RequestMapping 等

Router Functions 提供函数式的API,用于创建Router Handler Filter

数据流                 Servlet API Reactive Streams: 一种支持背压的异步数据量标准。WebFlux默认使用的是Reactor。
容器         Tomcat Jetty Undertow Tomcat Jetty Netty Undertow

IO 模型        

同步的、阻塞的IO  异步非阻塞的IO
吞吐量        
数据库         Sql  NoSql 支持NoSql,不支持Sql
请求和响应         HttpServletRequest和HttpServletResponse ServletResponse和ServletRequest
业务处理性能  相同                 相同

2. 认识Mono和Flux

   2.1 什么是Mono和Flux

        Mono和Flux是Reactor中的两个基本概念。

       Mono和Flux都是事件发布者,为消费者提供订阅接口。当有事件发生时,Mono和Flux会回调消费者的相应方法,然后通知消费者相应的事件。这也是响应式编程模型。

      Mono和Flux用于处理异步数据流,它不是MVC中直接返回String或Object,而是将异步数据流包装成Mono或Flux对象。

2.2 Mono和Flux的区别

     Flux可以发送多个item(例如:列表)。这些item可以经过若干算子(operators)后才被订阅。Mono只能发送一个item(例如:根据id查询)

    Mono 主要用于返回单个数据,Flux用于返回多个数据。

二、开发WebFlux的流程:

  2.1 配置WebFlux依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

2.2 编写控制器:

mport org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    @GetMapping("/")
    public Mono<String> hello(){
        return Mono.just("Hello WebFlux Test dev!");
    }
}

Springboot 开发 Web Flux

 2.3 添加mongo依赖并创建实体类:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document
public class User {
    @Id
    private String id;
    private String name;
    private Integer age;
}

2.4 创建DAO

import com.example.webfluxdemo.entity.User;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

public interface UserMongoDao extends ReactiveMongoRepository<User,String> {
}

2.5 编写Handler:

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class HelloWorldHandler {

     public Mono<ServerResponse> sayHello(ServerRequest serverRequest){
         return ServerResponse.ok().contentType(MediaType.TEXT_PLAIN)
                 .body(Mono.just("This is WebFlux Demo!"),String.class);
     }
}

2.6 编写路由:

@Configuration
public class Router {

    @Resource
    private HelloWorldHandler helloWorldHandler;

    @Bean
    public RouterFunction<ServerResponse> getString(){
        return route(GET("/hello"),req->helloWorldHandler.sayHello(req));
    }
}

2.7 编写控制器:

mport com.example.webfluxdemo.dao.UserMongoDao;
import com.example.webfluxdemo.entity.User;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserMongoDao userMongoDao;

    @GetMapping(value ="/list")
    public Flux<User> findAll(){
        return userMongoDao.findAll();
    }

    @PostMapping("")
    public Mono<User> create( User user){
        return this.userMongoDao.save(user);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUserById(@PathVariable("id") String id){
        return this.userMongoDao.findById(id)
                .map(getUser -> ResponseEntity.ok(getUser))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> delete(@PathVariable("id")String id){
        return userMongoDao.findById(id)
                .flatMap(existingUser ->
                        userMongoDao.delete(existingUser)
                        .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                )
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @PutMapping("/{id}")
    public Mono update(@PathVariable("id")String id,User user){
        return this.userMongoDao.findById(id)
                .flatMap(existingUser-> {
                    existingUser.setName(user.getName());
                    existingUser.setAge(user.getAge());
                    return userMongoDao.save(user);
                })
                .map(update-> new ResponseEntity<>(update,HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @GetMapping(value ="/listdelay",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> getAll(){
        return userMongoDao.findAll().delayElements(Duration.ofSeconds(1));
    }
}

Springboot 开发 Web Flux

Springboot 开发 Web Flux