美文网首页
响应式编程——Flow API

响应式编程——Flow API

作者: 王侦 | 来源:发表于2019-07-29 16:16 被阅读0次

1.官方文档

Interrelated interfaces and static methods for establishing flow-
controlled components in which Publishers produce items 
consumed by one or more Subscribers, each managed by a 
Subscription.

These interfaces correspond to the reactive-streams specification. 
They apply in both concurrent and distributed asynchronous 
settings: All (seven) methods are defined in void "one-way" 
message style. Communication relies on a simple form of flow 
control (method Flow.Subscription.request(long)) that can be used 
to avoid resource management problems that may otherwise occur 
in "push" based systems.

Examples. A Flow.Publisher usually defines its own 
Flow.Subscription implementation; constructing one in method 
subscribe and issuing it to the calling Flow.Subscriber. It publishes 
items to the subscriber asynchronously, normally using an 
Executor. For example, here is a very simple publisher that only 
issues (when requested) a single TRUE item to a single 
subscriber. Because the subscriber receives only a single item, 
this class does not use buffering and ordering control required in 
most implementations (for example SubmissionPublisher).

用于建立流控制组件的相互关联的接口和静态方法,其中Publishers生产由一个或多个Subscribers使用的元素,Subscriber由Subscription管理。

这些接口对应于reactive-streams规范。它们适用于并发和分布式异步环境:所有(七种)方法都以void“单向”消息样式定义。通信依赖于简单形式的流控制(方法Flow.Subscription.request(long)),可用于避免在“基于推送”的系统中可能发生的资源管理问题。

例子。 Flow.Publisher通常定义自己的Flow.Subscription实现;在方法subscribe中构造一个并将其发布到调用Flow.Subscriber。它通常使用Executor异步地向订阅者发布items。例如,这是一个非常简单的发布者,它只向单个订阅者发出(如果请求)单个TRUE元素。由于订阅者只接收单个元素,因此该类不使用大多数实现中所需的缓冲和排序控制(例如SubmissionPublisher)。

 class OneShotPublisher implements Publisher<Boolean> {
   private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
   private boolean subscribed; // true after first subscribe
   public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
     if (subscribed)
       subscriber.onError(new IllegalStateException()); // only one allowed
     else {
       subscribed = true;
       subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
     }
   }
   static class OneShotSubscription implements Subscription {
     private final Subscriber<? super Boolean> subscriber;
     private final ExecutorService executor;
     private Future<?> future; // to allow cancellation
     private boolean completed;
     OneShotSubscription(Subscriber<? super Boolean> subscriber,
                         ExecutorService executor) {
       this.subscriber = subscriber;
       this.executor = executor;
     }
     public synchronized void request(long n) {
       if (n != 0 && !completed) {
         completed = true;
         if (n < 0) {
           IllegalArgumentException ex = new IllegalArgumentException();
           executor.execute(() -> subscriber.onError(ex));
         } else {
           future = executor.submit(() -> {
             subscriber.onNext(Boolean.TRUE);
             subscriber.onComplete();
           });
         }
       }
     }
     public synchronized void cancel() {
       completed = true;
       if (future != null) future.cancel(false);
     }
   }
 }
A Flow.Subscriber arranges that items be requested and 
processed. Items (invocations of Flow.Subscriber.onNext(T)) are 
not issued unless requested, but multiple items may be requested.
 Many Subscriber implementations can arrange this in the style of 
the following example, where a buffer size of 1 single-steps, and 
larger sizes usually allow for more efficient overlapped processing 
with less communication; for example with a value of 64, this 
keeps total outstanding requests between 32 and 64. Because 
Subscriber method invocations for a given Flow.Subscription are 
strictly ordered, there is no need for these methods to use locks or 
volatiles unless a Subscriber maintains multiple Subscriptions (in 
which case it is better to instead define multiple Subscribers, each 
with its own Subscription).

