一、RxJava的核心思想
介绍RxJava的核心思想之前先来看一个例子
- 例子
在我们开发中,假如我们需要下载一张图片,并且显示出来,可以有多种方法,A可能使用new Thread+Handler来实现、B使用AsynTask来实现、C可能使用别的方式来实现。各自写的代码都不一样,这样就会造成下载图片的一个功能,没有统一的思维。
RxJava的思想就是基于起点到终点的一个模式,在起点中输入一个事件,终点输出我们想要的结果。在起点和终点之间可能会有多个拦截加工对事件的处理,但是目的都是为了在终点输出我们想要的结果。起点和终点的线路不会断。
就好比生活中的例子,在工厂的车间的生产线中,总会有一个材料的输入源头,和成品的输出结果。中间是一条流水线,在流水线中可能会有多道加工程序进行加工,当最后一道加工程序加工完之后,就会输出一个成品。也就是我们终点的结果就是拿到离我们最近的加工程序的结果。
下面我们通过RxJava来实现一个下载图片的功能:
String path = "http://39.108.14.94:9007/donghui_oa/IMG/20210103041020962378.jpg";
private void test1() {
//1、输入一个事件path
Observable.just(path)
//2、对事件加工
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
URL url = new URL(path);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(16000);
int code = httpURLConnection.getResponseCode();
if (code == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//3、订阅
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "onSubscribe: " );
}
//4、输出事件
@Override
public void onNext(@NonNull Bitmap bitmap) {
Log.e(TAG, "onNext: " );
image.setImageBitmap(bitmap);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "e: "+e );
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: " );
}
});
}
起点我们输入一个path事件,接着对事件进行加工即通过path去网络下载图片,然后将事件类型转换成bitmap。我们的终点接受上一道加工程序的事件bitmap。最后拿到结果显示我们的图片。
二、RxJava配合Retrofit的使用
在RxJava配合Retrofit的使用中,Retrofit负责去请求网络拿到数据,并且将拿到的数据封装成一个起点。接着通过RxJava反射数据将起点事件传递到终点。
- 例子
(1)封装一个下载图片的Api
package com.example.andoiddemo;
import io.reactivex.Observable;
import okhttp3.ResponseBody;
import retrofit2.http.GET;
import retrofit2.http.Streaming;
public interface Api {
@GET("IMG/20210103041020962378.jpg")
@Streaming
Observable<ResponseBody> downLoad();
}
(2)封装Retrofit
package com.example.andoiddemo;
import android.util.Log;
import com.facebook.stetho.okhttp3.StethoInterceptor;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
public class RetrofitUtil {
final static String baseUrl = "http://39.108.14.94:9007/donghui_oa/";
static OkHttpClient okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) { //访问网络请求,和服务端响应请求时。将数据拦截并输出
Log.e("RetrofitUtil", "log: " + message);
}
}).setLevel(HttpLoggingInterceptor.Level.BODY)) //Log等级
.connectTimeout(16 ,TimeUnit.SECONDS) //超时时间
.readTimeout(16, TimeUnit.SECONDS)
.writeTimeout(16, TimeUnit.SECONDS)
.addNetworkInterceptor(new StethoInterceptor())
.build();
static Retrofit retrofit = new Retrofit
.Builder()
.baseUrl(baseUrl)
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create()).build();
public static Retrofit getInstance() {
return retrofit;
}
}
(3)下载图片
//下载图片
private void test2() {
//1、起点
Observable<ResponseBody> bodyObservable = RetrofitUtil.getInstance().create(Api.class).downLoad();
//2、起点发射事件
bodyObservable.
subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
//3、转换事件类型
.map(new Function<ResponseBody, Bitmap>() {
@Override
public Bitmap apply(@NonNull ResponseBody responseBody) throws Exception {
Bitmap bitmap = BitmapFactory.decodeStream(responseBody.byteStream());
return bitmap;
}
})
//4、订阅
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
//5、接受事件
@Override
public void onNext(@NonNull Bitmap bitmap) {
Log.e(TAG, "onNext: " + bitmap);
image.setImageBitmap(bitmap);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
}
在Retrofit中支持添加一个addCallAdapterFactory为RxJava2CallAdapterFactory,即我们定义API的时候可以定义返回RxJava的数据类型 Observable<ResponseBody>。这个即是我们在RxJava中说的起点。
接着订阅Observable<ResponseBody>起点发射数据,并转换数据成Bitmap。最后终点接受我们的Bitmap,拿到最后结果。显示图片。
三、防抖
在RxJava中封装了一系列的库,其中包含了对View操作的rxbinding。我们演示一个开发中经常遇到的例子,就是一个View在某个时间段内快速点击了很多次,我们只响应一次。
private void test3() {
RxView.clicks(tvDown).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e(TAG, "onNext: 响应事件....." );
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
设置1S钟内只响应一次。
四、网络嵌套
在我们开发中经常会有这种需求,就是我们的业务是需要调用某个接口,然后拿到某个接口的数据再去调用某个接口,这样是一个嵌套的网络请求。当然还有可能嵌套很多层。就会给我们开发造成了比较大的麻烦。我们通过使用RxJava来实现一个网络嵌套请求。
我们的业务是通过查询一个分类列表接口,然后再通过分类里面的ID 再查询分类下的集合。
- 定义API
/**
* 查询分类数据
* @return
*/
@GET("project/tree/json")
Observable<ProjectData> getProjectData();
/**
* 根据分类查询列表数据
* @param page
* @param cid
* @return
*/
@GET("project/list/{page}/json?cid=294")
Observable<ItemData> getItemData(@Path("page") String page, @Query("cid") Integer cid)
- 封装Retrofit
package com.example.andoiddemo;
import android.util.Log;
import com.facebook.stetho.okhttp3.StethoInterceptor;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.gson.GsonConverterFactory;
public class RetrofitUtil {
final static String baseUrl = "https://www.wanandroid.com/";
static OkHttpClient okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) { //访问网络请求,和服务端响应请求时。将数据拦截并输出
Log.e("RetrofitUtil", "log: " + message);
}
}).setLevel(HttpLoggingInterceptor.Level.BODY)) //Log等级
.connectTimeout(16 ,TimeUnit.SECONDS) //超时时间
.readTimeout(16, TimeUnit.SECONDS)
.writeTimeout(16, TimeUnit.SECONDS)
.addNetworkInterceptor(new StethoInterceptor())
.build();
static Retrofit retrofit = new Retrofit
.Builder()
.baseUrl(baseUrl)
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create()).build();
public static Retrofit getInstance() {
return retrofit;
}
}
- 封装实体数据
package com.example.andoiddemo;
import java.util.List;
public class ProjectData {
private List<Data> data;
private int errorCode;
private String errorMsg;
public void setData(List<Data> data) {
this.data = data;
}
public List<Data> getData() {
return this.data;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public int getErrorCode() {
return this.errorCode;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public String getErrorMsg() {
return this.errorMsg;
}
static class Data {
private int courseId;
private int id;
private String name;
private int order;
private int parentChapterId;
private boolean userControlSetTop;
private int visible;
public void setCourseId(int courseId) {
this.courseId = courseId;
}
public int getCourseId() {
return this.courseId;
}
public void setId(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public void setOrder(int order) {
this.order = order;
}
public int getOrder() {
return this.order;
}
public void setParentChapterId(int parentChapterId) {
this.parentChapterId = parentChapterId;
}
public int getParentChapterId() {
return this.parentChapterId;
}
public void setUserControlSetTop(boolean userControlSetTop) {
this.userControlSetTop = userControlSetTop;
}
public boolean getUserControlSetTop() {
return this.userControlSetTop;
}
public void setVisible(int visible) {
this.visible = visible;
}
public int getVisible() {
return this.visible;
}
}
}
package com.example.andoiddemo;
import java.util.List;
public class ItemData {
private Data data;
private int errorCode;
private String errorMsg;
public void setData(Data data) {
this.data = data;
}
public Data getData() {
return this.data;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public int getErrorCode() {
return this.errorCode;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public String getErrorMsg() {
return this.errorMsg;
}
static class Datas {
private String apkLink;
private int audit;
private String author;
private boolean canEdit;
private int chapterId;
private String chapterName;
private boolean collect;
private int courseId;
private String desc;
private String descMd;
private String envelopePic;
private boolean fresh;
private String host;
private int id;
private String link;
private String niceDate;
private String niceShareDate;
private String origin;
private String prefix;
private String projectLink;
private String publishTime;
private String realSuperChapterId;
private String selfVisible;
private String shareDate;
private String shareUser;
private String superChapterId;
private String superChapterName;
private String title;
private int type;
private int userId;
private int visible;
private int zan;
public String getApkLink() {
return apkLink;
}
public void setApkLink(String apkLink) {
this.apkLink = apkLink;
}
public int getAudit() {
return audit;
}
public void setAudit(int audit) {
this.audit = audit;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public boolean isCanEdit() {
return canEdit;
}
public void setCanEdit(boolean canEdit) {
this.canEdit = canEdit;
}
public int getChapterId() {
return chapterId;
}
public void setChapterId(int chapterId) {
this.chapterId = chapterId;
}
public String getChapterName() {
return chapterName;
}
public void setChapterName(String chapterName) {
this.chapterName = chapterName;
}
public boolean isCollect() {
return collect;
}
public void setCollect(boolean collect) {
this.collect = collect;
}
public int getCourseId() {
return courseId;
}
public void setCourseId(int courseId) {
this.courseId = courseId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public String getDescMd() {
return descMd;
}
public void setDescMd(String descMd) {
this.descMd = descMd;
}
public String getEnvelopePic() {
return envelopePic;
}
public void setEnvelopePic(String envelopePic) {
this.envelopePic = envelopePic;
}
public boolean isFresh() {
return fresh;
}
public void setFresh(boolean fresh) {
this.fresh = fresh;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getLink() {
return link;
}
public void setLink(String link) {
this.link = link;
}
public String getNiceDate() {
return niceDate;
}
public void setNiceDate(String niceDate) {
this.niceDate = niceDate;
}
public String getNiceShareDate() {
return niceShareDate;
}
public void setNiceShareDate(String niceShareDate) {
this.niceShareDate = niceShareDate;
}
public String getOrigin() {
return origin;
}
public void setOrigin(String origin) {
this.origin = origin;
}
public String getPrefix() {
return prefix;
}
public void setPrefix(String prefix) {
this.prefix = prefix;
}
public String getProjectLink() {
return projectLink;
}
public void setProjectLink(String projectLink) {
this.projectLink = projectLink;
}
public String getPublishTime() {
return publishTime;
}
public void setPublishTime(String publishTime) {
this.publishTime = publishTime;
}
public String getRealSuperChapterId() {
return realSuperChapterId;
}
public void setRealSuperChapterId(String realSuperChapterId) {
this.realSuperChapterId = realSuperChapterId;
}
public String getSelfVisible() {
return selfVisible;
}
public void setSelfVisible(String selfVisible) {
this.selfVisible = selfVisible;
}
public String getShareDate() {
return shareDate;
}
public void setShareDate(String shareDate) {
this.shareDate = shareDate;
}
public String getShareUser() {
return shareUser;
}
public void setShareUser(String shareUser) {
this.shareUser = shareUser;
}
public String getSuperChapterId() {
return superChapterId;
}
public void setSuperChapterId(String superChapterId) {
this.superChapterId = superChapterId;
}
public String getSuperChapterName() {
return superChapterName;
}
public void setSuperChapterName(String superChapterName) {
this.superChapterName = superChapterName;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public int getType() {
return type;
}
public void setType(int type) {
this.type = type;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getVisible() {
return visible;
}
public void setVisible(int visible) {
this.visible = visible;
}
public int getZan() {
return zan;
}
public void setZan(int zan) {
this.zan = zan;
}
}
static class Data {
private int curPage;
private List<Datas> datas;
private int offset;
private boolean over;
private int pageCount;
private int size;
private int total;
public void setCurPage(int curPage) {
this.curPage = curPage;
}
public int getCurPage() {
return this.curPage;
}
public void setDatas(List<Datas> datas) {
this.datas = datas;
}
public List<Datas> getDatas() {
return this.datas;
}
public void setOffset(int offset) {
this.offset = offset;
}
public int getOffset() {
return this.offset;
}
public void setOver(boolean over) {
this.over = over;
}
public boolean getOver() {
return this.over;
}
public void setPageCount(int pageCount) {
this.pageCount = pageCount;
}
public int getPageCount() {
return this.pageCount;
}
public void setSize(int size) {
this.size = size;
}
public int getSize() {
return this.size;
}
public void setTotal(int total) {
this.total = total;
}
public int getTotal() {
return this.total;
}
}
}
使用RxJava实现网络嵌套请求
/**
* 查询分类的列表,
* 再根据分类id查询分类下的集合
*/
private void test4() {
RetrofitUtil.getInstance().create(Api.class).getProjectData()
.flatMap(new Function<ProjectData, ObservableSource<ProjectData.Data>>() {
@Override
public ObservableSource<ProjectData.Data> apply(@NonNull ProjectData projectData) throws Exception {
return Observable.fromIterable(projectData.getData());
}
})
.flatMap(new Function<ProjectData.Data, ObservableSource<ItemData>>() {
@Override
public ObservableSource<ItemData> apply(@NonNull ProjectData.Data data) throws Exception {
return RetrofitUtil.getInstance().create(Api.class).getItemData("1", data.getId());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ItemData>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(@NonNull ItemData itemData) {
Log.e(TAG, "onNext: "+itemData.getData().getDatas() );
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: "+e);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
}
五、doOnNext的运用
doOnNext操作符,允许我们从起点发射事件到终点期间对事件做一些额外的处理,然后继续发射。例如我们发射一个长度为3的集合,在发送期间,我们先对这个集合做一个校验,然后继续发射
private void test6() {
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
Observable.just(list)
.doOnNext(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
check(list);
}
}).flatMap(new Function<List<String>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull List<String> strings) throws Exception {
return Observable.fromIterable(strings);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
Log.e(TAG, "onNext: "+s );
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG, "onError: "+e);
}
@Override
public void onComplete() {
}
});
}
private void check(List<String> list) {
list.remove(0);
}
2021-06-08 16:49:21.485 26310-26310/com.example.andoiddemo E/MainActivity: onNext: 2
2021-06-08 16:49:21.485 26310-26310/com.example.andoiddemo E/MainActivity: onNext: 3
六、小结
以上我们介绍了RxJava的核心思想,RxJava的主要核心思想就是基于起点和终点的模式,每一个事件从起点输入到终点结束,起点和终点之间可以做相应的拦截处理。最后一道拦截处理的结果就是终点的结果。
又介绍了一些平时常用的开发场景防抖、配合Retrofit的使用、优雅的解决网络嵌套问题,以及doOnNext操作符的介绍。以上还使用到了map和flatmap操作符,他们都是可以对事件发射期间变换处理,然后发射变换之后的数据。map发射的数据是我们的具体的对象,而flatmap发射的数据还是Observable包装后的对象,flatmap可以发射多次。例如起点输入的是一个集合,可以通过flatmap发射多次,发射集合里的每一个元素。
网友评论