美文网首页
5、rxjava基本原理flatmap

5、rxjava基本原理flatmap

作者: 最美下雨天 | 来源:发表于2018-06-28 11:40 被阅读6次

线程切换:https://juejin.im/post/5a6751af6fb9a01cb2571794

public class Main {
    
    public static void main(String[] args)
    {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) {
                observableEmitter.onNext("12");
            }
        }).flatMap(new Function<String, Observable<Integer>>() {
            @Override
            public Observable<Integer> apply(String s) {
                final Integer value=Integer.parseInt(s);
                return Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> observableEmitter) {
                        observableEmitter.onNext(value);
                    }
                });
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            void onNext(Integer value) {
                System.out.println(value + 10);
            }
        });
    }
}

public class ObservableCreate<T> extends Observable<T>{
    ObservableOnSubscribe observableOnSubscribe;
    public ObservableCreate(ObservableOnSubscribe<T> observableOnSubscribe)
    {
        this.observableOnSubscribe=observableOnSubscribe;
    }

    @Override
    protected void subscribeActual (Observer observer) {
        observableOnSubscribe.subscribe(new CreateEmitter(observer));
    }
}

public class CreateEmitter implements ObservableEmitter{

    Observer observer;
    public CreateEmitter(Observer observer)
    {

        this.observer=observer;
    }

    @Override
    public void onNext(Object value) {
        observer.onNext(value);
    }
}

public interface ObservableEmitter<T> {

    void onNext(T value);
}

public abstract class Observer<T> {
    abstract void onNext(T value);
}

public interface ObservableOnSubscribe<T> {

    void subscribe(ObservableEmitter<T> observableEmitter);
}

public class ObservableFlatMap<T,R> extends Observable{
    Observable<T> observable;
    Function<T,? extends Observable<? extends R>> mapper;
    public ObservableFlatMap(Observable<T> observable,Function<T,? extends Observable<? extends R>> mapper)
    {
        this.mapper=mapper;
        this.observable=observable;
    }

    @Override
    protected void subscribeActual(Observer observer) {

        observable.subscribe(new MergeObserver(mapper,observer));

    }
}

public class MergeObserver<T,R> extends  Observer<T>{

    Function<T,? extends Observable<? extends R>> mapper;
    Observer<T> observer;
    public MergeObserver(Function<T,? extends Observable<? extends R>> mapper,Observer<T> observer)
    {
        this.mapper=mapper;
        this.observer=observer;
    }


    @Override
    void onNext(T value) {
        Observable observable=mapper.apply(value);
        observable.subscribe(observer);
    }
}

public interface Function<T,R> {
    R apply(T t);
}

public abstract class Observable<T> {

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
    }

    public final <R> Observable<R> flatMap(Function<T,? extends Observable<? extends R>> mapper) {

        return new ObservableFlatMap(this,mapper);
    }

    public final <U> void subscribe(Observer<U> observer) {

        subscribeActual(observer);
    }

    protected abstract <U> void subscribeActual(Observer<U> observer);
}

输出:


image.png

相关文章

网友评论

      本文标题:5、rxjava基本原理flatmap

      本文链接:https://www.haomeiwen.com/subject/vlkgyftx.html