[reactor-02]flux-just,join

Author: 随风风筝 | Published 2019-02-13 18:11

    Controlling complexity is the essence of computer programming.

    just

    1. Create a Publisher that emits exactly one item
    public static <T> Flux<T> just(T data) 
    
        public void test() {
            Flux<Integer> just = Flux.just(11);
            just.subscribe(System.out::println); // prints 11
        }
    

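    The request can also come from another thread. It works like a lottery: many people draw, but there is only one first prize, so however much is requested, a subscriber receives just the one item.
    For example: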

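    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;

    public class JustTest1 {

        public void test() {
            Flux<Integer> just = Flux.just(11);
            BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {

                @Override
                protected void hookOnNext(Integer value) {
                    System.out.println("I'm done having fun in Sanya, take me away now: " + value);
                }

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    // Intentionally empty: the default hook would request an unbounded
                    // amount, so overriding it means nothing is requested on subscribe.
                }
            };
            // A lambda subscriber requests an unbounded amount and receives 11 right away.
            just.subscribe(integer -> System.out.println("I need you right now, come on: " + integer));
            just.subscribe(subscriber);

            new Thread(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println("I am here: " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Now I need you: ask for 10, although only one item exists.
                subscriber.request(10);
            }, "Sanya").start();
        }

        public static void main(String[] args) throws InterruptedException {
            JustTest1 test = new JustTest1();
            test.test();

            // Give the delayed request time to run.
            Thread.sleep(5000);
        }
    }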
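
    The lottery idea can be sketched without Reactor. The block below is a minimal model built on the JDK's java.util.concurrent.Flow interfaces, not Reactor's actual implementation of Flux.just; justOne and SingleValueSketch are hypothetical names introduced only for illustration. It shows a subscriber that requests nothing on subscribe, then asks for 10 twice and still gets a single item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleValueSketch {

    /** Hypothetical stand-in for a one-item publisher (assumption, not Reactor code). */
    static <T> Flow.Publisher<T> justOne(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean done = new AtomicBoolean();

            @Override
            public void request(long n) {
                // Only the first positive request wins; later ones find nothing left.
                if (n > 0 && done.compareAndSet(false, true)) {
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        justOne(11).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                held[0] = s; // keep the subscription, request nothing yet
            }

            @Override
            public void onNext(Integer item) {
                events.add("next:" + item);
            }

            @Override
            public void onError(Throwable t) {
                events.add("error");
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        events.add("subscribed"); // nothing has been delivered so far
        held[0].request(10);      // delivers the single item and completes
        held[0].request(10);      // no effect, the prize is already gone
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    Guarding the emission with a compare-and-set is one simple way to keep the item from being delivered twice when request is called from several threads.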
    
    

    2. Create a just with a fixed set of items


    public static <T> Flux<T> just(T... data) 
    
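    import java.util.concurrent.CountDownLatch;

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;

    public class JustTest {

        // Plain int on purpose: the Reactive Streams spec requires onNext signals
        // to be serial, so the sum is only correct if the publisher honors that
        // while request() is called from many threads.
        private int count;
        private final int size = 20000000;
        int threadCount = 40;

        CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        public void run() {

            BaseSubscriber<Integer> base = new BaseSubscriber<Integer>() {

                @Override
                protected void hookOnNext(Integer value) {
                    count += value;
                }

                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    // Intentionally empty: demand comes only from the threads below.
                }
            };
            // Every element is 1, so the expected sum equals size.
            Integer[] list = new Integer[size];
            for (int i = 0; i < size; i++) {
                list[i] = 1;
            }

            Flux<Integer> integerFlux = Flux.just(list);

            integerFlux.subscribe(base);

            // All threads request concurrently on the same subscription.
            for (int i = 0; i < threadCount; i++) {
                new Thread(() -> {
                    for (int j = 0; j < size / threadCount; j++) {
                        base.request(1000);
                    }
                    countDownLatch.countDown();
                }).start();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            JustTest test = new JustTest();
            test.run();
            test.countDownLatch.await();
            // Equals size (20000000) if every element was delivered exactly once.
            System.out.println(test.count);

            System.out.println("end");
        }
    }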
    
    

    join

    Combines items from two publishers into a single element and hands it to the subscriber.


    public final <TRight, TLeftEnd, TRightEnd, R> Flux<R> join(
                Publisher<? extends TRight> other,
                Function<? super T, ? extends Publisher<TLeftEnd>> leftEnd,
                Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
                BiFunction<? super T, ? super TRight, ? extends R> resultSelector
        )
    

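    The signature is admittedly verbose. other is the second publisher; leftEnd and rightEnd each map an item to a Publisher whose first signal ends that item's window; resultSelector combines every pair of items whose windows overlap.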

    In the marble diagram for this operator, green items are emitted by source and red items by other.
    The flow is as follows:

    1. Emit 1; no output
    2. Emit A; output resultSelector.apply(1,A)
    3. leftEnd ends 1
    4. Emit 2; output resultSelector.apply(2,A)
    5. rightEnd ends A
    6. Emit 3; no output
    7. leftEnd ends 2
    8. Emit B; output resultSelector.apply(3,B)
    9. leftEnd ends 3
    10. Emit 4; output resultSelector.apply(4,B)
    11. rightEnd ends B
    12. Emit C; output resultSelector.apply(4,C)
    13. source completes
    14. other completes
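
    The steps above can be replayed with a small plain-Java model. This is a sketch of the window bookkeeping only, assuming each item stays open until its end signal arrives; it does not use Reactor, and JoinWindowSketch and its methods are hypothetical names. The resultSelector is fixed to left + "," + right.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class JoinWindowSketch {

    // Items whose windows are currently open on each side.
    private final Set<Integer> openLeft = new LinkedHashSet<>();
    private final Set<String> openRight = new LinkedHashSet<>();
    private final List<String> output = new ArrayList<>();

    /** A left item arrives: open its window and pair it with every open right item. */
    void left(int value) {
        openLeft.add(value);
        for (String r : openRight) {
            output.add(value + "," + r);
        }
    }

    /** A right item arrives: open its window and pair it with every open left item. */
    void right(String value) {
        openRight.add(value);
        for (Integer l : openLeft) {
            output.add(l + "," + value);
        }
    }

    /** The leftEnd signal for an item: close its window. */
    void endLeft(int value) {
        openLeft.remove(Integer.valueOf(value));
    }

    /** The rightEnd signal for an item: close its window. */
    void endRight(String value) {
        openRight.remove(value);
    }

    static List<String> run() {
        JoinWindowSketch j = new JoinWindowSketch();
        j.left(1);       // step 1: no output
        j.right("A");    // step 2: 1,A
        j.endLeft(1);    // step 3
        j.left(2);       // step 4: 2,A
        j.endRight("A"); // step 5
        j.left(3);       // step 6: no output
        j.endLeft(2);    // step 7
        j.right("B");    // step 8: 3,B
        j.endLeft(3);    // step 9
        j.left(4);       // step 10: 4,B
        j.endRight("B"); // step 11
        j.right("C");    // step 12: 4,C
        return j.output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

    Running main prints the five pairs in the order the steps produce them: 1,A then 2,A then 3,B then 4,B then 4,C.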

    Example:

    import java.util.function.Function;

    import reactor.core.publisher.DirectProcessor;
    import reactor.core.publisher.Flux;

    public class JoinTest {

        // Returns the same end-of-window publisher for every item, so one signal
        // on it closes every left window that is open at that moment.
        static <T> Function<Integer, Flux<T>> just(final Flux<T> publisher) {
            return t1 -> publisher;
        }

        public static void main(String[] args) {
            // DirectProcessor is deprecated since Reactor 3.4 in favor of Sinks.
            DirectProcessor<Integer> source1 = DirectProcessor.create();
            DirectProcessor<String> source2 = DirectProcessor.create();
            DirectProcessor<Integer> duration1 = DirectProcessor.create();
            DirectProcessor<Integer> duration2 = DirectProcessor.create();
            Flux<String> m =
                    source1.join(source2, just(duration1), t1 -> duration2, (integer, tRight) -> integer + "," + tRight);

            m.subscribe(System.out::println);

            source1.onNext(1);     // no right item is open yet: no output
            source2.onNext("A");   // prints 1,A
            duration1.onNext(1);   // ends 1
            source1.onNext(2);     // prints 2,A
            duration2.onNext(1);   // ends A
            duration1.onNext(1);   // ends 2
            source1.onNext(3);     // no right item is open: no output
            source2.onNext("B");   // prints 3,B
            duration1.onNext(1);   // ends 3
            source1.onNext(4);     // prints 4,B
            source2.onNext("C");   // prints 4,C
            source1.onComplete();
            duration1.onNext(1);
            source2.onComplete();
            duration2.onNext(1);
        }
    }
    


        Article link: https://www.haomeiwen.com/subject/zrndsqtx.html