Retrofit
Retrofit的本质


准确来说,Retrofit只是负责对网络请求接口的封装,真正的网络请求工作还是由OkHttp完成的,App应用程序通过Retrofit请求网络,实际上是使用Retrofit接口层封装请求参数、Header、URL等信息,之后由OkHttp完成后续的请求操作,在服务端返回数据后,OkHtttp将原始的数据结果交给Retrofit,Retrofit根据用户的需求对结果进行解析。
Retrofit和其他网络请求库的区别

Retrofit的使用
简单来讲,一共分为6步:
步骤1:添加Retrofit库的依赖;
步骤2:创建接收服务器返回数据的类;
步骤3:创建用于描述网络请求的接口
(采用注解描述网络请求参数和配置网络请求参数);
步骤4:创建Retrofit实例;
步骤5:创建网络请求接口实例并配置网络请求参数;
步骤6:发送网络请求
步骤一:添加Retrofit库的依赖
//步骤1.添加依赖和权限
compile 'com.squareup.retrofit2:retrofit:2.0.2'
并添加网络权限
<uses-permission android:name="android.permission.INTERNET" />
步骤二:创建接收服务器返回数据的类
public class WeatherObj {
private String date;
private String message;
private int status;
private String city;
private int count;
private WeatherBean data;
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public WeatherBean getData() {
return data;
}
public void setData(WeatherBean data) {
this.data = data;
}
public class WeatherBean {
private String shidu;
private int pm25;
private int pm10;
private String quality;
private String wendu;
private String ganmao;
private YesterdayBean yesterday;
public String getShidu() {
return shidu;
}
public void setShidu(String shidu) {
this.shidu = shidu;
}
public int getPm25() {
return pm25;
}
public void setPm25(int pm25) {
this.pm25 = pm25;
}
public int getPm10() {
return pm10;
}
public void setPm10(int pm10) {
this.pm10 = pm10;
}
public String getQuality() {
return quality;
}
public void setQuality(String quality) {
this.quality = quality;
}
public String getWendu() {
return wendu;
}
public void setWendu(String wendu) {
this.wendu = wendu;
}
public String getGanmao() {
return ganmao;
}
public void setGanmao(String ganmao) {
this.ganmao = ganmao;
}
public YesterdayBean getYesterday() {
return yesterday;
}
public void setYesterday(YesterdayBean yesterday) {
this.yesterday = yesterday;
}
public List<ForecastBean> getForecast() {
return forecast;
}
public void setForecast(List<ForecastBean> forecast) {
this.forecast = forecast;
}
private List<ForecastBean> forecast;
public class YesterdayBean {
private String date;
private String sunrise;
private String high;
private String low;
private String sunset;
private int aqi;
private String fx;
private String fl;
private String type;
private String notice;
}
public class ForecastBean {
private String date;
private String sunrise;
private String high;
private String low;
private String sunset;
private int aqi;
private String fx;
private String fl;
private String type;
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getSunrise() {
return sunrise;
}
public void setSunrise(String sunrise) {
this.sunrise = sunrise;
}
public String getHigh() {
return high;
}
public void setHigh(String high) {
this.high = high;
}
public String getLow() {
return low;
}
public void setLow(String low) {
this.low = low;
}
public String getSunset() {
return sunset;
}
public void setSunset(String sunset) {
this.sunset = sunset;
}
public int getAqi() {
return aqi;
}
public void setAqi(int aqi) {
this.aqi = aqi;
}
public String getFx() {
return fx;
}
public void setFx(String fx) {
this.fx = fx;
}
public String getFl() {
return fl;
}
public void setFl(String fl) {
this.fl = fl;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getNotice() {
return notice;
}
public void setNotice(String notice) {
this.notice = notice;
}
private String notice;
}
}
}
(其实我也不想贴这么多代码,可能是因为我撒。。。刚写了一个listview,又出错,一碰到适配器就蒙比扎毛出冷汗,我大概是真傻。。。)
步骤三:创建用于描述网络请求的接口
Retrofit将Http请求抽象成Java接口:采用注解描述网络请求参数和配置网络请求参数
注:
1. 用动态代理动态的将该接口的注解“翻译”成一个Http请求,最后再执行Http请求;
2. 接口中的每个方法的参数都需要使用注解标注,否则会报错。
public interface GetWeather_Interface {
@GET("json.shtml?city=上海")
Call<WeatherObj> getCall();
}
@GET注解的作用表示采用GET方法进行网络请求;
getCall()表示接收网络请求数据的方法,在activiity中调用,请求成功和请求失败后的操作处理(一会看代码)
接下来:知识点!知识点!知识点!
注解类型

网络请求方法

其中@GET,@POST,@PUT,@DELETE,@HEAD代表了HTTP请求中的网络请求方式
可以看到@GET括号里面代表的是URL的一部分,在Retrofit中把网络请求的URL分成了两部分设置,一部分在网络请求的接口的注释中设置,一部分在创建Retrofit实例时通过.baseUrl设置。网络请求的完整Url=在创建Retrofit实例时通过baseUrl()设置+网络请求接口的注解设置,具体整合规则如下:

@HTTP的作用:替换@GET,@POST,@PUT,@DELETE,@HEAD注解的作用及更多功能扩展,使用时通过属性method、path、hasBody进行设置。

标记

@FormUrlEncoded表示发送form-encoded的数据,每个键值对需要用@Filed来注解键名,随后的对象需要提供值。
@Multipart表示发送form-encoded的数据(适用于有文件上传的场景),每个键值对需要用@Part来注解键名,随后的对象需要提供值
(举个栗子,不是我写的栗子)

在Activity中:

网络请求参数

1.@Header&Headers
作用:添加请求头,作用于方法的参数&添加不固定的请求头,作用于方法
(其实这个地方我不懂)

2.@Body
作用:以POST方式传递自定义数据类型给服务器
注意:如果要提交的是一个Map,那么作用相当于 @Field ,不过Map要经过FormBody.Builder类处理成为符合OkHttp格式的表单。
3.@Field&FieldMap
作用:发送Post请求时提交请求的表单字段
使用时与@FormUrlEncoded注解配合使用

在Activity中使用

4.@Part&@PartMap
作用:发送Post请求时提交请求的表单字段
与@Filed的区别:功能相同,但携带的参数类型更加丰富,包括数据流,所以适用于有文件上传的场景,使用时雨@Multipart注解配合使用
5.@Query和@QueryMap
作用:用于@GET方法的查询参数(Query=Url中’?’后面的key-value)
6.@Path
作用:URL地址的缺省值
7.@Url
作用:直接传入一个请求的URL变量,用于URL设置

(总是有一种跑题的赶脚,已经不知道初衷是啥了,初衷是,下面该写步骤四了)
步骤四:创建Retrofitz实例
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://www.sojson.com/open/api/weather/")
.addConverterFactory(GsonConverterFactory.create())
.build();
关于这个地方,可以看源码学习下,一会再写,等我把下面的步骤写完!
1. 数据解析器(Converter)
Retrofit支持多种数据解析方式,使用时添加依赖即可:

2. 网络请求适配器(CallAdapter)
Retrofit支持多种网络请求适配器方式:guava、Java8和RxJava,使用时添加依赖即可

步骤五:创建请求接口实例
GetWeather_Interface request = retrofit.create(GetWeather_Interface.class);
步骤六:发送网络请求
Call<WeatherObj> call = request.getCall();
call.enqueue(new Callback<WeatherObj>() {
@Override
public void onResponse(Call<WeatherObj> call, Response<WeatherObj> response) {
String city = response.body().getCity();
// String ganmao = response.body().getData().getGanmao();
// String date = response.body().getData().getForecast().get(1).getDate();
forecastdata = response.body().getData().getForecast();
listview.setAdapter(new WeatherAdapter(context,forecastdata));
Log.e("eee",city);
// Log.e("eee",ganmao);
// Log.e("eee",date);
}
@Override
public void onFailure(Call<WeatherObj> call, Throwable t) {
Log.e("eee","连接失败");
}
});
onResponse()方法里德response.body()就是请求成功后获取到的返回数据。
至此,Retrofit进行网络请求的6个步骤就讲完了,搞事情,看看源码。。。
Retrofit的源码分析
一般网络通信的过程如下:

Retrofit的本质和一般网络请求的过程是一样的,只是Retrofit通过使用大量的设计模式进行功能模块的解耦,使得上面的过程进行的更加简单和流畅。如下图(没错我是偷得图):

可以这样解释各个阶段:
1. 通过解析网络请求接口的注解配置网络请求参数;
2. 通过动态代理生成网络请求对象;
3. 通过网络请求适配器将网络请求对象进行平台适配;
4. 通过网络请求执行器发送网络请求;
5. 通过数据转换器解析服务器返回的数据;
6. 通过回调执行器切换线程(从子线程切回主线程);
7. 通过主线程处理返回的结果。

还记得Retrofit实例是怎么创建的么,复习一下:
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fanyi.youdao.com/")
.addConverterFactory(GsonConverterFactory.create())
.build();
网上有人说。。。Retrofit实例时使用创造者模式通过Builder类进行创建的,那么我要问了,啥叫建造者模式呢?
网上又说,建造者模式:将一个复杂对象的构建于表示分离,使得用户在不知道对象的创建细节情况下就可以直接创建复杂的对象。(恩,我还是不懂,这是个知识点,设计模式,待整理)

步骤一



成功建立一个Retrofit对象的标准是配置好Retrofit类里德成员变量,也即配置好上面画框框的变量,他们依次表示:
serviceMethod:包含所有网络请求信息的对象
baseUrl:网络请求的url地址
callFactory:网络请求工厂
adpterFactories:网络请求适配器工厂的集合
converterFactories:数据转换器工厂的集合
callbackExecutor:回调方法执行器
前面两个参数不用多介绍,xxxFactory表示的是设计模式中工厂模式的体现:将“类实例化的操作”与“使用对象的操作”分开,使得使用者不用知道具体参数就可以实例化出所需要的“产品”类。(恩还是不懂,还是那个知识点,整理整理)
callAdapter表示网络请求执行器的适配器,默认是OkHttpCall,在Retrofit中提供了四种CallAdapterFactory:ExecutorCallAdapterFactory(默认)、GuavaCallAdapterFactory、Java8CallAdapterFactory、RxJavaCallAdapterFactory。作用:将默认的网络请求执行器(OkHttpCall)转换成适合被不同平台来调用的网络请求执行器模式。比如,RxJava不需要Handler来切换线程,想要使用RxJava就的使用RxJavaCallAdapterFactoryCallAdapter将OkHttpCall转换成RxJava(Scheduler)。(我们后面将Rxjava的时候再看这是啥意思)
步骤二

来看下Builder类的源码,把这个类看完,接下来的步骤也就看完了。。。所以,我突然间不想写小标题了,直接下源码下面分析吧。。。

执行了Builder()的构造方法之后,我们看到里面执行了Platform.get()方法,这个方法具体做了些啥看下面:

将findPlatForm()赋值给PlatFrom,从findPlatform()方法中可以看到,设置了Android平台作为默认平台




步骤三


这一步很简单,就是将传入的String类型url转化为适合OKHttp的HttpUrl类型的url
步骤四

括号里面的是创建了一个含有Gson对象实例的GsonConverterFactory返回给addConverterFactory()方法,那就来看一下这个方法的源码:

Retrofit是支持多种数据解析器的,在上面写过了,所以若用其他解析方式,可以通过自定义数据解析器来实现(必须继承Converter.Factory),就是上面括号里面的东西需要自定义。
步骤五



可以看到,步骤五就是将前面所有步骤中设置的变量配置完毕,成功创建Retrofit实例。
执行网络请求
这个地方就不看源码了,看下实现步骤。
Retrofit默认使用OKHttp,即OKHttpCall类进行网络请求,OKHttpCall提供了两种网络请求方式:
同步请求:OKHttpCall.execute()
异步请求:OKHttpCall.enqueue()
同步请求
1. 对网络请求接口的方法中的每个参数利用对应ParameterHandler进行解析,再根据ServiceMethod对象创建一个OKHttp的Request对象;
2. 使用OKHttp的Request发送网络请求;
3. 对返回的数据使用之前设置的数据转换器进行解析,最终得到一个Response<T>对象。
异步请求
1. 对网络请求接口的方法中的每个参数利用对应ParameterHandler进行解析,再根据ServiceMethod对象创建一个OKHttp的Request对象;.
2. 使用OKHttp的Request发送网络请求;
3. 对返回的数据使用之前设置的数据转换器进行解析,最终得到一个Response<T>对象;
4. 进行线程切换从而在主线程处理返回的数据结果(如果使用了RxJava,则直接回调到主线程)
可以看到,两者的区别在于:异步请求会将回调方法交给回调执行器在指定的线程中执行。
RxJava
有的没的简介
定义:RxJava是一个基于事件流、实现异步操作的库
作用:实现异步操作

原理:基于一种扩展的观察者模式
其中有4个角色:

被观察者(Observable)通过订阅(Subscribe)按顺序发送事件给观察者(Observer),观察者(Observer)按顺序接收事件并对应的相应动作。
步骤:
1. 创建被观察者并产生事件;
2. 创建观察者并定义响应事件行为;
3. 通过订阅连接观察者和被观察者。
首先添加依赖:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
基于事件流的链式调用方法实现RxJava
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("eee", "开始采用subscribe连接");
}
@Override
public void onNext(Integer value) {
Log.e("eee", "对事件"+value+"进行响应");
}
@Override
public void onError(Throwable e) {
Log.e("eee", "对onError事件作出响应");
}
@Override
public void onComplete() {
Log.e("eee", "对Complete事件作出响应");
}
});
控制台输出数据如下:
03-23 10:19:23.462 3005-3005/com.example.xfqlu.gesturepassword E/eee: 开始采用subscribe连接
03-23 10:19:23.462 3005-3005/com.example.xfqlu.gesturepassword E/eee: 对事件1进行响应
03-23 10:19:23.462 3005-3005/com.example.xfqlu.gesturepassword E/eee: 对事件2进行响应
03-23 10:19:23.462 3005-3005/com.example.xfqlu.gesturepassword E/eee: 对事件3进行响应
03-23 10:19:23.462 3005-3005/com.example.xfqlu.gesturepassword E/eee: 对Complete事件作出响应
RxJava有很多操作符类型,很重要,这里先不展开,因为昨天刚看了Retrofit,这里想讲一下Retrofit和RxJava的联合使用。
Retrofit和RxJava基础案例
Retrofit除了提供传统的网络请求方式之外,还提供RxJava版本的网络请求方式。区别在于:传统方式采用CallBack接口,而RxJava方式采用了Observable接口,不同点主要体现在:
1. 用于描述网络请求的接口的设置;
2. 网络请求的封装形式和发送形式不同。
下面主要看下这两个的区别
不同点一:
传统方式的Call<>接口
public interface GetRequest_Interface {
@GET("ajax.php?a=fy&f=auto&t=auto&w=hello%20world")
Call<JSTranslation> getCall();
}
使用RxJava后的接口形式Observable<>
public interface GetRxJava_Interface {
@GET("ajax.php?a=fy&f=auto&t=auto&w=hello%20world")
Observable<JSTranslation> getCall();
}
不同点二:
传统的方式进行网络请求
//步骤4.创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.build();
//步骤5.创建网络请求接口实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
//对发送请求进行封装,采用Call<>方式
Call<JSTranslation> call = request.getCall();
//步骤6.发送网络请求
call.enqueue(new Callback<JSTranslation>() {
@Override
public void onResponse(Call<JSTranslation> call, Response<JSTranslation> response) {
response.body().show();
}
@Override
public void onFailure(Call<JSTranslation> call, Throwable t) {
Log.e("eee","连接失败");
}
});
使用RxJava进行网络请求(记得添加依赖,添加支持RXJava)
//创建Retrofit对象
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
//创建网络请求接口实例
GetRxJava_Interface request = retrofit.create(GetRxJava_Interface.class);
//对网络请求进行封装,采用Observable<>方式
Observable<JSTranslation> observable = request.getCall();
//发送网络请求(异步),这里是最大的不同之处
observable.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread())//回到主线程处理请求结果
.subscribe(new Observer<JSTranslation>() {
@Override
public void onSubscribe(Disposable d) {
//初始化工作
}
@Override
public void onNext(JSTranslation value) {
value.show();
//对返回结果JSTranslation进行处理
}
@Override
public void onError(Throwable e) {
//请求失败
}
@Override
public void onComplete() {
//请求成功
}
});
后面还有很多实际开发的案例,我们继续学习!
RxJava操作符
创建操作符