Flow.Subscriber安排要请求和处理的元素。 除非请求,否则不会发出元素(Flow.Subscriber.onNext(T)的调用),但可能会请求多个元素。 许多订阅者实现可以按照以下示例的样式进行排列,其中缓冲区大小为1个单步,而较大的大小通常允许更有效的重叠处理和更少的通信; 例如,值为64,这使得未完成的请求总数保持在32到64之间。由于对给定Flow.Subscription的订阅者方法调用是严格排序的,因此除非订阅者维护多个订阅,否则这些方法不需要使用锁或volatile。 (在多个订阅情况下,最好定义多个Subscribers,每个订阅者都有自己的Subscription)。

 class SampleSubscriber<T> implements Subscriber<T> {
   final Consumer<? super T> consumer;
   Subscription subscription;
   final long bufferSize;
   long count;
   SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
     this.bufferSize = bufferSize;
     this.consumer = consumer;
   }
   public void onSubscribe(Subscription subscription) {
     long initialRequestSize = bufferSize;
     count = bufferSize - bufferSize / 2; // re-request when half consumed
     (this.subscription = subscription).request(initialRequestSize);
   }
   public void onNext(T item) {
     if (--count <= 0)
       subscription.request(count = bufferSize - bufferSize / 2);
     consumer.accept(item);
   }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
 }
The default value of defaultBufferSize() may provide a useful 
starting point for choosing request sizes and capacities in Flow 
components based on expected rates, resources, and usages. Or, 
when flow control is never needed, a subscriber may initially 
request an effectively unbounded number of items, as in:

defaultBufferSize()的默认值可以提供一个有用的起点,用于根据预期的速率、资源和用法选择Flow组件中的请求大小和容量。 或者,当从不需要流量控制时,subscriber最初可以请求有效无限数量的元素,如:

 class UnboundedSubscriber<T> implements Subscriber<T> {
   public void onSubscribe(Subscription subscription) {
     subscription.request(Long.MAX_VALUE); // effectively unbounded
   }
   public void onNext(T item) { use(item); }
   public void onError(Throwable ex) { ex.printStackTrace(); }
   public void onComplete() {}
   void use(T item) { ... }
 }

2.源码


3.Pull、Push和Pull-Push模式(处理数据模式)

  • pull-based。client端不断轮询服务端以获取数据。这种模式的优点是当client端资源有限时可以更好的控制数据流(停止轮询),而缺点是当服务端没有数据时轮询是对计算资源和网络资源的浪费。
  • push-based,生产者不关心消费者的消费能力,直接推送数据。这种模式的缺点是当消费资源低于生产资源时会造成缓冲区溢出从而数据丢失,当丢失率维持在较小的数值时还可以接受,但是当这个比率变大时我们会希望生产者降速以避免大规模数据丢失。
  • 响应式编程是一种pull-push混合模式以综合他们的优点,这种模式下消费者负责请求数据以控制生产者数据流,同时当处理资源不足时也可以选择阻断或者丢弃数据。
    要指出是,当消费速度低于生产速度时,消费者要求生产者降低速度以完全消费数据(这个现象称作back-pressure)。

4.Flow与Stream

  • Java8中引入的StreamAPI通过map、reduce以及其他操作可以完美的处理数据集
  • FlowAPI则专注于处理数据的流通,比如对数据的请求,减速,丢弃,阻塞等。reactive streams不仅兼容传统编程方式,而且还支持函数式编程以极大的提高可读性和可维护性。
  • 从技术上讲,我们完全可以使用Flows来替换Streams,但任何时候都这么做就显得过于偏激。比如,我们创建一个Publisher来作为int数组的数据源,然后在Processor中转换Integer为String,最后创建一个Subscriber来归并到一个String中。这个时候就完全没有必要使用Flows,因为这不是在控制两个模块或两个线程间的数据通信,这个时候使用Streams更为合理。

总之,Flow更关注于通信处理,Stream更关注于数据处理。

5.杂志出版商示例

  • 假设出版商有两个订阅客户
  • 出版商将为每个订阅客户出版20本杂志。出版商知道他们的客户有时在邮递杂志时会不在家,而当他们的邮箱(subscriber buffer)不巧被塞满时邮递员会退回或丢弃杂志,出版商不希望出现这种情况。
  • 于是出版商发明了一个邮递系统:当客户在家时再给出版商致电,出版商会立即邮递一份杂志。出版商打算在办公室为每个客户保留一个小号的邮箱以防当杂志出版时客户没有第一时间致电获取。出版商认为为每个客户预留一个可以容纳8份杂志的邮件已经足够(publisher buffer)

总结场景:

  • 如果客户请求杂志足够迅速,将不会存在邮箱容量的问题。
  • 如果客户没有以杂志出版的速度发出请求,那么邮箱将被塞满。这位员工提出以下几种处理方案:
    a. 增加邮箱容量,为每位客户提供可容纳20份杂志的邮箱。(publisher增加buffer)
    b. 直到客户请求下一份杂志之前停止印刷,并且根据客户请求的速度降低印刷速度以清空邮箱。
    c. 新的杂志直接丢掉。
    d. 一个折中的方案: 如果邮箱满了,在下次打印之前等待一段时间,如果还是没有足够的空间则丢弃新的杂志。

