我是用来组合的操作符------Buffer

作者: javalong | 来源:发表于2018-04-17 20:54 被阅读9次
Buffer操作符功能介绍

如果本来是Observable<String>的一个对象,那么我只能在onNext中一个一个处理String,通过Buffer操作符,我们可以把nString组合在一起,然后在onNext中进行处理

用途

可以操作一些需要一起处理的数据。比如2个数据为一组,我始终要打印最大的数据,或者3个数据为一组,打印最大值。

来一段代码
 btSub.setOnClickListener({
            observable?.subscribe({ msg ->
                tvContent.text = tvContent.text.toString() + "\n" + msg.toString()
            })
        })
        observable = Observable.just("test1","test2","test3").buffer(2)

代码的作用就是每2个字符串 打印在一行文本中。

先来学习下just是什么鬼
 Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        })
....
 Observable.just("test1","test2","test3")

这2行代码的效果其实是一致的(当我们,深入源代码的时候,其实会发现有一些不同,但是我们现在先一个个学习操作符,然后再会过来看这些,其实会非常的简单)
这里我就不对just进行深入了,主要以操作符为主,如果感兴趣的可以自己看看。

看看源代码

Observable

...
public final Observable<List<T>> buffer(int count) {
        return buffer(count, count);
    }
...
 public final Observable<List<T>> buffer(int count, int skip) {
        return lift(new OperatorBufferWithSize<T>(count, skip));
    }
...
 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
...

这里我们看到了一个很牛逼的东西,也就是lift,在前言中,其实我也给大家推荐了一篇关于RxJava的文章,写的很经典,大家最好先看看。

既然提到了lift那么就不得不提到Operator接口了。前面其实我也已经介绍过,RxJava中最重要的4个类或接口,如果忘了,或者不太了解的,大家可以再去看下那
篇文章我是最简单的操作符-----Create

 public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

public interface Func1<T, R> extends Function {
    R call(T t);
}

这段代码大家仔细一看就发现,其实Operator也是一个Func1嘛,只有一个call接口,用来把一个Subscriber转化成了另外一个Subscriber

既然已经了解了这Operator,那么我们继续看lift到底做了什么。一步步看代码。

前面我介绍了一种方法去阅读源代码,就是我们已经知道了它的功能,就带着他这个功能去阅读源码,这里也是适用的。

以我开头的demo代码为例子,其实就是把Subscriber<List<String>>转化为了Subscriber<String>

这里其实我自己理解起来都非常困难,为什么是把Subscriber<List<String>>转化为了Subscriber<String>而不是把Subscriber<String>转化为了Subscriber<List<String>>

先不解释,先直接看OperatorBufferWithSize的代码

OperatorBufferWithSize

...
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
...
}
...

因为我们最习惯的思考逻辑就是从上到下,从左到右,因为一开始我们是Test1Test2Test3,然后呢组成,[Test1,Test2][Test3]这样子去处理,所以很正常的理解为把Subscriber<String>转化为了Subscriber<List<String>>。这也是为什么我觉得RxJava源码看起来比较累的原因。

前言中介绍的那篇经典的文章中就有一幅图,我就不引用了,大家可以自己去看一下,RxJava的运行顺序是先下再上,再从上到下。我们直接继续从源码中找寻答案把。

Observable

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

OnSubscribeLift

...
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...
    }

RxJavaHooks我就不过多介绍了,在前几篇文章中已经有所解释,所以以上代码简化为

...
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = operator.call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...
    }

当我们点击demo中的按钮,就是直接先调用上面的call方法。这里的parent其实是Observable中传入的onSubscribe,其实也就是前面所提到的

Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        }

所以直接看到这里,其实大家应该可以看出来为什么是把Subscriber<List<String>>转化为Subscriber<String>了。大家再来重新看这段代码。

OnSubscribeLift

...
   public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = operator.call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...

这里的参数Subscriber<? super R> o,就是我们在demo中subscribe订阅方法中传入的Subscriber。所以R其实就是List<String>,而在call方法中最终调用的是parent.call所传的参数是Subscriber<String>。调用的是下面这个方法

Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        }

所以我说,RxJava的执行方式呢,最后面的OnSubscribecall先执行,然后再调用前面的OnSubscribe,最前面的OnSubscribe开始调用onNext方法,再一步一步往下执行。

这里我们对整个方法的执行流程有了一个大致的了解,但是还没有开始去解析buffer操作符的真正实现的代码。

下面我们还是继续看

OperatorBufferWithSize

...
 public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
        if (skip == count) {
            BufferExact<T> parent = new BufferExact<T>(child, count);
            ...
            return parent;
        }
        if (skip > count) {
            BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);
            ...
            return parent;
        }
        BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);
        ...
        return parent;
    }
...

这里非常明显的分为了3中情况,skip代表是忽略几个元素,count代表几个元素为1组。什么意思呢?
我们这里直接举个例子。
Observable.just("Test1","Test2","Test3","Test4","Test5")
元素多一点,大家好理解,那么我skip=2,count=2,那么就会被分为3组,"[Test1,Test2]","[Test3,Test4]","[Test5]"。如果skip=3,count=2,那么就会被分为2组,"[Test1,Test2]","[Test4,Test5]"。