作用:创建被观察者(Observable)对象并发送事件
基本创建create()
完整的创建被观察者对象
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Observer<Integer>() {。。。});
快速创建
just()
快速创建被观察者;
直接发送传入的事件(最多只能发送10个事件)
//快速创建10个以下的事件
Observable.just(1,2,3,4,5,6,7,8,9,10)
.subscribe(new Observer<Integer>() {。。。}
just()的参数个数最多为10个
fromArray()
快速创建被观察者;
直接发送传入的数组数据(对事件个数不限制)。
Integer [] items = {1,2,3,4,5};
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {。。。}
fromIterable()
快速创建被观察者;
直接发送传入的集合数据(对时间个数不限制)
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {。。。}
firstElement()
在组合事件中,取出第一个事件进行判断,若返回的是结束事件onComplete()则表示第一个事件为无效事件,接着取出第二个事件执行,若返回的是onNext()则表示是有效事件,当firstElement()已经发出一个有效事件,则停止判断不在进行下去。
延迟创建
defer()
直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable),并发送事件
Integer i = 11;
private void rxdelay() {
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
i = 22;
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("eee",value+"");
//这里接收到的value值为22
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
timer()
快速创建被观察者对象;
延迟指定时间后,发送1个数值0(一般用于检测)
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
Log.e("eee",value+"");
//延时指定时间后,发送一个数值0
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
interval()
快速创建一个被观察者对象;
每隔指定时间就发送事件
intervalRange()
快速创建被观察者;
每隔指定时间就发送事件,可指定发送的事件数量
(类似于interval(),但可以指定发送的事件的数量)

range()
快速创建被观察者;
连续发送一个事件序列,可以指定范围
(类似于intervalRange(),区别在于不可以设置延迟时间发送事件)

rangeLong()
类似于range(),区别在于支持的数据类型为long型
头痛。。。
举个栗子,其实我发现,这就像在背公式一样,你要记住这些标签都能干嘛,都有哪些参数,一步一步都是怎么进行设置,就是设置的东西太多,也有可能是现在还不够熟练,总是记不住下一步要配置什么,吧。。。
//第一次延迟3秒进行网络请求,后面每隔2秒进行一次轮询
//每次轮训都会在onNext()方法中产生一个数字,从0开始递增1,无限个
//这里onNext()产生的数字是在下面和(ddddddd)一起输出的,在accept()方法中的aLong也是这个数字
Observable.interval(3,2, TimeUnit.SECONDS)
//在执行onNext()方法之前先执行这个doOnNext()方法,所以我们将网络请求放在这里进行
.doOnNext(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("eee","第"+aLong+"次轮询");
//这边的代码是不是很熟悉,是的,写了N次了,可我还是记不住。。。
//创建实例,巴拉巴拉巴拉。。。
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
//封装接口,巴拉巴拉巴拉。。。
GetRxJava_Interface request = retrofit.create(GetRxJava_Interface.class);
Observable<JSTranslation> observable = request.getCall();
//发送请求,巴拉巴拉巴拉。。。
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<JSTranslation>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(JSTranslation value) {
value.show();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
//从控制台可以看到,这一句是在doOnNext()后输出的
Log.e("eee","ddddddddd"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});


变换操作符

对事件序列中的事件或者整个事件序列进行加工处理(即变换),使得其转变成不同的事件。
Map()
对被观察者发送的每个事件都通过指定的函数处理,从而变换成另一种事件。(可以进行数据类型装换)
FlatMap()
将被观察者发送的事件序列进行拆分和数据转换,再合并成一个新的事件序列,最后再进行发送。(新合并生成的事件序列顺序是无序的)
ConcatMap()
和FlatMap()类似,区别在于:拆分并重新合并生成的事件序列的顺序=被观察者旧序列生产的顺序。
Buffer()
定期从被观察者需要发送的事件中获取一定数量的事件并放到缓存区,最终发送。
实例:网络请求嵌套回调
public interface GetRxJava_Interface {
@GET("ajax.php?a=fy&f=en&t=zh&w=hi%20register")
Observable<JSTranslation> register();
@GET("ajax.php?a=fy&f=en&t=zh&w=hi%20login")
Observable<JSTranslation> login();
}
public class JSTranslation {
private int status;
private content content;
private static class content{
private String from;
private String to;
private String vendor;
private String out;
private int err_no;
}
public void show(){
Log.e("eee","翻译的内容"+content.out);
}
}
Retrofit retrofit = new Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl("http://fy.iciba.com/")
.build();
GetRxJava_Interface request = retrofit.create(GetRxJava_Interface.class);
Observable<JSTranslation> register = request.register();
final Observable<JSTranslation> login = request.login();
register.subscribeOn(Schedulers.io()) //初始被观察者,切换至IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //新观察者,切换至主线程处理网络请求结果
.doOnNext(new Consumer<JSTranslation>() {
@Override
public void accept(JSTranslation jsTranslation) throws Exception {
jsTranslation.show();
}
})
.observeOn(Schedulers.io()) //新的被观察者,也就是上面的新观察者,
// 因为flatMap是对初始被观察者做变换,
//所以对于初始被观察者而言,它是新观察者,所以用observeOn切换线程
.flatMap(new Function<JSTranslation, ObservableSource<JSTranslation>>() {
@Override
public ObservableSource<JSTranslation> apply(JSTranslation jsTranslation) throws Exception {
//将网络请求1换成网络请求2
return login;
}
})
.observeOn(AndroidSchedulers.mainThread()) //切换到主线程,处理网络请求2的结果
.subscribe(new Consumer<JSTranslation>() {
@Override
public void accept(JSTranslation jsTranslation) throws Exception {
jsTranslation.show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("eee","登录失败");
}
});
这里有一个知识点,线程切换
线程切换(subscribeOn()/observeOn())
对于一般的需求场景,需要在子线程中实现耗时操作,然后回到主线程中实现UI操作,在RxJava中可以这样理解:
1. 被观察者(Observable)在子线程中生产事件(如耗时操作等)
2. 观察者(Observer)在主线程中接受并响应事件(如实现UI操作等)
为了实现真正的异步操作,需要对RxJava进行线程控制。采用RxJava内置的线程调度器(Scheduler),即通过功能性操作符subsrcibeOn()和observeOn()实现。
线程类型:

Observable.subscribeOn(Schedulers.Thread):指定被观察者发送事件的线程
Observable.observeOn(Schedulers.Thread):指定观察者接受或响应事件的线程
所以要先分清是观察者还是被观察者!
需要注意的地方:
1. 若Observable.subscribeOn()多次指定被观察者生产事件的线程,则只有第一次指定的有效,其余的指定均无效。
2. 若Observable.observeOn()多次指定观察者接收或响应事件的线程,则每次指定均有效,即每指定一次,就会进行一次线程的切换。
组合/合并操作符

concat()/concatArray()
组合多个被观察者一起发送数据,合并后按发送顺序串行执行
concat()组合被观察者数量≤4个,concatArray()对被观察者数量不限制。
merge()/mergeArray()
组合多个被观察者一起发送数据,合并后按时间线并行执行。
merge()组合被观察者数量≤4个,mergeArray()对被观察者数量不限制。
concatDelayError()/mergeDelayError()
在使用concat()和merge()操作符时,若其中一个被观察者发出onError事件,则会马上终止其他被观察者继续发送事件,若希望onError事件推迟到其他被观察者发送事件结束后再触发,则需要使用对应的concatDelayError()或mergeDelayError()操作符。
zip()
合并多个被观察者发送的事件,生成一个新的事件序列,并最终发送。
1. 事件组合方式:严格按照原先事件序列进行对位合并;
2. 最终合并的事件数量=多个被观察者中数量最少的数量。
比如被观察者1发出事件为1,2,3,被观察者2发出的事件为A,B,C,D,则最后组合事件为1A,2B,3C,D
实例:合并数据源
从不同网络请求中获取数据源,并合并显示
public interface GetRxJava_Interface {
@GET("ajax.php?a=fy&f=en&t=zh")
Observable<JSTranslation> test(@Query("w") String w);
}
分别向上面的接口传入不同的数据进行翻译,一个China一个May
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRxJava_Interface request = retrofit.create(GetRxJava_Interface.class);
Observable<JSTranslation> observable1 = request.test("hi%20China").subscribeOn(Schedulers.io());
Observable<JSTranslation> observable2 = request.test("hi%20May").subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<JSTranslation, JSTranslation, String>() { //第三个参数表示最终输出的数据格式
@Override
public String apply(JSTranslation jsTranslation, JSTranslation jsTranslation2) throws Exception {
return jsTranslation.show()+"***"+jsTranslation2.show();
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e("eee", s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("eee","接收数据失败");
}
});

combineLatest()
两个Observables发送数据,将先发送数据的Observables的最后一个数据与另一个Observable发送的每个数据结合。与zip()的区别,zip是1对1合并,combineLatest()是按时间合并。
Observable.combineLatest(Observable.just(1l, 2l, 3l), Observable.just(4l, 5l, 6l),
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long aLong, Long aLong2) throws Exception {
Log.e("eee","合并的数据是:"+aLong+"和"+aLong2);
return aLong+aLong2;
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("eee","合并的结果是:"+aLong);
}
});