最终选择了方案d。

package com.wz.concurrent.other;

import java.util.concurrent.Flow;
import java.util.stream.IntStream;

//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;

/**
 * @Author : Wang Zhen.
 * @Date : Created in 14:50 2019/7/29
 * @Description :
 * @Modified By   :
 * @Version :
 */
public class MagazineSubscriber implements Flow.Subscriber<Integer> {

    public static final String JACK = "Jack";
    public static final String PETE = "Pete";

//    private static final Logger log = LoggerFactory.
//            getLogger(MagazineSubscriber.class);

    private final long sleepTime;
    private final String subscriberName;
    private Flow.Subscription subscription;
    private int nextMagazineExpected;
    private int totalRead;

    MagazineSubscriber(final long sleepTime, final String subscriberName) {
        this.sleepTime = sleepTime;
        this.subscriberName = subscriberName;
        this.nextMagazineExpected = 1;
        this.totalRead = 0;
    }

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Integer magazineNumber) {
        if (magazineNumber != nextMagazineExpected) {
            IntStream.range(nextMagazineExpected, magazineNumber).forEach(
                    (msgNumber) ->
                            log("Oh no! I missed the magazine " + msgNumber)
            );
            // Catch up with the number to keep tracking missing ones
            nextMagazineExpected = magazineNumber;
        }
        log("Great! I got a new magazine: " + magazineNumber);
        takeSomeRest();
        nextMagazineExpected++;
        totalRead++;

        log("I'll get another magazine now, next one should be: " +
                nextMagazineExpected);
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log("Oops I got an error from the Publisher: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        log("Finally! I completed the subscription, I got in total " +
                totalRead + " magazines.");
    }

    private void log(final String logMessage) {
        System.out.println("<=========== [" + subscriberName + "] : " + logMessage);
    }

    public String getSubscriberName() {
        return subscriberName;
    }

    private void takeSomeRest() {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

MagazineSubscriber中的方法:

  • onSubscriber(subscription) Publisher在被指定一个新的Subscriber时调用此方法。 一般来说你需要在subscriber内部保存这个subscrition实例,因为后面会需要通过她向publisher发送信号来完成:请求更多数据,或者取消订阅。 一般在这里我们会直接请求第一个数据,正如代码中所示。
  • onNext(magazineNumber) 每当新的数据产生,这个方法会被调用。在我们的示例中,我们用到了最经典的使用方式:处理这个数据的同时再请求下一个数据。然而我们在这中间添加了一段可配置的sleep时间,这样我们可以尝试订阅者在不同场景下的表现。剩下的一段逻辑判断仅仅是记录下丢失的杂志(当publisher出现丢弃数据的时候)。
  • onError(throwable) 当publisher出现异常时会调用subscriber的这个方法。在我们的实现中publisher丢弃数据时会产生异常。
  • onComplete() 当publisher数据推送完毕时会调用此方法,于是整个订阅过程结束。
package com.wz.concurrent.other;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Author : Wang Zhen.
 * @Date : Created in 14:53 2019/7/29
 * @Description :
 * @Modified By   :
 * @Version :
 */
public class ReactiveFlowApp {

    private static final int NUMBER_OF_MAGAZINES = 20;
    private static final long MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE = 2;
//    private static final Logger log =
//            LoggerFactory.getLogger(ReactiveFlowApp.class);

    public static void main(String[] args) throws Exception {
        final ReactiveFlowApp app = new ReactiveFlowApp();

        System.out.println("\n\n### CASE 1: Subscribers are fast, buffer size is not so " +
                "important in this case.");
        app.magazineDeliveryExample(100L, 100L, 8);

        System.out.println("\n\n### CASE 2: A slow subscriber, but a good enough buffer " +
                "size on the publisher's side to keep all items until they're picked up");
        app.magazineDeliveryExample(1000L, 3000L, NUMBER_OF_MAGAZINES);

        System.out.println("\n\n### CASE 3: A slow subscriber, and a very limited buffer " +
                "size on the publisher's side so it's important to keep the slow " +
                "subscriber under control");
        app.magazineDeliveryExample(1000L, 3000L, 8);

    }

    void magazineDeliveryExample(final long sleepTimeJack,
                                 final long sleepTimePete,
                                 final int maxStorageInPO) throws Exception {
        final SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxStorageInPO);

        final MagazineSubscriber jack = new MagazineSubscriber(
                sleepTimeJack,
                MagazineSubscriber.JACK
        );
        final MagazineSubscriber pete = new MagazineSubscriber(
                sleepTimePete,
                MagazineSubscriber.PETE
        );

        publisher.subscribe(jack);
        publisher.subscribe(pete);

        System.out.println("Printing 20 magazines per subscriber, with room in publisher for "
                + maxStorageInPO + ". They have " + MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE +
                " seconds to consume each magazine.");
        IntStream.rangeClosed(1, 20).forEach((number) -> {
            System.out.println("Offering magazine " + number + " to consumers");
            final int lag = publisher.offer(
                    number,
                    MAX_SECONDS_TO_KEEP_IT_WHEN_NO_SPACE,
                    TimeUnit.SECONDS,
                    (subscriber, msg) -> {
                        subscriber.onError(
                                new RuntimeException("Hey " + ((MagazineSubscriber) subscriber)
                                        .getSubscriberName() + "! You are too slow getting magazines" +
                                        " and we don't have more space for them! " +
                                        "I'll drop your magazine: " + msg));
                        return false; // don't retry, we don't believe in second opportunities
                    });
            if (lag < 0) {
                log("Dropping " + -lag + " magazines");
            } else {
                log("The slowest consumer has " + lag +
                        " magazines in total to be picked up");
            }
        });

        // Blocks until all subscribers are done (this part could be improved
        // with latches, but this way we keep it simple)
        while (publisher.estimateMaximumLag() > 0) {
            Thread.sleep(500L);
        }

        // Closes the publisher, calling the onComplete() method on every subscriber
        publisher.close();
        // give some time to the slowest consumer to wake up and notice
        // that it's completed
        Thread.sleep(Math.max(sleepTimeJack, sleepTimePete));
    }

    private static void log(final String message) {
        System.out.println("===========> " + message);
    }

}

