Android 利用RxJava和Retrofit搭建网络请求组

作者: 乱世白衣 | 来源:发表于2018-04-02 21:37 被阅读21467次

RxJava与Retrofit是当前使用比较广泛的两个框架,很多开发者同时使用了这两个框架并以此为基础搭建了网络请求。笔者也在使用,下面介绍一下如何利用RxJava实现简单的网络请求相关回调(onStart onSuccess等方法),并提供取消网络请求方法cancelRequest()。至于Retrofit部分配置以后有时间再做具体介绍,重点在于构建Retrofit时需要设置RxJava2CallAdapterFactory,请求接口调用方法返回Observable对象。
本文主要介绍相关回调设计及网络请求取消功能,主要是RxJava层面观察者部分,直接上代码

import android.app.Dialog;
import android.content.Context;
import android.os.Handler;
import android.os.Looper;
import android.support.annotation.NonNull;

import com.dev.kit.basemodule.BuildConfig;
import com.dev.kit.basemodule.R;
import com.dev.kit.basemodule.View.NetProgressDialog;
import com.dev.kit.basemodule.netRequest.Configs.Config;
import com.dev.kit.basemodule.netRequest.subscribers.NetRequestCallback;
import com.dev.kit.basemodule.netRequest.util.OnNetProgressCancelListener;
import com.dev.kit.basemodule.result.BaseResult;
import com.dev.kit.basemodule.util.ToastUtil;

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import retrofit2.HttpException;


/**
 * 网络请求订阅者
 * Created by cuiyan on 16/6/2 14:09
 */
public class NetRequestSubscriber<T> implements Observer<T> {
    private Dialog progressDialog;
    private Disposable disposable;
    private NetRequestCallback<T> netRequestCallback;
    private Context context;

    /**
     * @param netRequestCallback 网络请求回调
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context) {
        this(netRequestCallback, context, false, null);
    }

    /**
     * @param netRequestCallback    网络请求回调
     * @param showProgress          是否显示网络请求加载对话框
     * @param progressTip           loading提示语
     * @see NetProgressDialog
     */
    public NetRequestSubscriber(@NonNull final NetRequestCallback<T> netRequestCallback, Context context, boolean showProgress, String progressTip) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        if (showProgress) {
            progressDialog = NetProgressDialog.getInstance(context, progressTip, new OnNetProgressCancelListener() {
                @Override
                public void onCancelRequest() {
                    cancelRequest();
                }
            });
        }
    }

    /**
     * @param netRequestCallback 网络请求回调
     * @param progressDialog     dialog 自定义对话框
     */
    public NetRequestSubscriber(@NonNull NetRequestCallback<T> netRequestCallback, Context context, @NonNull Dialog progressDialog) {
        this.netRequestCallback = netRequestCallback;
        this.context = context;
        this.progressDialog = progressDialog;
    }


    @Override
    public void onSubscribe(@NonNull Disposable d) {
        this.disposable = d;
        showProgress();
        onRequestStart();
    }

    @Override
    public synchronized void onNext(final T t) {
        if (t == null) {
            onRequestResultNull();
        } else {
            if (t instanceof BaseResult && !Config.REQUEST_SUCCESS_CODE.equals(((BaseResult) t).getCode())) {
                ToastUtil.showToast(context, ((BaseResult) t).getMessage());
            }
            onRequestSuccess(t);
        }
    }

    @Override
    public synchronized void onError(Throwable throwable) {
        dismissProgress();
        onRequestError(throwable);
        if (throwable instanceof HttpException) {
            ToastUtil.showToast(context, ((HttpException) throwable).message() + ((HttpException) throwable).code());
        } else {
            if (BuildConfig.DEBUG) {
                ToastUtil.showToast(context, "error:" + throwable.getMessage());
            } else {
                ToastUtil.showToast(context, context.getString(R.string.error_net_request_failed));
            }
        }
    }

    /**
     * {@link NetRequestSubscriber#onError(Throwable)}
     * {@link Observer#onError(Throwable)}
     * {@link Observer#onComplete()} (Throwable)}
     * 该方法与onError方法互斥
     */
    @Override
    public void onComplete() {
        dismissProgress();
        netRequestCallback.onFinish();
    }

    private void onRequestStart() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onStart();
                }
            });
        } else {
            netRequestCallback.onStart();
        }
    }

    private void onRequestSuccess(final T t) {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onSuccess(t);
                }
            });
        } else {
            netRequestCallback.onSuccess(t);
        }
    }

    private void onRequestResultNull() {
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onResultNull();
                }
            });
        } else {
            netRequestCallback.onResultNull();
        }
    }

    private void onRequestError(final Throwable throwable) {
        throwable.printStackTrace();
        if (Looper.myLooper() != context.getMainLooper()) {
            Handler handler = new Handler(context.getMainLooper());
            handler.post(new Runnable() {
                @Override
                public void run() {
                    netRequestCallback.onError(throwable);
                    netRequestCallback.onFinish();
                }
            });
        } else {
            netRequestCallback.onError(throwable);
            netRequestCallback.onFinish();
        }
    }

    /**
     *
     *
     */
    private void showProgress() {
        if (progressDialog != null && !progressDialog.isShowing()) {
            progressDialog.show();
        }
    }

    private void dismissProgress() {
        if (progressDialog != null && progressDialog.isShowing()) {
            progressDialog.dismiss();
        }
    }

    public void cancelRequest() {
        dismissProgress();
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
        netRequestCallback.onCancel();
        netRequestCallback.onFinish();
    }
}