实例:联合判断
Observable<CharSequence> edi1Observable = RxTextView.textChanges(edit1).skip(1);
Observable<CharSequence> edi2Observable = RxTextView.textChanges(edit2).skip(1);
Observable<CharSequence> edi3Observable = RxTextView.textChanges(edit3).skip(1);
Observable.combineLatest(edi1Observable, edi2Observable, edi3Observable,
new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
@Override
public Boolean apply(CharSequence charSequence, CharSequence charSequence2, CharSequence charSequence3) throws Exception {
boolean isedit1 = !TextUtils.isEmpty(edit1.getText());
boolean isedit2 = !TextUtils.isEmpty(edit2.getText());
boolean isedit3 = !TextUtils.isEmpty(edit3.getText());
return isedit1&&isedit2&&isedit3;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
commit.setEnabled(aBoolean);
}
});
combineLatestDelayError()
作用类似于concatDelayError()/mergeDelayError(),错误处理
reduce()
把被观察者需要发送的事件聚合成一个事件并发送,前两个数据聚合,然后与后一个数据继续进行聚合。
collect()
将被观察者发送的数据事件收集到一个数据结构里
startWith()/startWithArray()
在一个被观察者发送时间前,追加发送一些数据/一个新的被观察者
count()
统计被观察者发送事件的数量
过滤操作符