使用Java9 SubmissionPublisher类来创建publisher。正如javadoc所述, 当subscribers消费过慢,就像Reactive Streams中的Publisher一样她会阻塞或丢弃数据。

main()方法中使用不同参数调用以上逻辑三次,以模拟之前介绍的三种不同真是场景。

  • 消费者消费速度很快,publisher缓存区不会发生问题。
  • 其中一个消费者速度很慢,以至缓存被填满,然而缓存区足够大以容纳所有所有数据,不会发生丢弃。
  • 其中一个消费者速度很慢,同时缓存区不够大。会发生丢失且情况。

结果:
情况1的如下:

### CASE 1: Subscribers are fast, buffer size is not so important in this case.
Printing 20 magazines per subscriber, with room in publisher for 8. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
<=========== [Jack] : Great! I got a new magazine: 1
<=========== [Pete] : Great! I got a new magazine: 1
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Pete] : Great! I got a new magazine: 2
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 11 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Pete] : Great! I got a new magazine: 3
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 12 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : Great! I got a new magazine: 4
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 13 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Pete] : Great! I got a new magazine: 5
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 14 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Pete] : Great! I got a new magazine: 6
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 15 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 7
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 16 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 17 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 18 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Great! I got a new magazine: 10
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 19 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 11
<=========== [Pete] : Great! I got a new magazine: 11
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 20 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Great! I got a new magazine: 12
===========> The slowest consumer has 9 magazines in total to be picked up
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 15
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Great! I got a new magazine: 16
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 17
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Great! I got a new magazine: 18
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : Finally! I completed the subscription, I got in total 20 magazines.
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.

情况2:

