美文网首页
WebFlux 响应式编程

WebFlux 响应式编程

作者: 茶理思 | 来源:发表于2020-01-13 12:59 被阅读0次

    在执行程序时,为了提供性能,处理器和编译器常常会对指令进行重排序。重排序分为编译器重排序和处理器重排序两种。
    • 编译器重排序:编译器保证不改变单线程执行结果的前提下,可以调整多线程语句执行顺序。
    • 处理器重排序:如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

    将同步方式改成异步方式,方法执行改成消息发送,因此诞生了响应式编程模型。

    根据CQRS(Command Query Responsibility Segregation,命令查询的责任分离)模式的思想,任何业务都可以分解为两种基本的消息形式,Query和Command。

    响应式编程的设计原则如下
    • 保持数据的不变性。
    • 没有共享。
    • 阻塞是有害的。


    Reactor

    Reactor是第四代Reactive库,基于Reactive Streams规范在JVM上构建非阻塞应用程序。
    Reactor侧重于服务器端响应式编程,是一个基于Java 8实现的响应式流规范(ReactiveStreams specification)响应式库。

    在Reactor中,数据流发布者(Publisher)由Flux和Mono两个类表示,它们都提供了丰富的操作符(operator)。

    Flux & Mono

    Flux 包含0个或者多个元素[0..n]
    Mono 包含0个或1个元素[0..1]
    Flux & Mono都包含三种信号:元素值错误信号完成信号 (三者不一定都具备)

    image.png
    image.png

    多种声明方式


    image.png

    subscribed

    对数据流订阅

    public static void main(String[] args) {
        // 测试Flux
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
        System.out.println("\n--------------测试Flux--------------");
    
        // 测试Mono
        Mono.just(1).subscribe(System.out::println);
        System.out.println("--------------测试Mono--------------");
    
        // 测试2个参数的subscribe方法
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print, System.err::println);
        System.out.println("\n--------------测试2个参数的subscribe方法--------------");
    
        // 测试3个参数的subscribe方法
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(
                System.out::print,
                System.err::println,
                () -> System.out.println("\ncomplete"));
        System.out.println("--------------测试3个参数的subscribe方法--------------");
    
        // 测试4个参数的subscribe方法
        Flux.just('A','B','C').subscribe(
                System.out::print,
                // 有错误时触发
                System.err::println,
                () -> System.out.println("\ncomplete"),
                subscription -> {
                         // 订阅开始时触发
                            System.out.println("订阅发生了");
                         // 订阅长度,request(1)则只会打出A
                            subscription.request(1);
                        });
        System.out.println("--------------测试4个参数的subscribe方法--------------");
    }
    

    测试复杂度提高了,不像传统写法,可以单步执行。
    使用StepVerifier做测试

    // run
    StepVerifier.create(Flux.just('a','b','c','d'))
                    .expectNext('a','b','c')
                    .expectComplete()
                    .verify();
    // result
    Exception in thread "main" java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(d))
    // run
    StepVerifier.create(Flux.just('a','b','c','d'))
                    .expectNext('a','b','3','d')
                    .expectComplete()
                    .verify();
    // result
    Exception in thread "main" java.lang.AssertionError: expectation "expectNext(3)" failed (expected value: 3; actual value: c)
    // run 
    StepVerifier.create(Flux.just('a','b','c','d'))
                    .expectNext('a','b','c','d')
                    .expectComplete()
                    .verify();
    // result 成功,无结果输出
    

    StepVerifier API

    Operator操作符

    • map
      map可以将数据元素转换成映射表,得到一个新的元素。


      image.png
    // run
    Flux.range(1,3).map(i -> i*i*i).subscribe(
                    i -> {
                        System.out.print(i);
                        System.out.print(',');
                    }
            );
    // result
    1,8,27,
    
    • flatMap
      flatMap操作可以将每个数据元素转换/映射为各个流,然后将每个流合并为一个大的数据流。


      image.png
    // run
    Flux.just("a-b-c","1-2-3")
                    .flatMap(s -> Flux.fromArray(s.split("-")))
                    .subscribe(
                            System.out::print
                    );
    // result
    abc123
    
    • filter
      filter操作可以对数据元素过滤,得到剩余的元素。


      image.png
    // run
    Flux.range(1,3)
                    .filter(i -> i!=2)
                    .subscribe(
                            System.out::println
                    );
    // result
    1
    3
    
    • zip
      zip能够将多个流一对一的合并起来。


      image.png

    zip的其中一种用法


    image.png
    // run
    Flux.zip(Flux.fromArray(desc.split("\\s+"))
                    , Flux.just('a','b','c','d'))
                    .subscribe(System.out::print);
    // result
    [I,a][am,b][Reactor,c]
    

    更多Operator

    线程模型

    JDK提供的多线程工具类Executors提供了多种线程池,使开发人员可以方便地定义线程池进行多线程开发。
    • 获取当前线程环境Schedulers.immediate()。
    • 获取可重用的单线程环境Schedulers.single()。
    • 获取弹性线程池环境Schedulers.elastic()。
    • 获取固定大小线程池环境Schedulers.parallel()。
    • 获取自定义线程池环境Schedulers.fromExecutorService(ExecutorService) 。

    private static void helloAsync() {
            // Callable调用同步hello方法
            Mono.fromCallable(SchedulerOperationDemo::hello)
                    // 弹性线程池执行
                    .subscribeOn(Schedulers.elastic())
                    // 打印结果
                    .subscribe(System.out::println, System.err::println);
        }
    

    WebFlux

    相关文章

      网友评论

          本文标题:WebFlux 响应式编程

          本文链接:https://www.haomeiwen.com/subject/ztigactx.html