根据指定条件过滤

filter()
过滤特定条件的事件
(过滤的事件都是满足filter()条件的)
ofType()
过滤特定数据类型的数据
skip()/skipLast()
跳过某个事件
(两种用法)

distinct()/distinctUntilChanged()
过滤事件序列中重复的事件/连续重复的事件
根据指定事件数量过滤事件
take()
指定观察者最多能接收到的事件数量
takeLast()
指定观察者只能接收到的被观察者发送的最后几个事件
根据指定时间过滤事件

throttleFirst()/throttleLast()
在某段事件内,只发送该段时间内第一次事件/最后一次事件
sample()
在某段时间内,只发送该段时间内最后一次事件
(与throttleLast()操作符类似)
throttleWithTimeout()/debounce()
发送数据事件时,若后一次发送的事件间隔<指定时间,就会丢弃前一次的数据,若后一次发送的事件间隔>指定时间,则会发送上一次的事件。
根据指定事件位置过滤

firstElement()/lastElement()
仅选取第一个元素/最后一个元素
elementAt()
指定接收某个元素(通过索引值确定)
允许越界,即获取的位置索引>发送事件序列长度,此时可以设置默认参数
elementAtOrError()
在elementAt()的基础上,当出现越界情况时抛出异常
实例:防抖
纸机写的,不知道对不对,难过。。。给自己梳理一下
我好像还没说这个是要实现什么哈:就是点击按钮,倒计时,并请求网络,在倒计时期间不管有几次点击事件,只请求一次网络。
1.首先定义按键bt
2.一个知识点RxBinding(这里先不展开,总之就是它是对view事件的扩展,使用它可以对View进行RxJava的一系列操作,恩,看着更装比一点。。。)
RxView.clicks(bt)
.subscribeOn(AndroidSchedulers.mainThread())
//这个地方的意思就是不管60秒内有几次点击事件,只取第一次的事件
.throttleFirst(60,TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
//这个地方就是跟以前一样的网络请求了
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://fy.iciba.com/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
GetRxJava_Interface request = retrofit.create(GetRxJava_Interface.class);
Observable<JSTranslation> login = request.login();
login.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<JSTranslation>() {
@Override
public void accept(JSTranslation jsTranslation) throws Exception {
jsTranslation.show();
}
});
//这个地方是我不确定的地方,我又重新声明了一个被观察者进行倒计时的操作,这样对不?总觉得RXjava的本意应该和上面网络请求的事件写在一条链上,但,我不会。。。。
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())
.take(60)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
bt.setText(60-value-1+"S后可重新发送");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
bt.setText("点击并发送");
}
});
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
条件/布尔操作符

