说到 Subject(主题) ,很多人可能都不是很熟悉它,因为相对于 RxJava 的 Observable、Schedulers、Subscribes 等关键字来讲,它抛头露面的场合似乎比较少。
事实上,Subject 作用是很大的,借用官方的解释,Subject 在同一时间内,既可以作为 Observable,也可以作为Observer:
在RxJava3.x中,官方一共为我们提供了以下几种Subject:
ReplaySubject
(释放接收到的所有数据)
BehaviorSubject
(释放订阅前最后一个数据和订阅后接收到的所有数据)
PublishSubject
(释放订阅后接收到的数据)
AsyncSubject
(仅释放接收到的最后一个数据)
SerializedSubject
(串行Subject)
UnicastSubject
(仅支持订阅一次的Subject)
/**
* Represents an {@link Observer} and an {@link Observable} at the same time, allowing
* multicasting events from a single source to multiple child {@code Observer}s.
* <p>
* All methods except the {@link #onSubscribe(io.reactivex.rxjava3.disposables.Disposable)}, {@link #onNext(Object)},
* {@link #onError(Throwable)} and {@link #onComplete()} are thread-safe.
* Use {@link #toSerialized()} to make these methods thread-safe as well.
*
* @param <T> the item value type
*/
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
/**
* Returns true if the subject has any Observers.
* <p>The method is thread-safe.
* @return true if the subject has any Observers
*/
@CheckReturnValue
public abstract boolean hasObservers();
/**
* Returns true if the subject has reached a terminal state through an error event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through an error event
* @see #getThrowable()
* @see #hasComplete()
*/
@CheckReturnValue
public abstract boolean hasThrowable();
/**
* Returns true if the subject has reached a terminal state through a complete event.
* <p>The method is thread-safe.
* @return true if the subject has reached a terminal state through a complete event
* @see #hasThrowable()
*/
@CheckReturnValue
public abstract boolean hasComplete();
/**
* Returns the error that caused the Subject to terminate or null if the Subject
* hasn't terminated yet.
* <p>The method is thread-safe.
* @return the error that caused the Subject to terminate or null if the Subject
* hasn't terminated yet
*/
@Nullable
@CheckReturnValue
public abstract Throwable getThrowable();
/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
*/
@NonNull
@CheckReturnValue
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<>(this);
}
}
下面用 PublishSubject
(释放订阅后接收到的数据),举几个例子:
public final class PublishSubject<T> extends Subject<T> {
static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
static final PublishDisposable[] EMPTY = new PublishDisposable[0];
final AtomicReference<PublishDisposable<T>[]> subscribers;
Throwable error;
/**
* Constructs a PublishSubject.
* @param <T> the value type
* @return the new PublishSubject
*/
public static <T> PublishSubject<T> create() {
return new PublishSubject<>();
}
/**
* Constructs a PublishSubject.
* @since 2.0
*/
PublishSubject() {
subscribers = new AtomicReference<>(EMPTY);
}
@Override
protected void subscribeActual(Observer<? super T> t) {
PublishDisposable<T> ps = new PublishDisposable<>(t, this);
t.onSubscribe(ps);
if (add(ps)) {
// if cancellation happened while a successful add, the remove() didn't work
// so we need to do it again
if (ps.isDisposed()) {
remove(ps);
}
} else {
Throwable ex = error;
if (ex != null) {
t.onError(ex);
} else {
t.onComplete();
}
}
}
/**
* Tries to add the given subscriber to the subscribers array atomically
* or returns false if the subject has terminated.
* @param ps the subscriber to add
* @return true if successful, false if the subject has terminated
*/
boolean add(PublishDisposable<T> ps) {
for (;;) {
PublishDisposable<T>[] a = subscribers.get();
if (a == TERMINATED) {
return false;
}
int n = a.length;
@SuppressWarnings("unchecked")
PublishDisposable<T>[] b = new PublishDisposable[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
/**
* Atomically removes the given subscriber if it is subscribed to the subject.
* @param ps the subject to remove
*/
void remove(PublishDisposable<T> ps) {
for (;;) {
PublishDisposable<T>[] a = subscribers.get();
if (a == TERMINATED || a == EMPTY) {
return;
}
int n = a.length;
int j = -1;
for (int i = 0; i < n; i++) {
if (a[i] == ps) {
j = i;
break;
}
}
if (j < 0) {
return;
}
PublishDisposable<T>[] b;
if (n == 1) {
b = EMPTY;
} else {
b = new PublishDisposable[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (subscribers.compareAndSet(a, b)) {
return;
}
}
}
@Override
public void onSubscribe(Disposable d) {
if (subscribers.get() == TERMINATED) {
d.dispose();
}
}
@Override
public void onNext(T t) {
ExceptionHelper.nullCheck(t, "onNext called with a null value.");
for (PublishDisposable<T> pd : subscribers.get()) {
pd.onNext(t);
}
}
@Override
public void onError(Throwable t) {
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
if (subscribers.get() == TERMINATED) {
RxJavaPlugins.onError(t);
return;
}
error = t;
for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
pd.onError(t);
}
}
@Override
public void onComplete() {
if (subscribers.get() == TERMINATED) {
return;
}
for (PublishDisposable<T> pd : subscribers.getAndSet(TERMINATED)) {
pd.onComplete();
}
}
@Override
@CheckReturnValue
public boolean hasObservers() {
return subscribers.get().length != 0;
}
@Override
@Nullable
@CheckReturnValue
public Throwable getThrowable() {
if (subscribers.get() == TERMINATED) {
return error;
}
return null;
}
@Override
@CheckReturnValue
public boolean hasThrowable() {
return subscribers.get() == TERMINATED && error != null;
}
@Override
@CheckReturnValue
public boolean hasComplete() {
return subscribers.get() == TERMINATED && error == null;
}
/**
* Wraps the actual subscriber, tracks its requests and makes cancellation
* to remove itself from the current subscribers array.
*
* @param <T> the value type
*/
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
private static final long serialVersionUID = 3562861878281475070L;
/** The actual subscriber. */
final Observer<? super T> downstream;
/** The subject state. */
final PublishSubject<T> parent;
/**
* Constructs a PublishSubscriber, wraps the actual subscriber and the state.
* @param actual the actual subscriber
* @param parent the parent PublishProcessor
*/
PublishDisposable(Observer<? super T> actual, PublishSubject<T> parent) {
this.downstream = actual;
this.parent = parent;
}
public void onNext(T t) {
if (!get()) {
downstream.onNext(t);
}
}
public void onError(Throwable t) {
if (get()) {
RxJavaPlugins.onError(t);
} else {
downstream.onError(t);
}
}
public void onComplete() {
if (!get()) {
downstream.onComplete();
}
}
@Override
public void dispose() {
if (compareAndSet(false, true)) {
parent.remove(this);
}
}
@Override
public boolean isDisposed() {
return get();
}
}
}
第一个例子:在申请应用敏感权限时的使用
>RxPermissionsFragment类
//包含所有当前的权限请求。一旦被授予或拒绝,他们就会被移除。
private Map<String, PublishSubject<Permission>> mSubjects = new HashMap<>();
void onRequestPermissionsResult(String[] permissions, int[] grantResults, boolean[] shouldShowRequestPermissionRationale) {
for (int i = 0, size = permissions.length; i < size; i++) {
log("onRequestPermissionsResult " + permissions[i]);
//找到相应的 subject
PublishSubject<Permission> subject = mSubjects.get(permissions[i]);
if (subject == null) {
// 没有找到主题
Log.e(RxPermissions.TAG, "RxPermissions.onRequestPermissionsResult invoked but didn't find the corresponding permission request.");
return;
}
mSubjects.remove(permissions[i]);
boolean granted = grantResults[i] == PackageManager.PERMISSION_GRANTED;
subject.onNext(new Permission(permissions[i], granted, shouldShowRequestPermissionRationale[i]));
subject.onComplete();
}
}
...省略部分代码
public PublishSubject<Permission> getSubjectByPermission(@NonNull String permission) {
return mSubjects.get(permission);
}
public void setSubjectForPermission(@NonNull String permission, @NonNull PublishSubject<Permission> subject) {
mSubjects.put(permission, subject);
}
>RxPermissions类
@TargetApi(Build.VERSION_CODES.M)
private Observable<Permission> requestImplementation(final String... permissions) {
List<Observable<Permission>> list = new ArrayList<>(permissions.length);
List<String> unrequestedPermissions = new ArrayList<>();
// 在多个权限的情况下,我们为每个权限创建一个 Observable。
// 最后,将观察到的值组合在一起以产生唯一的响应。
for (String permission : permissions) {
mRxPermissionsFragment.get().log("Requesting permission " + permission);
if (isGranted(permission)) {
// 已授予,或未授予 Android M
// 返回授予的权限对象。
list.add(Observable.just(new Permission(permission, true, false)));
continue;
}
if (isRevoked(permission)) {
// 被策略撤销,返回一个被拒绝的 Permission 对象。
list.add(Observable.just(new Permission(permission, false, false)));
continue;
}
PublishSubject<Permission> subject = mRxPermissionsFragment.get().getSubjectByPermission(permission);
// 如果不存在则创建一个新主题
if (subject == null) {
unrequestedPermissions.add(permission);
subject = PublishSubject.create();
mRxPermissionsFragment.get().setSubjectForPermission(permission, subject);
}
list.add(subject);
}
if (!unrequestedPermissions.isEmpty()) {
String[] unrequestedPermissionsArray = unrequestedPermissions.toArray(new String[unrequestedPermissions.size()]);
requestPermissionsFromFragment(unrequestedPermissionsArray);
}
return Observable.concat(Observable.fromIterable(list));
}
上面的代码,是应用申请相关隐私权限时的业务逻辑。 Map<String, PublishSubject<Permission>> mSubjects
里用一个Map,含有若干个用 PublishSubject 包裹的 Permission。在申请或拒绝完权限后,释放订阅后接收到的数据。
第二个例子:在应用首页,业务逻辑比较多,需要要按先后顺序处理对应逻辑,并做相应的 UI 展示。
>IndexFragment类
private PublishSubject<List<ListBean>> mBannerPublishSubject; //banner
private PublishSubject<Object> mGuidePublishSubject; //首页引导
private PublishSubject<Object> mSignAwardPublishSubject; //签到奖励
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
mBannerPublishSubject = PublishSubject.create();
mGuidePublishSubject= PublishSubject.create();
mMonthVipPublishSubject = PublishSubject.create();
Observable.concat(mGuidePublishSubject, mBannerPublishSubject, mSignAwardPublishSubject)
.subscribe(new DefaultObserver<Object>() {
@Override
public void onNext(@io.reactivex.rxjava3.annotations.NonNull Object object) {
}
@Override
public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
}
@Override
public void onComplete() {
//服务端状态为已完成,本地没结束引导,不重置引导状态
if (NewUserGuideUtils.isServerGuideEnd()) {
NewUserGuideUtils.setFirstLocalGuideEnd();
showDialogAfterGuide();
} else {
showNewGuideDialog();
}
}
});
}
...省略部分代码
//当首页所需的banner数据取到时
pravite void getBannerDataSuccess(){
mBannerPublishSubject.onComplete();
}
//当首页所需的用户信息数据取到时
pravite void getUserInfoDataSuccess(){
mGuidPublishSubject.onNext(this);
mGuidPublishSubject.onComplete();
}
//当首页所需的签到奖励的数据取到时
pravite void getSignAwardDataSuccess(){
mSignAwardPublishSubject.onNext(this);
mSignAwardPublishSubject.onComplete();
}
下面用 BehaviorSubject
(释放订阅前最后一个数据和订阅后接收到的所有数据),举个例子:
我们正在开发的几个项目,都用到了这个开源库 RxLifecycle
:https://github.com/trello/RxLifecycle
官方对这个库的原话解释:Lifecycle handling APIs for Android apps using RxJava
其中 RxActivity
的源码中,就要到了 BehaviorSubject
。这个 BehaviorSubject
会在不同的生命周期发射不同的 ActivityEvent
,比如在 onCreate()
生命周期发射 ActivityEvent.CREATE
,在 onStop()
发射 ActivityEvent.STOP
。
public final class BehaviorSubject<T> extends Subject<T> {
final AtomicReference<Object> value; //原子操作类,当前接收到的最后一个数据
final AtomicReference<BehaviorDisposable<T>[]> observers;//原子操作类,BehaviorDisposable内部存储了所有接受到的数据
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];//标记,意味着一个空的BehaviorDisposable
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0]; //标记,意味着已经达到了TERMINATED,终止数据的发射
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
...省略
}
public abstract class RxActivity extends Activity implements LifecycleProvider<ActivityEvent> {
private final BehaviorSubject<ActivityEvent> lifecycleSubject = BehaviorSubject.create();
@Override
@NonNull
@CheckResult
public final Observable<ActivityEvent> lifecycle() {
return lifecycleSubject.hide();
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event) {
return RxLifecycle.bindUntilEvent(lifecycleSubject, event);
}
@Override
@NonNull
@CheckResult
public final <T> LifecycleTransformer<T> bindToLifecycle() {
return RxLifecycleAndroid.bindActivity(lifecycleSubject);
}
@Override
@CallSuper
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
lifecycleSubject.onNext(ActivityEvent.CREATE);
}
@Override
@CallSuper
protected void onStart() {
super.onStart();
lifecycleSubject.onNext(ActivityEvent.START);
}
@Override
@CallSuper
protected void onResume() {
super.onResume();
lifecycleSubject.onNext(ActivityEvent.RESUME);
}
@Override
@CallSuper
protected void onPause() {
lifecycleSubject.onNext(ActivityEvent.PAUSE);
super.onPause();
}
@Override
@CallSuper
protected void onStop() {
lifecycleSubject.onNext(ActivityEvent.STOP);
super.onStop();
}
@Override
@CallSuper
protected void onDestroy() {
lifecycleSubject.onNext(ActivityEvent.DESTROY);
super.onDestroy();
}
}
网友评论