美文网首页Android-Rxjava&retrofit&daggerRxJavaRxJava
Rxjava2~from****那么多方法~学渣带你扣rxjav

Rxjava2~from****那么多方法~学渣带你扣rxjav

作者: 品味与回味 | 来源:发表于2017-06-23 18:05 被阅读1331次

    我都不知道自己哪里来的毅力。死扣这几个方法 但是 我想在就是想。 这几个方法 扣到哪里算哪里。今天没完成,会在转天继续完成。我会一直补充

    Paste_Image.png

    fromCallable

    Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.

    This allows you to defer the execution of the function you specify until an observer subscribes to the ObservableSource. That is to say, it makes the function "lazy."

    这是官网的解释。

    Calls a Callable and emits its resulting single value or signals its exception.
    这句话是对
    ObservableFromCallable的描述

    我们现不解释 先来看来那个例子
    <pre>

        Observable.fromCallable(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return Observable.just("one", "two", "three", "four", "five");
            }
        }).subscribe( getSubscriber() );
    
        Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                // Do some long running operation
                SystemClock.sleep(2000);
                return Observable.just("one", "two", "three", "four", "five");
            }
        }).subscribe( getSubscriber() );
    
    
    
     Observer<Object> getSubscriber() {
        return new Observer<Object>() {
    
            @Override
            public void onError(Throwable e) {
                Log.i("RxJava", "onError : " + e.toString());
            }
    
            @Override
            public void onComplete() {
                Log.i("RxJava", "onCompleted");
    
            }
    
            @Override
            public void onSubscribe(Disposable d) {
    
            }
    
            @Override
            public void onNext(Object o) {
                Log.i("RxJava", "onNext : " + o);
                Log.i("RxJava", Thread.currentThread().getName());
            }
        };
    }
    

    </pre>

    <pre>
    06-23 08:55:53.313 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : io.reactivex.internal.operators.observable.ObservableFromArray@39ee6818
    06-23 08:55:53.313 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:53.313 26651-26651/com.rxjava2.android.samples I/RxJava: onCompleted
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : one
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : two
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : three
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : four
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onNext : five
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: main
    06-23 08:55:55.315 26651-26651/com.rxjava2.android.samples I/RxJava: onCompleted
    </pre>

    可以看出来来什么?fromCallable的接口直接传过来的是一个对象,defer根据对象个的个人分别传送。这是因为什么呢? 我们往下看

    这是ObservableDefer的subscribeActual
    <pre>
    public void subscribeActual(Observer<? super T> s) {
    ObservableSource<? extends T> pub;
    try {
    pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
    } catch (Throwable t) {
    Exceptions.throwIfFatal(t);
    EmptyDisposable.error(t, s);
    return;
    }

        pub.subscribe(s);
    }
    

    </pre>
    我们之前分析过pub.subscribe(s);这方法决定了 。 你call的值还是要按照Observable对象的创建方式去别人调用onnext这些方法。

    这是ObservableFromCallable的subscribeActual
    <pre>
    public void subscribeActual(Observer<? super T> s) {
    DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(s);
    s.onSubscribe(d);
    if (d.isDisposed()) {
    return;
    }
    T value;
    try {
    value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null");
    } catch (Throwable e) {
    Exceptions.throwIfFatal(e);
    if (!d.isDisposed()) {
    s.onError(e);
    } else {
    RxJavaPlugins.onError(e);
    }
    return;
    }
    d.complete(value);
    }
    </pre>

    发现了没有这个并没有。而是直接一个value,全部发送回去。
    因此 我们可以说这个方法。回调回去的值不论几个都会全部发送

    下面看第二个方法

    Observable.fromArray()这个方法
    <pre>
    public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
    return empty();
    } else
    if (items.length == 1) {
    return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    </pre>

    看着眼熟吗? 没错和just 一模一样。区别 just有数量限制

    第三个方法
    fromFuture

    相关文章

      网友评论

        本文标题:Rxjava2~from****那么多方法~学渣带你扣rxjav

        本文链接:https://www.haomeiwen.com/subject/oabccxtx.html