all()
判断发送的每项数据是否都满足,若满足,返回true,否则,返回false
takeWhile()
判断发送的每项数据是否满足设置的函数条件,若发送的数据满足该条件,则发送该项数据,否则不发送
skipWhile()
判断发送的每项数据是否满足设置的函数条件,直到该判断条件=false时,才开始发送被观察者的数据,也就是当条件不满足时才发送被观察者
takeUntil()
执行到某个条件时,停止发送事件
skipUntil()
直到传入的被观察者开始发送数据,第一个被观察者才开始发送数据(我是这么认为的,相当于延时)
SequenceEqual()
判断两个被观察者需要发送的数据是否相同,相同返回true,不同返回false
contains()
判断发送的数据中是否包含指定的数据
isEmpty()
判断发送的数据是否为空,若为空,则返回true,否则返回false
amb()
当需要发送多个被观察者时,只会发送先发送数据的被观察者的数据,而其余被观察者将被丢弃。
defaultlfEmpty()
在不发送任何有效事件、仅发送了Complete事件的前提下,发送一个默认值
功能性操作符

subscribe()
连接观察者和被观察者
subscribeOn()/observeOn()
线程切换(subscribeOn()/observeOn())
delay()
被观察者延迟一段时间再发送事件
do()

错误处理

onErrorReturn()
遇到错误时,发送一个特殊事件&正常终止
onErrorResumeNext()
遇到错误时,发送一个新的Observable
注:
当拦截的错误=Throwable,若需要拦截Exception,用onExceptionResumeNext()
当拦截的错误=Exception,则将错误传递给观察者的onError方法
onExceptionResumeNext()
遇到错误时发送一个新的Observable
注:
当拦截的错误=Exception,若需要拦截Throwable,用onErrorResumeNext()
当拦截的错误=Throwable,则将错误传递给观察者的onError()方法
retry()
当出现错误时,让被观察者重新发送数据