NetRequestSubscriber实现了Observer接口,利用Observer接口提供的以下四个方法即可实现简单的网络请求相关回调,上述回调实现的前提是构建Retrofit时要添加RxJava2CallAdapterFactory,这部分流程后续有空再聊。

 /**
 * Provides the Observer with the means of cancelling (disposing) the
 * connection (channel) with the Observable in both
 * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
 * @param d the Disposable instance whose {@link Disposable#dispose()} can
 * be called anytime to cancel the connection
 * @since 2.0
 */

// In NetRequestSubscriber this Disposable is stored in a field and disposed by cancelRequest().
void onSubscribe(@NonNull Disposable d);

/**
 * Provides the Observer with a new item to observe.
 * <p>
 * The {@link Observable} may call this method 0 or more times.
 * <p>
 * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
 * {@link #onError}.
 *
 * @param t
 *          the item emitted by the Observable
 */
// In NetRequestSubscriber this is where the result is checked and forwarded to the callback.
void onNext(@NonNull T t);

/**
 * Notifies the Observer that the {@link Observable} has experienced an error condition.
 * <p>
 * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
 * {@link #onComplete}.
 *
 * @param e
 *          the exception encountered by the Observable
 */
// In NetRequestSubscriber this dismisses the progress dialog, reports the error and signals onFinish().
void onError(@NonNull Throwable e);

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
// In NetRequestSubscriber this dismisses the progress dialog and signals onFinish().
void onComplete();

以上四个方法的作用及调用时机,api描述的已经很清晰,不再赘述,需要注意的地方是onError方法与onComplete方法互斥,当onError方法触发时,不会再触发onComplete方法。

下面我们看一下cancelRequest(),顾名思义是要取消网络请求,那么是如何做到的?下面只做一点简单介绍,定位到关键类及相关方法;要说清全部细节还是比较麻烦的,篇幅也会比较大。

可以看到cancelRequest() 方法中调用了disposable.dispose()来取消网络请求,那么disposable到底是什么鬼?其实这个鬼是RxJava提供的类。源码如下:

/**
 * A resource (for example a subscription) that can be released on demand.
 */
public interface Disposable {
    /**
     * Releases the resource. Implementations should make this idempotent, so calling it
     * more than once has the same effect as calling it once.
     */
    void dispose();

    /**
     * Reports whether the resource has already been released.
     *
     * @return {@code true} once this resource has been disposed, {@code false} otherwise
     */
    boolean isDisposed();
}

看Disposable源码,其实还是一头雾水的,无法直接看出是如何取消网络请求的。但其实想一想,取消网络请求实际上也不是RxJava能做到的,那应该是网络请求框架该做的事,Disposable只是个神辅助而已。

前述已经说过构建Retrofit需要配置RxJava2CallAdapterFactory,这是为了支持以Observable形式观测网络请求。那么RxJava2CallAdapterFactory的作用是如何体现的,与调用Disposable.dispose()后取消网络请求有何关联?我们简单看一下RxJava2CallAdapterFactory类。
该类通过工厂方法创建实例,其构造方法只有一个,如下

 // Private constructor (instances are created through factory methods, per the article).
 // It only records the optional scheduler and the isAsync flag; get() later passes both
 // on to the RxJava2CallAdapter it creates.
 private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
     this.scheduler = scheduler;
     this.isAsync = isAsync;
 }

RxJava2CallAdapterFactory的核心方法

// Excerpt of RxJava2CallAdapterFactory.get(): returns the RxJava2CallAdapter used for a
// service method with the given return type.
public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    Class<?> rawType = getRawType(returnType);

  // ... code omitted in this excerpt (presumably where responseType, isResult, isBody,
  // isFlowable, isSingle and isMaybe are derived -- check the Retrofit source) ...

    // The factory's scheduler and isAsync settings are handed to the adapter here.
    return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
        isSingle, isMaybe, false);
}

重点来了:RxJava2CallAdapter 及 isAsync参数,直接看RxJava2CallAdapter核心代码,如下:

// Excerpt of RxJava2CallAdapter.adapt(): wraps a Retrofit Call in an Observable, or in a
// reactive type derived from that Observable.
@Override 
public Object adapt(Call<R> call) {

    // isAsync selects the enqueue-based source, otherwise the execute-based one is used.
    ************************************************重点*****************************************************
    Observable<Response<R>> responseObservable = isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
    *********************************************************************************************************
   
    // Pick what is emitted downstream: a Result wrapper, the body, or the raw Response.
    Observable<?> observable;
    if (isResult) {
        observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
    } else {
        observable = responseObservable;
    }

    // Subscribe on the scheduler given to the factory, if there is one.
    if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
    }

    // Convert to the reactive type declared by the service method.
    if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
    }

    if (isSingle) {
        return observable.singleOrError();
    }

    if (isMaybe) {
        return observable.singleElement();
    }

    if (isCompletable) {
        return observable.ignoreElements();
    }
    return observable;
}

看两行*号之间的重点部分, 定位到两个关键类CallEnqueueObservable和CallExecuteObservable粗略看一下两个类:
1.CallEnqueueObservable.java源码如下:

final class CallEnqueueObservable<T> extends Observable<Response<T>> {
    private final Call<T> originalCall;

    CallEnqueueObservable(Call<T> originalCall) {
        this.originalCall = originalCall;
    }

    @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
        // Since Call is a one-shot type, clone it for each new observer.
        Call<T> call = originalCall.clone();
        CallCallback<T> callback = new CallCallback<>(call, observer);
        observer.onSubscribe(callback);
        call.enqueue(callback);
    }

    private static final class CallCallback<T> implements Disposable, Callback<T> {
        private final Call<?> call;
        private final Observer<? super Response<T>> observer;
        boolean terminated = false;

        CallCallback(Call<?> call, Observer<? super Response<T>> observer) {
            this.call = call;
            this.observer = observer;
        }

        @Override public void onResponse(Call<T> call, Response<T> response) {
            if (call.isCanceled()) return;

            try {
                observer.onNext(response);

                if (!call.isCanceled()) {
                    terminated = true;
                    observer.onComplete();
                }
            } catch (Throwable t) {
                if (terminated) {
                    RxJavaPlugins.onError(t);
                } else if (!call.isCanceled()) {
                    try {
                        observer.onError(t);
                    } catch (Throwable inner) {
                        Exceptions.throwIfFatal(inner);
                        RxJavaPlugins.onError(new CompositeException(t, inner));
                    }
                }
            }
        }

        @Override public void onFailure(Call<T> call, Throwable t) {
            if (call.isCanceled()) return;

            try {
                observer.onError(t);
            } catch (Throwable inner) {
                Exceptions.throwIfFatal(inner);
                RxJavaPlugins.onError(new CompositeException(t, inner));
            }
        }

        @Override public void dispose() {
            call.cancel();
        }

        @Override public boolean isDisposed() {
            return call.isCanceled();
        }
    }

看到 dispose()方法没 ^ _ ^,调用了 call.cancel(),这才是关键所在

// Repeated from CallCallback above: disposing simply cancels the underlying Retrofit call.
@Override
 public void dispose() {
     call.cancel();
 }

2.CallExecuteObservable.java源码如下

 final class CallExecuteObservable<T> extends Observable<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteObservable(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
    Call<T> call = originalCall.clone();
    observer.onSubscribe(new CallDisposable(call));

    boolean terminated = false;
    try {
      Response<T> response = call.execute();
      if (!call.isCanceled()) {
        observer.onNext(response);
      }
      if (!call.isCanceled()) {
        terminated = true;
        observer.onComplete();
      }
    } catch (Throwable t) {
      Exceptions.throwIfFatal(t);
      if (terminated) {
        RxJavaPlugins.onError(t);
      } else if (!call.isCanceled()) {
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }
  }

  private static final class CallDisposable implements Disposable {
    private final Call<?> call;

    CallDisposable(Call<?> call) {
      this.call = call;
    }

    @Override public void dispose() {
      call.cancel();
    }

    @Override public boolean isDisposed() {
      return call.isCanceled();
    }
  }

重点在subscribeActual方法中第三行observer.onSubscribe(new CallDisposable(call));CallDisposable是CallExecuteObservable定义的内部类,实现了Disposable接口,dispose()方法中也是调用了call.cancel()方法来取消网络请求

附:与该文相关的重要Java类
RxJava部分:
Observer.java
Observable.java
Disposable.java

Retrofit部分:
Retrofit.java (着重关注create方法)
OkHttpCall.java
RxJava2CallAdapterFactory.java (着重关注get方法)
RxJava2CallAdapter (着重关注adapt方法)
CallEnqueueObservable.java (着重关注subscribeActual方法及CallCallback内部类)
CallExecuteObservable.java (着重关注subscribeActual方法)

该文并未详细介绍具体流程,且由于笔者水平有限,可能有错误或不妥之处,如该文有幸被读,望赐教
另附完整示例:https://github.com/670832188/TestApp

相关文章

网友评论

  • 王磊_3c60:既然用了rxjava,为啥还用handler呀
    乱世白衣:@王磊_3c60 算是一种保守写法吧,如果您没有在主线程订阅结果,这种写法会帮您切换到主线程处理结果

本文标题:Android 利用RxJava和Retrofit搭建网络请求组

本文链接:https://www.haomeiwen.com/subject/zpkxhftx.html