我是用来组合的操作符------Buffer

作者: javalong | 来源:发表于2018-04-17 20:54 被阅读9次
    Buffer操作符功能介绍

    如果本来是Observable<String>的一个对象,那么我只能在onNext中一个一个处理String,通过Buffer操作符,我们可以把nString组合在一起,然后在onNext中进行处理

    用途

    可以操作一些需要一起处理的数据。比如2个数据为一组,我始终要打印最大的数据,或者3个数据为一组,打印最大值。

    来一段代码
     btSub.setOnClickListener({
                observable?.subscribe({ msg ->
                    tvContent.text = tvContent.text.toString() + "\n" + msg.toString()
                })
            })
            observable = Observable.just("test1","test2","test3").buffer(2)
    

    代码的作用就是每2个字符串 打印在一行文本中。

    先来学习下just是什么鬼
     Observable.create(object : Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("test1")
                    t.onNext("test2")
                    t.onNext("test3")
                }
            })
    ....
     Observable.just("test1","test2","test3")
    

    这2行代码的效果其实是一致的(当我们,深入源代码的时候,其实会发现有一些不同,但是我们现在先一个个学习操作符,然后再会过来看这些,其实会非常的简单)
    这里我就不对just进行深入了,主要以操作符为主,如果感兴趣的可以自己看看。

    看看源代码

    Observable

    ...
    public final Observable<List<T>> buffer(int count) {
            return buffer(count, count);
        }
    ...
     public final Observable<List<T>> buffer(int count, int skip) {
            return lift(new OperatorBufferWithSize<T>(count, skip));
        }
    ...
     public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
    ...
    

    这里我们看到了一个很牛逼的东西,也就是lift,在前言中,其实我也给大家推荐了一篇关于RxJava的文章,写的很经典,大家最好先看看。

    既然提到了lift那么就不得不提到Operator接口了。前面其实我也已经介绍过,RxJava中最重要的4个类或接口,如果忘了,或者不太了解的,大家可以再去看下那
    篇文章我是最简单的操作符-----Create

     public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
            // cover for generics insanity
        }
    
    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    

    这段代码大家仔细一看就发现,其实Operator也是一个Func1嘛,只有一个call接口,用来把一个Subscriber转化成了另外一个Subscriber

    既然已经了解了这Operator,那么我们继续看lift到底做了什么。一步步看代码。

    前面我介绍了一种方法去阅读源代码,就是我们已经知道了它的功能,就带着他这个功能去阅读源码,这里也是适用的。

    以我开头的demo代码为例子,其实就是把Subscriber<List<String>>转化为了Subscriber<String>

    这里其实我自己理解起来都非常困难,为什么是把Subscriber<List<String>>转化为了Subscriber<String>而不是把Subscriber<String>转化为了Subscriber<List<String>>

    先不解释,先直接看OperatorBufferWithSize的代码

    OperatorBufferWithSize

    ...
    public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
    ...
    }
    ...
    

    因为我们最习惯的思考逻辑就是从上到下,从左到右,因为一开始我们是Test1Test2Test3,然后呢组成,[Test1,Test2][Test3]这样子去处理,所以很正常的理解为把Subscriber<String>转化为了Subscriber<List<String>>。这也是为什么我觉得RxJava源码看起来比较累的原因。

    前言中介绍的那篇经典的文章中就有一幅图,我就不引用了,大家可以自己去看一下,RxJava的运行顺序是先下再上,再从上到下。我们直接继续从源码中找寻答案把。

    Observable

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
        }
    

    OnSubscribeLift

    ...
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    st.onStart();
                    parent.call(st);
                }
    ...
        }
    

    RxJavaHooks我就不过多介绍了,在前几篇文章中已经有所解释,所以以上代码简化为

    ...
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = operator.call(o);
                try {
                    st.onStart();
                    parent.call(st);
                }
    ...
        }
    

    当我们点击demo中的按钮,就是直接先调用上面的call方法。这里的parent其实是Observable中传入的onSubscribe,其实也就是前面所提到的

    Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("test1")
                    t.onNext("test2")
                    t.onNext("test3")
                }
            }
    

    所以直接看到这里,其实大家应该可以看出来为什么是把Subscriber<List<String>>转化为Subscriber<String>了。大家再来重新看这段代码。

    OnSubscribeLift

    ...
       public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = operator.call(o);
                try {
                    st.onStart();
                    parent.call(st);
                }
    ...
    

    这里的参数Subscriber<? super R> o,就是我们在demo中subscribe订阅方法中传入的Subscriber。所以R其实就是List<String>,而在call方法中最终调用的是parent.call所传的参数是Subscriber<String>。调用的是下面这个方法

    Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("test1")
                    t.onNext("test2")
                    t.onNext("test3")
                }
            }
    

    所以我说,RxJava的执行方式呢,最后面的OnSubscribecall先执行,然后再调用前面的OnSubscribe,最前面的OnSubscribe开始调用onNext方法,再一步一步往下执行。

    这里我们对整个方法的执行流程有了一个大致的了解,但是还没有开始去解析buffer操作符的真正实现的代码。

    下面我们还是继续看

    OperatorBufferWithSize

    ...
     public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
            if (skip == count) {
                BufferExact<T> parent = new BufferExact<T>(child, count);
                ...
                return parent;
            }
            if (skip > count) {
                BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);
                ...
                return parent;
            }
            BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);
            ...
            return parent;
        }
    ...
    

    这里非常明显的分为了3中情况,skip代表是忽略几个元素,count代表几个元素为1组。什么意思呢?
    我们这里直接举个例子。
    Observable.just("Test1","Test2","Test3","Test4","Test5")
    元素多一点,大家好理解,那么我skip=2,count=2,那么就会被分为3组,"[Test1,Test2]","[Test3,Test4]","[Test5]"。如果skip=3,count=2,那么就会被分为2组,"[Test1,Test2]","[Test4,Test5]"。

    大家自己尝试下,我就不举例了,我就想说明,这里分了3种情况做了不同处理,其实没什么关系,我们只需看明白一种就好了,其他2种也是大同小异的。我们的demo中
    observable = Observable.just("test1", "test2", "test3").buffer(2)只传了一个2,就是skip==2&&count==2

    所以我们直接看

    ...
      if (skip == count) {
                BufferExact<T> parent = new BufferExact<T>(child, count);
                ...
                return parent;
            }
    ...
    static final class BufferExact<T> extends Subscriber<T> {
            final Subscriber<? super List<T>> actual;
            final int count;
    
            List<T> buffer;
    
            public BufferExact(Subscriber<? super List<T>> actual, int count) {
                this.actual = actual;
                this.count = count;
                this.request(0L);
            }
    
            @Override
            public void onNext(T t) {
                List<T> b = buffer;
                if (b == null) {
                    b = new ArrayList<T>(count);
                    buffer = b;
                }
    
                b.add(t);
    
                if (b.size() == count) {
                    buffer = null;
                    actual.onNext(b);
                }
            }
    
    ...
    

    前面介绍了Operator就是为了把 Subscriber<List<String>>转为 Subscriber<String>,而这里的BufferExact就是这个Subscriber<String>
    所以我们最主要的还是看onNext的实现。

    public void onNext(T t) {
                List<T> b = buffer;
                if (b == null) {
                    b = new ArrayList<T>(count);
                    buffer = b;
                }
    
                b.add(t);
    
                if (b.size() == count) {
                    buffer = null;
                    actual.onNext(b);
                }
            }
    

    实现非常的简单,先把onNext传递过来的数据先保存在 List中,等凑够了count数量后,在调用后面的Subscriber<List<String>>

    所以总的来说buffer操作符的实现是简单的,但是由于涉及到了liftOperator所以整体看起来还是比较复杂的。大家可以自己琢磨。

    附加

    前面直接说

     Observable.create(object : Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("test1")
                    t.onNext("test2")
                    t.onNext("test3")
                }
            })
    Observable.just("test1", "test2", "test3")
    

    这2个是差不多的。其实我们可以再深入下。

    Observable

    ...
    public static <T> Observable<T> just(T t1, T t2, T t3) {
            return from((T[])new Object[] { t1, t2, t3 });
        }
    ...
    public static <T> Observable<T> from(T[] array) {
            int n = array.length;
            if (n == 0) {
                return empty();
            } else
            if (n == 1) {
                return just(array[0]);
            }
            return unsafeCreate(new OnSubscribeFromArray<T>(array));
        }
    ...
    

    其实就是创建了一个OnSubscribeFromArray,也就是一个OnSubscribe。继续深入

    OnSubscribeFromArray

    ...
    public void call(Subscriber<? super T> child) {
            child.setProducer(new FromArrayProducer<T>(child, array));
        }
    ..
    static final class FromArrayProducer<T>
        extends AtomicLong
        implements Producer {
         ...
            @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
    ...
    
    

    如果是OnSubscribe其实我们主要就是看一个call方法,这里是直接child.setProducer,前面也介绍过,setProducer其实无法是直接调用Producerrequest的方法。最后我们发现其实主要就看2个方法fastPathslowPath

    ...
     void fastPath() {
                final Subscriber<? super T> child = this.child;
    
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
    
                    child.onNext(t);
                }
    
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
    ...
    void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
    
                long e = 0L;
                int i = index;
    
                for (;;) {
    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
    
                        child.onNext(array[i]);
    
                        i++;
    
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
    
                        r--;
                        e--;
                    }
    
                    r = get() + e;
    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }
    ...
    

    其实直接看fastPath会更加清爽一点,就是遍历数组,然后挨个调用onNext,那么数组是什么呢,就是前面传入的"test1", "test2", "test3"

    至于slowPath呢其实关键代码也是onNext。可以先不深入去理解它具体的意思,其实看名字就可以看出来,一个是快速处理,一个是慢慢处理。

    总结

    整体下来我也已经尽量涉及到了每行代码,但是中间的跳转确实会让很多人发懵,如果觉得哪里讲的还不够清楚,可以指出,我可以修正。

    相关文章

      网友评论

        本文标题:我是用来组合的操作符------Buffer

        本文链接:https://www.haomeiwen.com/subject/iryekftx.html