【RxJava】- 连接操作符源码分析

作者: 拔萝卜占坑 | 来源:发表于2020-03-31 14:32 被阅读0次

    目录

    【RxJava】- 创建操作符源码分析
    【RxJava】- 变换操作符源码分析
    【RxJava】- 过滤操作符源码分析
    【RxJava】- 结合操作符源码分析

    Connect

    让一个可连接的Observable开始发射数据给订阅者,可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

    RxJava中connect是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable。

    Observable.create(null).replay().connect();
    

    实现类

    ConnectConsumer
    

    自己查看,很简单,就几行代码。

    调用 connect(cc)方法,返回disposable实例。

    Replay

    保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅.

    可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

    实现类

    ObservableReplay
    

    Replay还有其它实现类,请自己查看。这里讲解replay()没有传入参数的实现。看一下创建过程,最终创建ObservableReplay实例传入的参数如下:

    • source
      被观察者
    • bufferFactory
      UnBoundedFactory实例
    • curr
      final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<>();
    • onSubscribe
      ObservableSource<T> onSubscribe = new ReplaySource<>(curr, bufferFactory);
      

    首先调用connect(Consumer<? super Disposable> connection)方法

    返回初始化容量为16的UnboundedReplayBuffer数组

    ReplayBuffer<T> buf = bufferFactory.call();
    

    保存新创建的ReplayObserver对象

    if (!current.compareAndSet(ps, u)){continue;}
    

    调用accept方法

    connection.accept(ps);
    

    这里调用的是上面Connect中的ConnectConsumer类中的accept方法。

    public void accept(Disposable t) {
        this.disposable = t;
    }
    

    调用被观察者

    if (doConnect) {
        source.subscribe(ps);
    }
    

    调用onNext(T t)

    public void onNext(T t) {
        if (!done) {
            // 保存到数据
            buffer.next(t);
            replay();
        }
    }
    

    核心逻辑就在replay中,自己查看。

    Publish

    将普通的Observable转换为可连接的Observable

    ConnectableObservable observable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        ObservableEmitter<Integer> observableEmitter = emitter.serialize();
        try {
            // 序列化
            if (!emitter.isDisposed()) {
                for (int i = 1; i < 3; i++) {
                    System.out.println("create operate--->emitter: "+i);
                    if (1==i){
                        // ExceptionHelper.TERMINATED
                       //   observableEmitter.onError(new Throwable("error"));
                    }else {
                        observableEmitter.onNext(i);
                    }
                }
                observableEmitter.onComplete();
            }
        } catch (Exception e) {
            observableEmitter.onError(e);
        }
    }).publish();
    observable.subscribe(new Observer<Integer>() {...});
    observable.connect();
    

    observable.connect()不调用,是不会发射数据的。

    实现类

    ObservablePublish
    

    看subscribeActual方法

    protected void subscribeActual(Observer<? super T> observer) {
        ...
        InnerDisposable<T> inner = new InnerDisposable<>(observer, co
        observer.onSubscribe(inner);
        if (conn.add(inner)) {
            if (inner.isDisposed()) {
                conn.remove(inner);
            }
            return;
        }
        ...
    }
    

    conn.add(inner)为每一个订阅者创建InnerDisposable对象,然后保存在PublishConnection中的一个数组中。

    看connect方法

    public void connect(Consumer<? super Disposable> connection) {
        ...
        if (doConnect) {
            source.subscribe(conn);
        }
    }
    

    调用被观察者subscribe对象,然后可以在里面发射数据。

    PublishConnection中的 onNext方法

    public void onNext(T t) {
        for (InnerDisposable<T> inner : get()) {
            inner.downstream.onNext(t);
        }
    

    调用每一个订阅者的onNext方法,将数据发射给订阅者。

    RefCount

    让一个可连接的Observable行为像普通的Observable。可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

    Observable observable = Observable.create((ObservableOnSubscrib
        ObservableEmitter<Integer> observableEmitter = emitter.seri
        try {
            // 序列化
            if (!emitter.isDisposed()) {
                for (int i = 0; i < 4; i++) {
                    System.out.println("create operate--->emitter: 
                    if (1==i){
                        // ExceptionHelper.TERMINATED
                          observableEmitter.onError(new Throwable("
                    }else {
                        observableEmitter.onNext(i);
                    }
                }
                observableEmitter.onComplete();
            }
        } catch (Exception e) {
            observableEmitter.onError(e);
        }
    }).publish().refCount(2);
    observable.subscribe(...);
    observable.subscribe(...);
    

    实现类

    ObservableRefCount
    

    看一下subscribeActual方法

    protected void subscribeActual(Observer<? super T> observer) {
        RefConnection conn;
        boolean connect = false;
        synchronized (this) {
            conn = connection;
            if (conn == null) {
                conn = new RefConnection(this);
                connection = conn;
            }
            long c = conn.subscriberCount;
            if (c == 0L && conn.timer != null) {
                conn.timer.dispose();
            }
            conn.subscriberCount = c + 1;
            if (!conn.connected && c + 1 == n) {
                connect = true;
                conn.connected = true;
            }
        }
        source.subscribe(new RefCountObserver<>(observer, this, conn));
        if (connect) {
            source.connect(conn);
        }
    }
    

    if (!conn.connected && c + 1 == n)当订阅者的数量等于refCount方法传入的数量时,表示可以连接,调用ObservablePublish中的connect方法。就与上面将Publish操作符调用connect方法作用相同。

    当然你可以随时调用connect方法来强制发射数据。

    相关文章

      网友评论

        本文标题:【RxJava】- 连接操作符源码分析

        本文链接:https://www.haomeiwen.com/subject/dhfwuhtx.html