美文网首页
JDK 9 Flow

JDK 9 Flow

作者: c934 | 来源:发表于2020-12-05 12:22 被阅读0次

响应式编程

Flow 是支持响应式编程的一套规范,里面定义了一整套需要实现的接口


接口类目

调用关系图如下:


调用关系图.png

使用实例:

  1. 定义Subscriber
package com.luo.service;

import java.util.concurrent.Flow;

public class DemoSubscriber implements Flow.Subscriber {


    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("sub-建立订阅关系第一次调用");
        this.subscription = subscription;
        this.subscription.request(5);
    }

    @Override
    public void onNext(Object item) {
        System.out.println("sub-接受数据:" + item);
        try {
            Thread.sleep(2 * 1000);
            this.subscription.request(2);
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
        this.subscription.cancel();
    }
    @Override
    public void onComplete() {
        System.out.println("sub-数据接收完成");
    }
}

  1. 测试
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveMain {

    public static void main(String[] args) throws InterruptedException {
        // 发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 订阅者
        Flow.Subscriber subscriber = new DemoSubscriber();

        publisher.subscribe(subscriber);
        Thread.sleep(5 * 1000);
        System.out.println("p没有发送过消息,已经等待10秒");

        try {
            for (int i = 0; i < 10; i++) {
                publisher.submit("" + i);
                Thread.sleep(500);
            }
            System.out.println("所有消息发送完成");

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            publisher.close();
        }
        System.out.println("p 关闭发送");
        Thread.sleep(10 * 1000);

        Thread.currentThread().join();

    }
}

Processor 使用测试



import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class DemoProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("process-第一次建立订阅关系");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("process-接受到数据" + item);
        item += "-process 处理后的消息";
        this.submit(item);
        this.subscription.request(1);

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable);
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println("process-数据处理完成");
    }

}

使用测试

import java.util.concurrent.SubmissionPublisher;

public class ReactiveProcessorMain {
    public static void main(String[] args) throws InterruptedException {
        // 发布者
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        // 订阅者
        DemoProcessor processor = new DemoProcessor();

        publisher.subscribe(processor);

        DemoSubscriber subScriber = new DemoSubscriber();

        processor.subscribe(subScriber);


        try {
            for (int i = 0; i < 10; i++) {
                publisher.submit("" + i);
                Thread.sleep(500);
            }
            System.out.println("所有消息发送完成");

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            publisher.close();
        }
        System.out.println("p 关闭发送");
        Thread.sleep(10 * 1000);

        Thread.currentThread().join();

    }
}

相关文章

网友评论

      本文标题:JDK 9 Flow

      本文链接:https://www.haomeiwen.com/subject/rkcgwktx.html