### CASE 2: A slow subscriber, but a good enough buffer size on the publisher's side to keep all items until they're picked up
Printing 20 magazines per subscriber, with room in publisher for 20. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
<=========== [Jack] : Great! I got a new magazine: 1
<=========== [Pete] : Great! I got a new magazine: 1
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
===========> The slowest consumer has 10 magazines in total to be picked up
Offering magazine 11 to consumers
===========> The slowest consumer has 11 magazines in total to be picked up
Offering magazine 12 to consumers
===========> The slowest consumer has 12 magazines in total to be picked up
Offering magazine 13 to consumers
===========> The slowest consumer has 13 magazines in total to be picked up
Offering magazine 14 to consumers
===========> The slowest consumer has 14 magazines in total to be picked up
Offering magazine 15 to consumers
===========> The slowest consumer has 15 magazines in total to be picked up
Offering magazine 16 to consumers
===========> The slowest consumer has 16 magazines in total to be picked up
Offering magazine 17 to consumers
===========> The slowest consumer has 17 magazines in total to be picked up
Offering magazine 18 to consumers
===========> The slowest consumer has 18 magazines in total to be picked up
Offering magazine 19 to consumers
===========> The slowest consumer has 19 magazines in total to be picked up
Offering magazine 20 to consumers
===========> The slowest consumer has 20 magazines in total to be picked up
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 3
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : Great! I got a new magazine: 4
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : Great! I got a new magazine: 6
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 11
<=========== [Pete] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Great! I got a new magazine: 12
<=========== [Pete] : I'll get another magazine now, next one should be: 13
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Great! I got a new magazine: 14
<=========== [Pete] : I'll get another magazine now, next one should be: 15
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 17
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Great! I got a new magazine: 18
<=========== [Pete] : I'll get another magazine now, next one should be: 19
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Great! I got a new magazine: 20
<=========== [Pete] : I'll get another magazine now, next one should be: 21
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.
<=========== [Pete] : Finally! I completed the subscription, I got in total 20 magazines.

情况3:

### CASE 3: A slow subscriber, and a very limited buffer size on the publisher's side so it's important to keep the slow subscriber under control
Printing 20 magazines per subscriber, with room in publisher for 8. They have 2 seconds to consume each magazine.
Offering magazine 1 to consumers
===========> The slowest consumer has 1 magazines in total to be picked up
Offering magazine 2 to consumers
<=========== [Jack] : Great! I got a new magazine: 1
===========> The slowest consumer has 2 magazines in total to be picked up
Offering magazine 3 to consumers
<=========== [Pete] : Great! I got a new magazine: 1
===========> The slowest consumer has 3 magazines in total to be picked up
Offering magazine 4 to consumers
===========> The slowest consumer has 4 magazines in total to be picked up
Offering magazine 5 to consumers
===========> The slowest consumer has 5 magazines in total to be picked up
Offering magazine 6 to consumers
===========> The slowest consumer has 6 magazines in total to be picked up
Offering magazine 7 to consumers
===========> The slowest consumer has 7 magazines in total to be picked up
Offering magazine 8 to consumers
===========> The slowest consumer has 8 magazines in total to be picked up
Offering magazine 9 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 10 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 2
<=========== [Jack] : Great! I got a new magazine: 2
<=========== [Jack] : I'll get another magazine now, next one should be: 3
<=========== [Jack] : Great! I got a new magazine: 3
<=========== [Jack] : I'll get another magazine now, next one should be: 4
<=========== [Jack] : Great! I got a new magazine: 4
<=========== [Pete] : I'll get another magazine now, next one should be: 2
<=========== [Pete] : Great! I got a new magazine: 2
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 10
===========> Dropping 1 magazines
Offering magazine 11 to consumers
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 12 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 5
<=========== [Jack] : Great! I got a new magazine: 5
<=========== [Jack] : I'll get another magazine now, next one should be: 6
<=========== [Jack] : Great! I got a new magazine: 6
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 12
===========> Dropping 1 magazines
Offering magazine 13 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 7
<=========== [Jack] : Great! I got a new magazine: 7
<=========== [Pete] : I'll get another magazine now, next one should be: 3
<=========== [Pete] : Great! I got a new magazine: 3
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 14 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 8
<=========== [Jack] : Great! I got a new magazine: 8
<=========== [Jack] : I'll get another magazine now, next one should be: 9
<=========== [Jack] : Great! I got a new magazine: 9
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 14
===========> Dropping 1 magazines
Offering magazine 15 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 10
<=========== [Jack] : Great! I got a new magazine: 10
<=========== [Pete] : I'll get another magazine now, next one should be: 4
<=========== [Pete] : Great! I got a new magazine: 4
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 16 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 11
<=========== [Jack] : Great! I got a new magazine: 11
<=========== [Jack] : I'll get another magazine now, next one should be: 12
<=========== [Jack] : Great! I got a new magazine: 12
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 16
===========> Dropping 1 magazines
Offering magazine 17 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 13
<=========== [Jack] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 5
<=========== [Pete] : Great! I got a new magazine: 5
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 18 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 14
<=========== [Jack] : Great! I got a new magazine: 14
<=========== [Jack] : I'll get another magazine now, next one should be: 15
<=========== [Jack] : Great! I got a new magazine: 15
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 18
===========> Dropping 1 magazines
Offering magazine 19 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 16
<=========== [Jack] : Great! I got a new magazine: 16
<=========== [Pete] : I'll get another magazine now, next one should be: 6
<=========== [Pete] : Great! I got a new magazine: 6
===========> The slowest consumer has 9 magazines in total to be picked up
Offering magazine 20 to consumers
<=========== [Jack] : I'll get another magazine now, next one should be: 17
<=========== [Jack] : Great! I got a new magazine: 17
<=========== [Jack] : I'll get another magazine now, next one should be: 18
<=========== [Jack] : Great! I got a new magazine: 18
<=========== [Pete] : Oops I got an error from the Publisher: Hey Pete! You are too slow getting magazines and we don't have more space for them! I'll drop your magazine: 20
===========> Dropping 1 magazines
<=========== [Jack] : I'll get another magazine now, next one should be: 19
<=========== [Jack] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 7
<=========== [Pete] : Great! I got a new magazine: 7
<=========== [Jack] : I'll get another magazine now, next one should be: 20
<=========== [Jack] : Great! I got a new magazine: 20
<=========== [Jack] : I'll get another magazine now, next one should be: 21
<=========== [Pete] : I'll get another magazine now, next one should be: 8
<=========== [Pete] : Great! I got a new magazine: 8
<=========== [Pete] : I'll get another magazine now, next one should be: 9
<=========== [Pete] : Great! I got a new magazine: 9
<=========== [Pete] : I'll get another magazine now, next one should be: 10
<=========== [Pete] : Oh no! I missed the magazine 10
<=========== [Pete] : Great! I got a new magazine: 11
<=========== [Pete] : I'll get another magazine now, next one should be: 12
<=========== [Pete] : Oh no! I missed the magazine 12
<=========== [Pete] : Great! I got a new magazine: 13
<=========== [Pete] : I'll get another magazine now, next one should be: 14
<=========== [Pete] : Oh no! I missed the magazine 14
<=========== [Pete] : Great! I got a new magazine: 15
<=========== [Pete] : I'll get another magazine now, next one should be: 16
<=========== [Pete] : Oh no! I missed the magazine 16
<=========== [Pete] : Great! I got a new magazine: 17
<=========== [Pete] : I'll get another magazine now, next one should be: 18
<=========== [Pete] : Oh no! I missed the magazine 18
<=========== [Pete] : Great! I got a new magazine: 19
<=========== [Pete] : I'll get another magazine now, next one should be: 20
<=========== [Pete] : Finally! I completed the subscription, I got in total 14 magazines.
<=========== [Jack] : Finally! I completed the subscription, I got in total 20 magazines.

