1、java9

作者: lesline | 来源:发表于2018-10-14 23:47 被阅读11次

    JDK9中的Flow API对应响应式流规范,响应式流规范是一种事实标准。JEP 266包含了一组最小接口集合,这组接口能捕获核心的异步发布与订阅。希望在未来第三方能够实现这些接口,并且能共享其方式。

    java.util.concurrent.Flow包含以下4个接口:

    • Flow.Processor(处理器)
    • Flow.Publisher(发布者)
    • Flow.Subscriber(订阅者)
    • Flow.Subscription(订阅管理器)

    交互流程如下:


    Publisher、Subscriber交互流程.png

    很类似mq中的发布-订阅模式:发布者发布数据,订阅者接收数据。

    使用示例

    public class FlowDemo {
        public static void main(String[] args) throws Exception {
            // 1. 定义发布者, 发布的数据类型是 Integer
            // 直接使用~jdk~自带的SubmissionPublisher, 它实现了 Publisher 接口
            SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
    
            // 2. 定义订阅者
            Subscriber<Integer> subscriber = new Subscriber<Integer>() {
                private Subscription subscription;
                @Override
                public void onSubscribe(Subscription subscription) {
                    // 保存订阅关系, 需要用它来给发布者响应
                    this.subscription = subscription;
                    // 请求一个数据
                    this.subscription.request(1);
                }
    
                @Override
                public void onNext(Integer item) {
                    // 接受到一个数据, 处理
                    System.out.println("接受到数据: " + item);
                        TimeUnit.SECONDS.sleep(3);
                    // 处理完调用request再请求一个数据
                    this.subscription.request(1);
    
                    // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                    // this.subscription.cancel();
                }
    
                @Override
                public void onError(Throwable throwable) {
                    // 出现了异常(例如处理数据的时候产生了异常)
                    throwable.printStackTrace();
    
                    // 我们可以告诉发布者, 后面不接受数据了
                    this.subscription.cancel();
                }
    
                @Override
                public void onComplete() {
                    // 全部数据处理完了(发布者关闭了)
                    System.out.println("处理完了!");
                }
    
            };
            // 3. 发布者和订阅者 建立订阅关系
            publiser.subscribe(subscriber);
    
            // 4. 生产数据, 并发布
            // 这里忽略数据生产过程
            for (int i = 0; i < 1000; i++) {
                System.out.println("生成数据:" + i);
                // submit是个block方法
                publiser.submit(i);
            }
    
            // 5. 结束后 关闭发布者
            // 正式环境 应该放 finally 或者使用 try-~resouce~ 确保关闭
            publiser.close();
    
            // 主线程延迟停止, 否则数据没有消费就退出
            Thread.currentThread().join(1000);
            // debug的时候, 下面这行需要有断点
            // 否则主线程结束无法debug
            System.out.println();
        }
    }
    
    
    import java.util.concurrent.Flow.Processor;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    
    /**
     * 带 process 的 flow demo
     */
    
    /**
     * Processor, 需要继承SubmissionPublisher并实现Processor接口
     * 
     * 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
     */
    class MyProcessor extends SubmissionPublisher<String>
            implements Processor<Integer, String> {
    
        private Subscription subscription;
    
        @Override
        public void onSubscribe(Subscription subscription) {
            // 保存订阅关系, 需要用它来给发布者响应
            this.subscription = subscription;
    
            // 请求一个数据
            this.subscription.request(1);
        }
    
        @Override
        public void onNext(Integer item) {
            // 接受到一个数据, 处理
            System.out.println("处理器接受到数据: " + item);
    
            // 过滤掉小于0的, 然后发布出去
            if (item > 0) {
                this.submit("转换后的数据:" + item);
            }
    
            // 处理完调用request再请求一个数据
            this.subscription.request(1);
    
            // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
            // this.subscription.cancel();
        }
    
        @Override
        public void onError(Throwable throwable) {
            // 出现了异常(例如处理数据的时候产生了异常)
            throwable.printStackTrace();
    
            // 我们可以告诉发布者, 后面不接受数据了
            this.subscription.cancel();
        }
    
        @Override
        public void onComplete() {
            // 全部数据处理完了(发布者关闭了)
            System.out.println("处理器处理完了!");
            // 关闭发布者
            this.close();
        }
    
    }
    
    public class FlowDemoTest {
    
        public static void main(String[] args) throws Exception {
            // 1. 定义发布者, 发布的数据类型是 Integer
            // 直接使用jdk自带的SubmissionPublisher
            SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
    
            // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
            MyProcessor processor = new MyProcessor();
    
            // 3. 发布者 和 处理器 建立订阅关系
            publiser.subscribe(processor);
    
            // 4. 定义最终订阅者, 消费 String 类型数据
            Subscriber<String> subscriber = new Subscriber<String>() {
                private Subscription subscription;
                @Override
                public void onSubscribe(Subscription subscription) {
                    // 保存订阅关系, 需要用它来给发布者响应
                    this.subscription = subscription;
                    // 请求一个数据
                    this.subscription.request(1);
                }
    
                @Override
                public void onNext(String item) {
                    // 接受到一个数据, 处理
                    System.out.println("接受到数据: " + item);
    
                    // 处理完调用request再请求一个数据
                    this.subscription.request(1);
    
                    // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                    // this.subscription.cancel();
                }
    
                @Override
                public void onError(Throwable throwable) {
                    // 出现了异常(例如处理数据的时候产生了异常)
                    throwable.printStackTrace();
    
                    // 我们可以告诉发布者, 后面不接受数据了
                    this.subscription.cancel();
                }
    
                @Override
                public void onComplete() {
                    // 全部数据处理完了(发布者关闭了)
                    System.out.println("处理完了!");
                }
            };
    
            // 5. 处理器 和 最终订阅者 建立订阅关系
            processor.subscribe(subscriber);
    
            // 6. 生产数据, 并发布
            // 这里忽略数据生产过程
            //是阻塞方法 默认缓冲356个数据
            publiser.submit(-111);
            publiser.submit(111);
    
            // 7. 结束后 关闭发布者
            // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
            publiser.close();
    
            // 主线程延迟停止, 否则数据没有消费就退出
            Thread.currentThread().join(1000);
        }
    }
    
    

    背压依我的理解来说,是指订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。

    我们重点理解背压在jdk9里面是如何实现的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。

    Flow类包含defaultBufferSize()静态方法,它返回发布者和订阅者使用的缓冲区的默认大小。 目前,它返回256。


    参考:

    Reactive Programming with JDK 9 Flow API | Oracle Community
    Java 9 揭秘(17. Reactive Streams) - 林本托 - 博客园
    Java 9新特点: 响应式流Reactive Streams
    http://www.reactive-streams.org/

    相关文章

      网友评论

          本文标题:1、java9

          本文链接:https://www.haomeiwen.com/subject/crlxzftx.html