retryUntil()
出现错误后,判断是否需要重新发送数据
类似于retry(Predicate predicate) 区别在返回true则不重新发送数据事件
retryWhen()
当遇到错误时,将发生的错误传递给一个新的被观察者,并决定是否需要重新订阅原始被观察者&发送事件
重复发送
repeat()
无条件地、重复发送被观察者事件
repeatWhen()
有条件地、重复发送被观察者事件
若新被观察者返回一个complete/error事件,则不重新订阅&发送原来的被观察者;
若新被观察者返回其余事件时,则重新订阅&发送原来的被观察者
背压策略
啥意思类,就是在异步订阅关系中,会存在被观察者发送事件的速度与观察者接收事件的速度不匹配的情况,大多数情况下主要是被观察者发送事件的速度>观察者接收事件的速度,从而导致观察者无法及时响应/处理所有发送过来的事件,最终导致缓存区溢出,事件丢失,OOM。
具体实现:
被观察者定义为Flowable,观察者定义为Subscriber,在使用上,多了背压的功能而已。
控制观察者接收事件的数量

控制被观察者发送事件的数量

在异步订阅中,不能直接使用这种方法设置被观察者发送事件的个数,要通过RxJava内部固定调用被观察者线程中的request(n)从而反向控制被观察者的发送事件速度。
背压策略模式
BackpressureStrategy.ERROR
直接抛出异常MissingBackpressureException
BackpressureStrategy.MISSING
有好提示:缓存区满了
BackpressureStrategy.BUFFER
将缓存区大小设置成无限大
BackpressureStrategy.DROP
超过缓存区大小(128)的事件丢弃
BackpressureStrategy.LATEST
只保存最后的事件,超过缓存区大小(128)的事件丢弃
RxJava源码分析
这个标题我写了又删,删了又写,MMP,看不懂!!!简单写一下基本流程源码。。。。
使用流程很清楚,这一个周就敲这个了,还是贴张图来

