1、RxJava的重要组成
Observable (可观察者,即被观察者)、Observer (观察者)、subscribe (订阅)、事件。
Observable的生命周期中有三个重要的事件,onNext(检索数据)、onCompleted(完成)和onError(发现错误)。
Subscriber:也是一种观察者,在2.0中 它与Observer没什么实质的区别,不同的是 Subscriber要与Flowable(也是一种被观察者)联合使用。
Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable
RxJava三部曲:初始化一个Observable,初始化一个Observer,建立订阅关系
1.1 Observable创建
Observable<T> create(ObservableOnSubscribe<T> source)
代码效果如下:
@Test
public void testObservable() {
System.out.println("---------testObservale----------");
// RxJava1的写法,本次使用的是Rxjava2依赖
//Observable<Integer> observable = Observable.create(new Observable.Onsubscrible<Integer>(){});
//创建一个Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 5; i++) {
e.onNext(i);
}
e.onComplete();
}
});
//创建一个Observer
Observer<Integer> observer = new Observer<Integer>() {
/**
* RxJava 2.0 中新增的,传递参数为Disposable ,Disposable 相当于
* RxJava1.x中的Subscription,用于解除订阅。
*/
@Override
public void onSubscribe(Disposable d) {
System.out.println("---------onSubscribe:" + d);
}
@Override
public void onNext(Integer integer) {
// 收到数据
System.out.println("---------onNext:" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("---------onError----------");
}
@Override
public void onComplete() {
System.out.println("---------onComplete----------");
}
};
// 建立连接(订阅)
observable.subscribe(observer);
}
运行结果如下:
---------testObservale----------
---------onSubscribe:null
---------onNext:0
---------onNext:1
---------onNext:2
---------onNext:3
---------onNext:4
---------onComplete----------
1.2 Observable的其他创建方式
1.2.1 fromIterable()
使用fromIterable(),遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。
代码效果如下:
@Test
public void testObservable2() {
List<String> lists = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
lists.add("Hello Boy" + i);
}
Observable observable = Observable.fromIterable((Iterable<String>) lists);
//创建一个Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String str) {
System.out.println("---------onNext:" + str);
}
@Override
public void onError(Throwable e) {
System.out.println("---------onError----------");
}
@Override
public void onComplete() {
System.out.println("---------onComplete----------");
}
};
// 建立连接(订阅)
observable.subscribe(observer);
}
运行结果如下:
---------onNext:Hello Boy0
---------onNext:Hello Boy1
---------onNext:Hello Boy2
---------onNext:Hello Boy3
---------onNext:Hello Boy4
---------onComplete----------
1.2.2 just()
使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据。
通过just( )方式 直接触发onNext(),just中传递的参数将直接在Observer的onNext()方法中接收到。
代码实现如下:
@Test
public void testObservable3() {
List<String> lists = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
lists.add("Hello Boy" + i);
}
Observable observable = Observable.just(lists);
//创建一个Observer
Observer<List<String>> observer = new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(List<String> str) {
System.out.println("---------onNext:" + str);
}
@Override
public void onError(Throwable e) {
System.out.println("---------onError----------");
}
@Override
public void onComplete() {
System.out.println("---------onComplete----------");
}
};
// 建立连接(订阅)
observable.subscribe(observer);
}
// 运行结果如下:
Disposable:0
---------onNext:[Hello Boy0, Hello Boy1, Hello Boy2, Hello Boy3, Hello Boy4]
---------onComplete----------
1.2.3 其它
defer():当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。
interval( ):固定的时间间隔发射一个无限递增的整数序列
range( ):创建一个发射特定整数序列的Observable,接收两个参数,第一个参数是范围的起始值,第二个参数是范围的数据数目
timer( ):创建一个Observable,它在一个给定的延迟后发射一个特殊的值
repeat( ):创建一个Observable,该Observable的事件可以重复调用。
2、RxJava中的操作符
2.1、map()操作符
map 操作符是可以将返回的数据变换成别的数据。一般对服务器端返回结果处理
具体看代码实现效果:
@Test
public void testObservable4() {
List<String> lists = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
lists.add("Hello Boy" + i);
}
Observable observable = Observable.fromIterable((Iterable<String>) lists);
//创建一个Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Disposable:" + d);
}
@Override
public void onNext(String str) {
System.out.println("---------onNext:" + str);
}
@Override
public void onError(Throwable e) {
System.out.println("---------onError----------");
}
@Override
public void onComplete() {
System.out.println("---------onComplete----------");
}
};
Function fun1 = new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
// 这个可以对返回的数据格式进行修改(比如我在每个前面加入"zyzhang")
return "zyzhang:" + s;
}
};
// 建立连接(订阅)
observable.map(fun1).subscribe(observer);
}
// 运行结果如下:
---------onNext:zyzhang:Hello Boy0
---------onNext:zyzhang:Hello Boy1
---------onNext:zyzhang:Hello Boy2
---------onNext:zyzhang:Hello Boy3
---------onNext:zyzhang:Hello Boy4
---------onComplete----------
2.2、flatMap()操作符
对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。
比如有如下两个实体类:
class Student {
private String name;//姓名
private List<Course> coursesList;//所修的课程
...
}
class Course {
private String name;//课程名
private String id;
...
}
我们需要学生所修的课程名(课程可以由多个)。如果直接用map实现的话,很简单得到学生的课程列表,然后一个for循环打印学生的课程信息。篇幅关系不再举例。
flatMap实现如下:
@Test
public void testObservable5() {
List<Student> students = new ArrayList<Student>();
for (int i = 0; i < 5; i++) {
Student student = new Student();
student.setName("张三" + i);
List<Course> courses = new ArrayList<Course>();
for (int j = 0; j < 2; j++) {
Course course = new Course();
course.setName("数学" + j);
courses.add(course);
student.setCoursesList(courses);
}
students.add(student);
}
Observable observable = Observable.fromIterable(students);
//创建一个Observer
Observer<Course> observer = new Observer<Course>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d);
}
@Override
public void onNext(Course course) {
System.out.println("---------onNext:" + course.getName());
}
@Override
public void onError(Throwable e) {
System.out.println("---------onError----------");
}
@Override
public void onComplete() {
System.out.println("---------onComplete----------");
}
};
Function fun1 = new Function<Student, Observable<Course>>() {
@Override
public Observable<Course> apply(Student student) throws Exception {
System.out.println(student.getName());
return Observable.fromIterable(student.getCoursesList());
}
};
// 建立连接(订阅)
observable.flatMap(fun1).subscribe(observer);
}
// 运行结果如下:
张三0
---------onNext:数学0
---------onNext:数学1
张三1
---------onNext:数学0
---------onNext:数学1
张三2
---------onNext:数学0
---------onNext:数学1
张三3
---------onNext:数学0
---------onNext:数学1
张三4
---------onNext:数学0
---------onNext:数学1
---------onComplete----------
2.3、filter()操作符
集合进行过滤
2.4、take()操作符
取出集合中的前几个
2.5、doOnNext()操作符
此操作符可以在消费者也就是观察者 接收到数据之前做事
3、compose()操作符
4、subscribeOn() 和observeOn()
对线程进行控制,subscribeOn(): 指定Observable(被观察者)所在的线程,或者叫做事件产生的线程。observeOn(): 指定 Observer(观察者)所运行在的线程,或者叫做事件消费的线程。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("发送的数据:", 1+"");
e.onNext(1);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("所在的线程:",Thread.currentThread().getName());
Log.d("接收到的数据:", "integer:" + integer);
}
});
// 运行结果
// 所在的线程:RxCachedThreadScheduler-1
// 发送的数据:1
// 所在的线程: main
// 接收到的数据:integer:1
可以看到,Observable(被观察者)发送事件的线程的确改变了,而Observer(观察者)仍然在主线程中接收事件。由此我们实现了线程调度的操作,可以在此基础上尽情的进行异步操作。
网友评论