RxJava3 Subject: Usage and Related Source Code

Author: JeffreyWorld | Published 2021-12-17 13:56
    RxJava3 Subject

    Many developers are not very familiar with Subject, because it shows up far less often in everyday code than RxJava's Observable, Schedulers, or Subscriber.

    In fact, Subject is very useful. According to the official documentation, a Subject acts as both an Observable and an Observer at the same time.

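    RxJava 3.x provides the following Subject types:
    ReplaySubject (replays every item it has received to each Observer, regardless of when the Observer subscribes)
    BehaviorSubject (emits the most recent item received before subscription, then every item received afterwards)
    PublishSubject (emits only the items received after subscription)
    AsyncSubject (emits only the last item received, and only once the subject completes)
    SerializedSubject (a serializing wrapper; it is package-private in RxJava 3 and is obtained through toSerialized())
    UnicastSubject (allows only a single Observer to subscribe over its lifetime)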
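    The differences in the list above are easiest to see with an Observer that subscribes late. The following is a minimal sketch, assuming RxJava 3 (io.reactivex.rxjava3:rxjava) is on the classpath; the class name SubjectComparison and the helper lateObserverSees are made up for illustration.

```java
import io.reactivex.rxjava3.subjects.AsyncSubject;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class SubjectComparison {

    /** Emits 1 and 2, subscribes a late observer, emits 3, then completes. */
    static List<Integer> lateObserverSees(Subject<Integer> subject) {
        List<Integer> seen = new ArrayList<>();
        subject.onNext(1);
        subject.onNext(2);
        subject.subscribe(seen::add); // subscribes after 1 and 2 were emitted
        subject.onNext(3);
        subject.onComplete();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Replay:   " + lateObserverSees(ReplaySubject.create()));   // [1, 2, 3]
        System.out.println("Behavior: " + lateObserverSees(BehaviorSubject.create())); // [2, 3]
        System.out.println("Publish:  " + lateObserverSees(PublishSubject.create()));  // [3]
        System.out.println("Async:    " + lateObserverSees(AsyncSubject.create()));    // [3]
    }
}
```

    PublishSubject and AsyncSubject both end up with [3] here, but for different reasons: PublishSubject simply missed 1 and 2, while AsyncSubject holds back everything until onComplete() and then emits only the last item.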

    
    /**
     * Represents an {@link Observer} and an {@link Observable} at the same time, allowing
     * multicasting events from a single source to multiple child {@code Observer}s.
     * <p>
     * All methods except the {@link #onSubscribe(io.reactivex.rxjava3.disposables.Disposable)}, {@link #onNext(Object)},
     * {@link #onError(Throwable)} and {@link #onComplete()} are thread-safe.
     * Use {@link #toSerialized()} to make these methods thread-safe as well.
     *
     * @param <T> the item value type
     */
    public abstract class Subject<T> extends Observable<T> implements Observer<T> {
        /**
         * Returns true if the subject has any Observers.
         * <p>The method is thread-safe.
         * @return true if the subject has any Observers
         */
        @CheckReturnValue
        public abstract boolean hasObservers();
    
        /**
         * Returns true if the subject has reached a terminal state through an error event.
         * <p>The method is thread-safe.
         * @return true if the subject has reached a terminal state through an error event
         * @see #getThrowable()
         * @see #hasComplete()
         */
        @CheckReturnValue
        public abstract boolean hasThrowable();
    
        /**
         * Returns true if the subject has reached a terminal state through a complete event.
         * <p>The method is thread-safe.
         * @return true if the subject has reached a terminal state through a complete event
         * @see #hasThrowable()
         */
        @CheckReturnValue
        public abstract boolean hasComplete();
    
        /**
         * Returns the error that caused the Subject to terminate or null if the Subject
         * hasn't terminated yet.
         * <p>The method is thread-safe.
         * @return the error that caused the Subject to terminate or null if the Subject
         * hasn't terminated yet
         */
        @Nullable
        @CheckReturnValue
        public abstract Throwable getThrowable();
    
        /**
         * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
         * onComplete methods, making them thread-safe.
         * <p>The method is thread-safe.
         * @return the wrapped and serialized subject
         */
        @NonNull
        @CheckReturnValue
        public final Subject<T> toSerialized() {
            if (this instanceof SerializedSubject) {
                return this;
            }
            return new SerializedSubject<>(this);
        }
    }
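    The abstract class above can be exercised from both sides. The following is a small sketch, assuming RxJava 3 is on the classpath; SubjectBridge is a hypothetical class name. It subscribes two Observers to a serialized PublishSubject (the Observable side), then passes the subject itself to an upstream Observable (the Observer side), and finally reads the state methods declared in Subject.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class SubjectBridge {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // toSerialized() wraps the subject so the on* methods may be called from several threads.
        Subject<Integer> subject = PublishSubject.<Integer>create().toSerialized();

        // Observable side: two downstream observers.
        subject.subscribe(v -> log.add("A" + v));
        subject.subscribe(v -> log.add("B" + v));
        log.add("hasObservers=" + subject.hasObservers());

        // Observer side: the subject consumes an upstream source and multicasts it.
        Observable.just(1, 2).subscribe(subject);

        log.add("hasComplete=" + subject.hasComplete());
        log.add("hasThrowable=" + subject.hasThrowable());
        log.add("hasObservers=" + subject.hasObservers());
        return log;
    }

    public static void main(String[] args) {
        // [hasObservers=true, A1, B1, A2, B2, hasComplete=true, hasThrowable=false, hasObservers=false]
        System.out.println(run());
    }
}
```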
    
    

    Next, a few examples using PublishSubject (emits only the items received after subscription). Its source code comes first:

    public final class PublishSubject<T> extends Subject<T> {
    
        static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
        static final PublishDisposable[] EMPTY = new PublishDisposable[0];
        final AtomicReference<PublishDisposable<T>[]> subscribers;
        Throwable error;
        /**
         * Constructs a PublishSubject.
         * @param <T> the value type
         * @return the new PublishSubject
         */
        public static <T> PublishSubject<T> create() {
            return new PublishSubject<>();
        }
    
        /**
         * Constructs a PublishSubject.
         * @since 2.0
         */
        PublishSubject() {
            subscribers = new AtomicReference<>(EMPTY);
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> t) {
            PublishDisposable<T> ps = new PublishDisposable<>(t, this);
            t.onSubscribe(ps);
            if (add(ps)) {
                // if cancellation happened while a successful add, the remove() didn't work
                // so we need to do it again
                if (ps.isDisposed()) {
                    remove(ps);
                }
            } else {
                Throwable ex = error;
                if (ex != null) {
                    t.onError(ex);
                } else {
                    t.onComplete();
                }
            }
        }
    
        /**
         * Tries to add the given subscriber to the subscribers array atomically
         * or returns false if the subject has terminated.
         * @param ps the subscriber to add
         * @return true if successful, false if the subject has terminated
         */
        boolean add(PublishDisposable<T> ps) {
            for (;;) {
                PublishDisposable<T>[] a = subscribers.get();
                if (a == TERMINATED) {
                    return false;
                }
    
                int n = a.length;
                @SuppressWarnings("unchecked")
                PublishDisposable<T>[] b = new PublishDisposable[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = ps;
    
                if (subscribers.compareAndSet(a, b)) {
                    return true;
                }
            }
        }
    
        /**
         * Atomically removes the given subscriber if it is subscribed to the subject.
         * @param ps the subject to remove
         */
        void remove(PublishDisposable<T> ps) {
            for (;;) {
                PublishDisposable<T>[] a = subscribers.get();
                if (a == TERMINATED || a == EMPTY) {
                    return;
                }
    
                int n = a.length;
                int j = -1;
                for (int i = 0; i < n; i++) {
                    if (a[i] == ps) {
                        j = i;
                        break;
                    }
                }
    
                if (j < 0) {
                    return;
                }
    
                PublishDisposable<T>[] b;
    
                if (n == 1) {
                    b = EMPTY;
                } else {
                    b = new PublishDisposable[n - 1];
                    System.arraycopy(a, 0, b, 0, j);
                    System.arraycopy(a, j + 1, b, j, n - j - 1);
                }
                if (subscribers.compareAndSet(a, b)) {
                    return;
                }
            }
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            if (subscribers.get() == TERMINATED) {
                d.dispose();
            }
        }
    
        @Override
        public void onNext(T t) {
            ExceptionHelper.nullCheck(t, "onNext called with a null value.");
            for (PublishDisposable<T> pd : subscribers.get()) {
                pd.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
            if (subscribers.get() == TERMINATED) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
    
            for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
                pd.onError(t);
            }
        }
    
        @Override
        public void onComplete() {
            if (subscribers.get() == TERMINATED) {
                return;
            }
            for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
                pd.onComplete();
            }
        }
    
        @Override
        @CheckReturnValue
        public boolean hasObservers() {
            return subscribers.get().length != 0;
        }
    
        @Override
        @Nullable
        @CheckReturnValue
        public Throwable getThrowable() {
            if (subscribers.get() == TERMINATED) {
                return error;
            }
            return null;
        }
    
        @Override
        @CheckReturnValue
        public boolean hasThrowable() {
            return subscribers.get() == TERMINATED && error != null;
        }
    
        @Override
        @CheckReturnValue
        public boolean hasComplete() {
            return subscribers.get() == TERMINATED && error == null;
        }
    
        /**
         * Wraps the actual subscriber, tracks its requests and makes cancellation
         * to remove itself from the current subscribers array.
         *
         * @param <T> the value type
         */
        static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
    
            private static final long serialVersionUID = 3562861878281475070L;
            /** The actual subscriber. */
            final Observer<? super T> downstream;
            /** The subject state. */
            final PublishSubject<T> parent;
    
            /**
             * Constructs a PublishSubscriber, wraps the actual subscriber and the state.
             * @param actual the actual subscriber
             * @param parent the parent PublishProcessor
             */
            PublishDisposable(Observer<? super T> actual, PublishSubject<T> parent) {
                this.downstream = actual;
                this.parent = parent;
            }
    
            public void onNext(T t) {
                if (!get()) {
                    downstream.onNext(t);
                }
            }
    
            public void onError(Throwable t) {
                if (get()) {
                    RxJavaPlugins.onError(t);
                } else {
                    downstream.onError(t);
                }
            }
    
            public void onComplete() {
                if (!get()) {
                    downstream.onComplete();
                }
            }
    
            @Override
            public void dispose() {
                if (compareAndSet(false, true)) {
                    parent.remove(this);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return get();
            }
        }
    }
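    The add() and onComplete() methods above rely on two ideas: the subscriber array is never mutated in place (a new copy is installed with compareAndSet), and EMPTY and TERMINATED are two distinct zero-length arrays that are told apart by identity. The following JDK-only sketch isolates that pattern. It is a simplified model written for illustration, not RxJava's code; CopyOnWriteRegistry and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of the subscriber bookkeeping in PublishSubject.
final class CopyOnWriteRegistry {
    // Two distinct zero-length arrays, distinguished by identity (==), not by content.
    static final Object[] EMPTY = new Object[0];
    static final Object[] TERMINATED = new Object[0];

    final AtomicReference<Object[]> subscribers = new AtomicReference<>(EMPTY);

    /** Returns false if already terminated; otherwise installs a copy with s appended. */
    boolean add(Object s) {
        for (;;) {
            Object[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            Object[] b = Arrays.copyOf(a, a.length + 1);
            b[a.length] = s;
            // Retry if another thread changed the array in the meantime.
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    /** Swaps in TERMINATED and returns the subscribers that still need a terminal event. */
    Object[] terminate() {
        return subscribers.getAndSet(TERMINATED);
    }

    int size() {
        return subscribers.get().length;
    }

    public static void main(String[] args) {
        CopyOnWriteRegistry registry = new CopyOnWriteRegistry();
        System.out.println(registry.add("first"));       // true
        System.out.println(registry.terminate().length); // 1
        System.out.println(registry.add("late"));        // false
    }
}
```

    Because readers always see a complete, immutable snapshot of the array, a loop such as the one in onNext() can iterate without locking while other threads subscribe or dispose.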
    

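    Example 1: requesting sensitive app permissions

    >RxPermissionsFragment class
    
        // Contains all the current permission requests. Once granted or denied, they are removed from it.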
        private Map<String, PublishSubject<Permission>> mSubjects = new HashMap<>();
    
        void onRequestPermissionsResult(String[] permissions, int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
            for (int i = 0, size = permissions.length; i < size; i++) {
                log("onRequestPermissionsResult  " + permissions[i]);
                // Find the corresponding subject
                PublishSubject<Permission> subject = mSubjects.get(permissions[i]);
                if (subject == null) {
                    // No subject found
                    Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
                    return;
                }
                mSubjects.remove(permissions[i]);
                boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED;
                subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i]));
                subject.onComplete();
            }
        }
    
    ... (some code omitted)
    
        public PublishSubject<Permission> getSubjectByPermission(@NonNull String permission) {
            return mSubjects.get(permission);
        }
    
        public void setSubjectForPermission(@NonNull String permission, @NonNull PublishSubject<Permission> subject) {
            mSubjects.put(permission, subject);
        }
    
    
    >RxPermissions class
    
        @TargetApi(Build.VERSION_CODES.M)
        private Observable<Permission> requestImplementation(final String... permissions) {
            List<Observable<Permission>> list = new ArrayList<>(permissions.length);
            List<String> unrequestedPermissions = new ArrayList<>();
    
            // In case of multiple permissions, we create an Observable for each of them.
            // At the end, the observables are combined to produce a single response.
            for (String permission : permissions) {
                mRxPermissionsFragment.get().log("Requesting permission " + permission);
                if (isGranted(permission)) {
                    // Already granted, or not running on Android M.
                    // Return a granted Permission object.
                    list.add(Observable.just(new Permission(permission, true, false)));
                    continue;
                }
    
                if (isRevoked(permission)) {
                    // Revoked by a policy, return a denied Permission object.
                    list.add(Observable.just(new Permission(permission, false, false)));
                    continue;
                }
    
                PublishSubject<Permission> subject = mRxPermissionsFragment.get().getSubjectByPermission(permission);
                // Create a new subject if one does not exist yet
                if (subject == null) {
                    unrequestedPermissions.add(permission);
                    subject = PublishSubject.create();
                    mRxPermissionsFragment.get().setSubjectForPermission(permission, subject);
                }
    
                list.add(subject);
            }
    
            if (!unrequestedPermissions.isEmpty()) {
                String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]);
                requestPermissionsFromFragment(unrequestedPermissionsArray);
            }
            return Observable.concat(Observable.fromIterable(list));
        }
    
    

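    The code above is the logic that runs when the app requests privacy-sensitive permissions. mSubjects is a Map<String, PublishSubject<Permission>> that holds one PublishSubject per pending permission request, keyed by permission name. Once the user grants or denies a permission, the matching subject emits the resulting Permission to its current observers and then completes.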
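    The pattern of one subject per pending request can be reduced to a few lines. The following is a minimal sketch, assuming RxJava 3 is on the classpath. PendingRequests, request, and deliver are hypothetical names, and a plain Boolean stands in for the Permission object; it is not the RxPermissions implementation.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reduction of the mSubjects bookkeeping shown above.
final class PendingRequests {
    private final Map<String, PublishSubject<Boolean>> subjects = new HashMap<>();

    /** Returns the Observable for a pending request, creating its subject on first use. */
    Observable<Boolean> request(String key) {
        return subjects.computeIfAbsent(key, k -> PublishSubject.create());
    }

    /** Delivers the result, completes the subject, and forgets it. */
    void deliver(String key, boolean granted) {
        PublishSubject<Boolean> subject = subjects.remove(key);
        if (subject != null) {
            subject.onNext(granted);
            subject.onComplete();
        }
    }

    static List<String> demo() {
        PendingRequests pending = new PendingRequests();
        List<String> log = new ArrayList<>();
        // Two callers ask for the same permission and share one subject.
        pending.request("CAMERA").subscribe(g -> log.add("first:" + g), e -> { }, () -> log.add("first:done"));
        pending.request("CAMERA").subscribe(g -> log.add("second:" + g));
        pending.deliver("CAMERA", true);
        pending.deliver("CAMERA", false); // ignored: the request was already removed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first:true, second:true, first:done]
    }
}
```

    Since a PublishSubject does not replay, callers have to subscribe before the result is delivered; a result delivered with no observer attached is lost.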

    Example 2: the app's home page carries a lot of business logic that has to be handled in a fixed order, with the UI updated accordingly.

    >IndexFragment class
    
        private PublishSubject<List<ListBean>> mBannerPublishSubject; //banner
        private PublishSubject<Object> mGuidePublishSubject; // home page guide
        private PublishSubject<Object> mSignAwardPublishSubject; // sign-in reward
    
        @Override
        public void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            mBannerPublishSubject = PublishSubject.create();
            mGuidePublishSubject = PublishSubject.create();
            mSignAwardPublishSubject = PublishSubject.create();
            Observable.concat(mGuidePublishSubject, mBannerPublishSubject, mSignAwardPublishSubject)
                    .subscribe(new DefaultObserver<Object>() {
                        @Override
                        public void onNext(@io.reactivex.rxjava3.annotations.NonNull Object object) {
                        }
    
                        @Override
                        public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                        }
    
                        @Override
                        public void onComplete() {
                            // The server reports the guide as finished but it has not ended locally: do not reset the guide state
                            if (NewUserGuideUtils.isServerGuideEnd()) {
                                NewUserGuideUtils.setFirstLocalGuideEnd();
                                showDialogAfterGuide();
                            } else {
                                showNewGuideDialog();
                            }
                        }
                    });
        }
        ... (some code omitted)
    
        // Called when the banner data for the home page has been fetched
        private void getBannerDataSuccess(){
             mBannerPublishSubject.onComplete();
        }
    
        // Called when the user info data for the home page has been fetched
        private void getUserInfoDataSuccess(){
            mGuidePublishSubject.onNext(this);
            mGuidePublishSubject.onComplete();
        }
        
        // Called when the sign-in reward data for the home page has been fetched
        private void getSignAwardDataSuccess(){
            mSignAwardPublishSubject.onNext(this);
            mSignAwardPublishSubject.onComplete();
        }
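    One detail of combining Observable.concat with PublishSubject is worth checking in isolation: concat subscribes to the next source only after the previous one completes, and a PublishSubject does not replay items. The following is a minimal sketch, assuming RxJava 3 is on the classpath; ConcatOrdering and the subject names are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not taken from the article's project.
final class ConcatOrdering {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        PublishSubject<String> guide = PublishSubject.create();
        PublishSubject<String> banner = PublishSubject.create();

        Observable.concat(guide, banner)
                .subscribe(log::add, e -> { }, () -> log.add("all complete"));

        banner.onNext("banner-early"); // dropped: concat has not subscribed to banner yet
        banner.onComplete();           // banner finishes before guide
        guide.onNext("guide");
        guide.onComplete();            // concat now subscribes to banner, which is already terminated
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [guide, all complete]
    }
}
```

    The terminal event of an already completed PublishSubject is still delivered to a late subscriber, so the final onComplete() fires only once every subject has completed, whatever order the data arrives in. Items emitted by a later subject before its turn are lost, which is harmless in a setup where only onComplete() carries logic; if those items mattered, a ReplaySubject or BehaviorSubject would be a safer choice.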
    
    

    Next, an example using BehaviorSubject (emits the most recent item received before subscription, then every item received afterwards):

    Several projects we are currently developing use the open-source library RxLifecycle (https://github.com/trello/RxLifecycle).
    The project describes itself as: Lifecycle handling APIs for Android apps using RxJava
    The source of its RxActivity class uses a BehaviorSubject. This BehaviorSubject emits a different ActivityEvent at each lifecycle stage, for example ActivityEvent.CREATE in onCreate() and ActivityEvent.STOP in onStop().

    public final class BehaviorSubject<T> extends Subject<T> {
    
        final AtomicReference<Object> value; // atomic reference holding the most recently received item
    
        final AtomicReference<BehaviorDisposable<T>[]> observers; // atomic reference to the array of currently subscribed observers, each wrapped in a BehaviorDisposable
    
        @SuppressWarnings("rawtypes")
        static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0]; // sentinel: no observers are subscribed
    
        @SuppressWarnings("rawtypes")
        static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0]; // sentinel: the subject has terminated and emits no more items
        final ReadWriteLock lock;
        final Lock readLock;
        final Lock writeLock;
    
        final AtomicReference<Throwable> terminalEvent;
    
        ... (omitted)
    
    }
    
    public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
    
        private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
    
        @Override
        @NonNull
        @CheckResult
        public final Observable<ActivityEvent> lifecycle() {
            return lifecycleSubject.hide();
        }
    
        @Override
        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
            return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
        }
    
        @Override
        @NonNull
        @CheckResult
        public final <T> LifecycleTransformer<T> bindToLifecycle() {
            return RxLifecycleAndroid.bindActivity(lifecycleSubject);
        }
    
        @Override
        @CallSuper
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            lifecycleSubject.onNext(ActivityEvent.CREATE);
        }
    
        @Override
        @CallSuper
        protected void onStart() {
            super.onStart();
            lifecycleSubject.onNext(ActivityEvent.START);
        }
    
        @Override
        @CallSuper
        protected void onResume() {
            super.onResume();
            lifecycleSubject.onNext(ActivityEvent.RESUME);
        }
    
        @Override
        @CallSuper
        protected void onPause() {
            lifecycleSubject.onNext(ActivityEvent.PAUSE);
            super.onPause();
        }
    
        @Override
        @CallSuper
        protected void onStop() {
            lifecycleSubject.onNext(ActivityEvent.STOP);
            super.onStop();
        }
    
        @Override
        @CallSuper
        protected void onDestroy() {
            lifecycleSubject.onNext(ActivityEvent.DESTROY);
            super.onDestroy();
        }
    }
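    To show why a BehaviorSubject suits lifecycle tracking, here is a sketch that uses only RxJava 3 (assumed to be on the classpath). The Event enum is a hypothetical stand-in for ActivityEvent, and the takeUntil/filter combination is only an approximation of what a call such as bindUntilEvent achieves, not RxLifecycle's actual code.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; it does not depend on RxLifecycle or Android.
final class LifecycleSketch {
    enum Event { CREATE, START, STOP, DESTROY } // stand-in for ActivityEvent

    static List<String> run() {
        List<String> log = new ArrayList<>();
        BehaviorSubject<Event> lifecycle = BehaviorSubject.create();
        PublishSubject<String> data = PublishSubject.create();

        lifecycle.onNext(Event.CREATE);
        lifecycle.onNext(Event.START);

        // A late subscriber still learns the current state, because the last event is replayed.
        lifecycle.take(1).subscribe(e -> log.add("current=" + e));

        // Stop listening to data as soon as the lifecycle reaches STOP.
        data.takeUntil(lifecycle.filter(e -> e == Event.STOP))
                .subscribe(log::add, e -> { }, () -> log.add("completed"));

        data.onNext("a");
        lifecycle.onNext(Event.STOP);
        data.onNext("b"); // not delivered: the stream ended at STOP
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [current=START, a, completed]
    }
}
```

    With a PublishSubject in place of the BehaviorSubject, a stream bound after onStart() would have no way to know which lifecycle state the Activity is currently in.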
    