参考

相关文章

  • 【Kotlin · Flow】 入门简介 and 实战举例 ·

    What Flow 是 kotlin 官方基于协程构建的,用于响应式编程的API 响应式编程简单来说就是使用异步数...

  • 响应式编程——Flow API

    1.官方文档 用于建立流控制组件的相互关联的接口和静态方法,其中Publishers生产由一个或多个Subscri...

  • JDK 9 Flow

    响应式编程 Flow 是支持响应式编程的一套规范,里面定义了一整套需要实现的接口 调用关系图如下: 使用实例: 定...

  • RxSwift初探(1)

    一、前提:函数响应式编程思想 简单来说 函数响应式编程 = 函数式编程 + 响应式编程 (1)函数式 函数式编程是...

  • 1、java9

    JDK9中的Flow API对应响应式流规范,响应式流规范是一种事实标准。JEP 266包含了一组最小接口集合,这...

  • RxJava/RxAndroid 使用实例实践

    原文地址 RxAndroid Tutorial响应式编程(Reactive programming)不是一种API...

  • Swift5.0 - day13 - 响应式编程

    一、响应式编程 1.1、响应式编程(Reactive Programming,简称RP)响应式编程是一种编程范式,...

  • RxJava

    响应式编程概述 什么是响应式编程? 是一种基于异步数据流概述的编程模式 响应式编程--关键概念 事件 响应式编程-...

  • iOS-Main-响应式编程的介绍&MVVM的特点

    响应式编程响应式编程 MVVM的特点

  • 动脑学院Rxjava预习资料 RxJava2 响应式编程介绍

    RxJava2 响应式编程介绍 响应式编程&RxJava基本概念响应式宣言响应式扩展响应式流规范 RxJava 基...

网友评论

      本文标题:响应式编程——Flow API

      本文链接:https://www.haomeiwen.com/subject/tzicrctx.html