首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参,接着创建一个观察者Subscribe,然后通过subseribe()实现两者的订阅关系,好啦,写完啦,哈哈哈,开玩笑,站着说话不腰疼。。。。
1.Observable.create()


问题来了,这个构造函数的参数是个啥:RxJavaHooks.onCreate(f)

看到啦,返回结果就是入参。
结论:Observable.create()方法构造了一个被观察者对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量OnSubscribe。
2.Subscriber源码


Subscriber实现了Subscription接口,从而提供了isUnsubscribed()和unsubscribe()方法,前者用于判断是否已经取消订阅;后者用于将订阅事件列表中的所有Subscription取消订阅,并不再接收观察者发送的后续事件。
3. subscribe()源码

画黄线的部分对subscriber进行了包装SafeSubscriber,它其实是subscriber的一个代理,保证了onCompleted()和onError()只会有一个被执行并只执行一次,一旦它们其中之一被执行,onNext()方法就不再执行了。那么划红线的地方是啥,和上面一样,它返回的是它的第二个入参observable.onSubscribe,也就是当前observable的成员变量onSubscribe,没错就是前面Observable.create()创建的,所以这行代码可以简化为onSubscribe.call(subscriber)。
最后我想说,那几十种操作符的源码,见鬼去吧。。。。。。。。。。。
网友评论