springweb flux 编程模型

时间:2021-08-12 17:57:35

 Spring WebFlux 编程模型是在spring5.0开始,springbot2.0版本设计出来的新的一种反应式变成模型。它脱胎于reactor模式,是java nio 异步编程模型。

 

传统一般采用servelt 那一套,是阻塞式编程,现在换了种模式,大大提高程序处理问题能力。

 

不是很明白概念请看官网:https://spring.io/

 

代码如下:

import com.example.demo.exception.ResourceNotFoundException;
import com.example.demo.po.JsonResult;
import com.example.demo.po.User;
import org.springframework.boot.autoconfigure.security.SecurityProperties;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 重要的两个 FLux 与 Mono
 * Created by mingge on 2018/5/10.
 */
@Service
public class UserService {

    private final Map<String, User> data = new ConcurrentHashMap<>();

     {
        data.put("1",new User("1","张三"));
        data.put("2",new User("1","李四"));
        data.put("3",new User("1","王五"));

    }

    public Flux<User> list() {
        return Flux.fromIterable(this.data.values());
    }

    public Flux<JsonResult> lists(){
        JsonResult jsonResult=new JsonResult(200,"ok",this.data.values());
        return Flux.just(jsonResult);
    }

    public Flux<User> getById(final Flux<String> ids) {
        return ids.flatMap(id -> Mono.justOrEmpty(this.data.get(id)));
    }

    public Mono<User> getById(final String id) {
        return Mono.justOrEmpty(this.data.get(id))
                .switchIfEmpty(Mono.error(new ResourceNotFoundException()));
    }

    public Flux<User> createOrUpdate(final Flux<User> users) {
        return users.doOnNext(user -> this.data.put(user.getId(), user));
    }

    public Mono<User> createOrUpdate(final User user) {
        this.data.put(user.getId(), user);
        return Mono.just(user);
    }

    public Mono<User> delete(final String id) {
        return Mono.justOrEmpty(this.data.remove(id));
    }
}

 

控制层的使用方式:

@RestController
@RequestMapping("/user")
public class UserController {
    
    private final UserService userService;

    @Autowired
    public UserController(final UserService userService) {
        this.userService = userService;
    }

    @ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "Resource not found")
    @ExceptionHandler(ResourceNotFoundException.class)
    public void notFound() {
    }

    @GetMapping("/list")
    public Flux<User> list() {
        Flux<User> users=this.userService.list();
        return users;
    }

    @GetMapping("/lists")
    public Flux<JsonResult> lists(){
        return this.userService.lists();
    }
    @GetMapping("/{id}")
    public Mono<User> getById(@PathVariable("id") final String id) {
        return this.userService.getById(id);
    }

    @PostMapping("/edit")
    public Flux<User> create(@RequestBody final Flux<User>  users) {
        return this.userService.createOrUpdate(users);
    }

    @PutMapping("/{id}")
    public Mono<User>  update(@PathVariable("id") final String id, @RequestBody final User user) {
        Objects.requireNonNull(user);
        user.setId(id);
        return this.userService.createOrUpdate(user);
    }

    @DeleteMapping("/{id}")
    public Mono<User>  delete(@PathVariable("id") final String id) {
        return this.userService.delete(id);
    }

 

看起来似乎跟传统没有什么区别,其实主要区别就是返回值的一种变化,其底层处理的方法不一样啦...

 

一些小的测试:

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {


    @Test
    public void testMonoBasic(){
        Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
        Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
        Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
    }


    @Test
    public void testBasic(){
        Flux.just("Hello", "World").subscribe(System.out::println);
        Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
        Flux.empty().subscribe(System.out::println);
        Flux.range(1, 10).subscribe(System.out::println);
        Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
    }

    @Test
    public void testA(){
        Flux.generate(sink -> {
            sink.next("Hello");
            sink.complete();
        }).subscribe(System.out::println);

        final Random random = new Random();
        Flux.generate(ArrayList::new, (list, sink) -> {
            int value = random.nextInt(100);
            list.add(value);
            sink.next(value);
            if (list.size() == 10) {
                sink.complete();
            }
            return list;
        }).subscribe(System.out::println);
    }

    @Test
    public void testB(){
        Flux.create(sink -> {
            for (int i = 0; i < 10; i++) {
                sink.next(i);
            }
            sink.complete();
        }).subscribe(System.out::println);
    }

    @Test
    public void testC(){
        Flux.range(1, 100).buffer(20).subscribe(System.out::println);
        Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
        Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
        Flux.range(1, 10).filter(i -> i % 2 == 1).subscribe(System.out::println);
    }

    @Test
    public void testD(){
        Flux.range(1, 100).window(20).subscribe(System.out::println);
    }
    
}

 

有机会搭建新系统我就打算尝试这种新的方法啦...

 

更多:https://www.cnblogs.com/davidwang456/p/4589439.html