美文网首页
RxJava的源码理解及手动写一个简单的RxJava

RxJava的源码理解及手动写一个简单的RxJava

作者: cao苗子 | 来源:发表于2019-11-18 11:41 被阅读0次

1.简单使用

Observable
            .just("miaozi")// 返回 ObservableJust
            .map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String s) throws Exception {
                    //可进行耗时操作
                    return s;
                }
            })// 返回 ObservableMap
            .subscribeOn(Scheduler.io())// 返回 ObservableSubscribeOn
            .observeOn(Scheduler.mainThread())// ObservableObserveOn
            .subscribe(new Consumer<String>() {//开始执行
                @Override
                public void accept(String s) throws Exception {
                    Log.e("TAG",s);
                }
            });

这个是我写的一个简单的RxJava中的map使用和子线程和主线程之间的调度使用。目的主要是加深对RxJava的理解。说实话,我自己是看视频对源码进行分析的,分析了有一个星期的时间吧,到现在才弄懂其中的源码。

RxJava是一种响应式的编码思想,可以是采用链式和递归的方式来调用的,还采用的静态代理的设计模式来代理使用,下面我在一一介绍和分析。

2.分析

第一步:just返回的是一个ObservableJust对象,里面存储一个 value 值 和实现了 subscribeActual方法。

public final class ObservableJust<T> extends Observable<T>{
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;//miaozi
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        Log.e("TAG","ObservableJust" + " observer="+observer.toString());
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe();
        sd.run();
    }
}

第二步:第一步的基础之上 ObservableJust.map() 返回一个ObservableMap对象,里面有两个值,一个是source,source其实就是ObservableJust对象。一个是function对象。还是实现了subscribeActual方法。

/**
 * created by panshimu
 * on 2019/11/13
 */
public class ObservableMap<T, U> extends Observable<U>{
    final Observable<T> source;//其实就是 ObservableJust
    final Function<T,U> function;
    public ObservableMap(Observable<T> source, Function<T,U> function) {
        this.function = function;
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<U> observer) {
        Log.e("TAG","ObservableMap"+" observer="+observer.toString());
        //静态代理 调用的是ObservableJust中的subscribeActual
        source.subscribe(new MapObserver<T>(observer,function));
    }

    private final class MapObserver<T> implements Observer<T> {
        final Function<T,U> function;
        final Observer<U> observer;
        public MapObserver(Observer<U> observer, Function<T,U> function) {
            this.function = function;
            this.observer = observer;
        }
        @Override
        public void onSubscribe() {
            observer.onSubscribe();
        }
        @Override
        public void onNext(@NonNull T t) {
            try {
                U apply = function.apply(t);
                observer.onNext(apply);
            }catch (Exception e){
                observer.onError(e);
            }
        }
        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }
    }
}

第三步骤:切换子线程 subscribeOn 返回一个ObservableSubscribeOn对象,里面有两个值,一个是source,source就是上一个ObservableMap对象,还有一个值就是Scheduler,并且也实现了subscribeActual方法。

package com.miaozi.myrxjava;
import android.util.Log;
/**
 * created by panshimu
 * on 2019/11/15
 */
public class ObservableSubscribeOn<T> extends Observable<T> {
    final Observable<T> source;//上一个 ObservableMap
    final Scheduler scheduler;//IOScheduler
    public ObservableSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        Log.e("TAG","ObservableSubscribeOn" + " observer="+observer.toString());
       //执行的是 IOScheduler 中的 scheduleDirect 然后会调用 SubscribeTask 中的run方法
        scheduler.scheduleDirect(new SubscribeTask(source,observer));
    }
}


public class SubscribeTask<T> implements Runnable {
    Observer<T> observer;//
    final Observable<T> source;//ObservableMap

    public SubscribeTask(Observable<T> source, Observer<T> observer) {
        this.source = source;
        this.observer = observer;
    }
    @Override
    public void run() {
        Log.e("TAG", "SubscribeTask" + " 切换到子线程-->"+ source.toString());
        //调用ObservableMap中的 subscribeActual 
        source.subscribe(observer);
    }
}

第四步:observeOn 切换回主线程 返回一个 ObservableObserveOn 对象,这个里面也是存储两个值,一个是source,source就是上一层ObservableSubscribeOn对象,另一个就是Scheduler了。这个也实现了subscribeActual方法。

package com.miaozi.myrxjava;

import android.util.Log;

import androidx.annotation.NonNull;

/**
 * created by panshimu
 * on 2019/11/15
 */
class ObservableObserveOn<T> extends Observable<T> {
    Observable<T> source;//ObservableSubscribeOn
    Scheduler scheduler;
    public ObservableObserveOn(Observable<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        Log.e("TAG","ObservableObserveOn"+ " observer="+observer.toString());
        //调用ObservableSubscribeOn中的subscribeActual
        source.subscribe(new ObserveOnObserver(observer));
    }

    private class ObserveOnObserver implements Observer<T>, Runnable {
        private T item;
        final Observer<T> observer;
        public ObserveOnObserver(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void onSubscribe() {
            observer.onSubscribe();
        }

        @Override
        public void onNext(@NonNull T t) {
            item = t;
            scheduler.scheduleDirect(this);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onComplete() {
            observer.onComplete();
        }

        @Override
        public void run() {
            observer.onNext(item);
        }
    }
}

第五步:subscribe 调用的时候ObservableObserveOn父类中的subscribe,最终调用ObservableObserveOn中的subscribeActual(observer); observer就是Consumer。也就是调用上层的subscribeActual。

可以发现他其实是一条链子在调用,到最后一个要开始执行的时候,根据它的source也就是上级,一层一层的网上递归的调用,知道最上层然后执行方法又一层一层往回调用。

这里说明一下切换主线程和子线程,看源码不难发现切换到子线程是开启一个线程池,然后把 source.subscribe(observer);放到子线程的run方法中,就可以实现子线程的切换,然后又怎么切换到主线程呢?

切换主线程:

package com.miaozi.myrxjava;

import android.os.Handler;
import android.os.Message;

/**
 * created by panshimu
 * on 2019/11/18
 */
public class MainScheduler extends Scheduler {
    private Handler handler;
    public MainScheduler(Handler handler) {
        this.handler = handler;
    }
    @Override
    public void scheduleDirect(Runnable runnable) {
        Message obtain = Message.obtain(handler, runnable);
        handler.sendMessage(obtain);
    }
}

这段话很简单把?一开始我也猜想到是用 handler 的方式,一看就是。但是跟我们正常的切换时不一样的,并没有实现handlerMassage()方法的重写,是因为handler的源码中放我们实现了,具体去看handler源码中的callback。

再看MapObserver中的onNext()

public void onNext(@NonNull T t) {
            try {
                U apply = function.apply(t);
                observer.onNext(apply);
            }catch (Exception e){
                observer.onError(e);
            }
        }

这里的onNext就是不断的下载传递,进行value的转换,最后回执行到LambdaObserver中的onNext()

   @Override
    public void onNext(@NonNull T t) {
        try {
            this.onNext.accept(t);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

看到这里就可以看到 accept()方法了。这是最后最后的回调回去了。

附上源码连接:
https://github.com/panshimu/MyRxJava2

相关文章

网友评论

      本文标题:RxJava的源码理解及手动写一个简单的RxJava

      本文链接:https://www.haomeiwen.com/subject/expvictx.html