大家自己尝试下,我就不举例了,我就想说明,这里分了3种情况做了不同处理,其实没什么关系,我们只需看明白一种就好了,其他2种也是大同小异的。我们的demo中
observable = Observable.just("test1", "test2", "test3").buffer(2)只传了一个2,就是skip==2&&count==2

所以我们直接看

...
  if (skip == count) {
            BufferExact<T> parent = new BufferExact<T>(child, count);
            ...
            return parent;
        }
...
static final class BufferExact<T> extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;
        final int count;

        List<T> buffer;

        public BufferExact(Subscriber<? super List<T>> actual, int count) {
            this.actual = actual;
            this.count = count;
            this.request(0L);
        }

        @Override
        public void onNext(T t) {
            List<T> b = buffer;
            if (b == null) {
                b = new ArrayList<T>(count);
                buffer = b;
            }

            b.add(t);

            if (b.size() == count) {
                buffer = null;
                actual.onNext(b);
            }
        }

...

前面介绍了Operator就是为了把 Subscriber<List<String>>转为 Subscriber<String>,而这里的BufferExact就是这个Subscriber<String>
所以我们最主要的还是看onNext的实现。

public void onNext(T t) {
            List<T> b = buffer;
            if (b == null) {
                b = new ArrayList<T>(count);
                buffer = b;
            }

            b.add(t);

            if (b.size() == count) {
                buffer = null;
                actual.onNext(b);
            }
        }

实现非常的简单,先把onNext传递过来的数据先保存在 List中,等凑够了count数量后,在调用后面的Subscriber<List<String>>

所以总的来说buffer操作符的实现是简单的,但是由于涉及到了liftOperator所以整体看起来还是比较复杂的。大家可以自己琢磨。

附加

前面直接说

 Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        })
Observable.just("test1", "test2", "test3")

这2个是差不多的。其实我们可以再深入下。

Observable

...
public static <T> Observable<T> just(T t1, T t2, T t3) {
        return from((T[])new Object[] { t1, t2, t3 });
    }
...
public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }
...

其实就是创建了一个OnSubscribeFromArray,也就是一个OnSubscribe。继续深入

OnSubscribeFromArray

...
public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
..
static final class FromArrayProducer<T>
    extends AtomicLong
    implements Producer {
     ...
        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
...

如果是OnSubscribe其实我们主要就是看一个call方法,这里是直接child.setProducer,前面也介绍过,setProducer其实无法是直接调用Producerrequest的方法。最后我们发现其实主要就看2个方法fastPathslowPath

...
 void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }
...
void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;

            long e = 0L;
            int i = index;

            for (;;) {

                while (r != 0L && i != n) {
                    if (child.isUnsubscribed()) {
                        return;
                    }

                    child.onNext(array[i]);

                    i++;

                    if (i == n) {
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                        return;
                    }

                    r--;
                    e--;
                }

                r = get() + e;

                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }
...

其实直接看fastPath会更加清爽一点,就是遍历数组,然后挨个调用onNext,那么数组是什么呢,就是前面传入的"test1", "test2", "test3"

至于slowPath呢其实关键代码也是onNext。可以先不深入去理解它具体的意思,其实看名字就可以看出来,一个是快速处理,一个是慢慢处理。

总结

整体下来我也已经尽量涉及到了每行代码,但是中间的跳转确实会让很多人发懵,如果觉得哪里讲的还不够清楚,可以指出,我可以修正。

相关文章

  • 我是用来组合的操作符------Buffer

    Buffer操作符功能介绍 如果本来是Observable 的一个对象,那么我只能在onNext中一个一个处理St...

  • RxJava Operator之变换型操作符

    RxJava变换型操作符 buffer buffer操作符 定期将来自Observable的数据分拆成一些Obse...

  • RxJava操作符之组合操作符(六)

    前言 上一篇文章我们学习了过滤类操作符,本篇我们将一起来学习RxJava组合类操作符。组合操作符主要是用来同时处理...

  • 转换操作符

    buffer操作符 buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给订阅...

  • RXJava操作符-转换操作符

    一.buffer操作符 buffer操作符周期性地收集源Observable产生的结果到列表中,并把这个列表提交给...

  • kotlin flow (二)

    Flow操作符 buffer(int) 该操作符会新起一个协程来收集buffer之前的代码运行结果,新协程通过ch...

  • 一篇RxJava友好的文章(三)

    继上一篇讲述了过滤操作符,这一篇讲述组合操作符,组合操作符可用于组合多个Observable。组合操作符相对于过滤...

  • Rxjava讲解(2)

    上面文章讲过创建操作符, 转换操作符,过滤操作符, 这篇文字介绍组合操作符,条件操作符,功能操作符。 组合操作符 ...

  • RxJava(三)--变换操作符

    常用变换操作符 map、flatMap、concatMap、flatMapIterable、buffer、grou...

  • RxBinding结合RxJava的操作符和一些应用场景

    RxBinding的使用场景 1.防止按钮重复点击,在1秒内只能响应点击一次 2.Buffer操作符,可以用来计算...

网友评论

    本文标题:我是用来组合的操作符------Buffer

    本文链接:https://www.haomeiwen.com/subject/iryekftx.html