package com.zhang.rxjavademo1;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
/**
* RxJava入门
*/
public class MainActivity extends AppCompatActivity {
public static final String TAG = "MainActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
RxJava01();
RxJava02();
RxJava03();
RxJava04();
RxJava05();
RxJavaJiChuCaoZuoFu();
}
/**
* RxJava的基础操作符
*/
private void RxJavaJiChuCaoZuoFu() {
RxJavaCreate();
RxJavaCreateUse();
RxJavaQuitCreateJust();
RxJavaQuitCreateFromArray();
RxJavaQuitCreateFromIterable();
RxJavaCreateDefer();
RxJavaCreateTimer();
// RxJavaCreateInterval();
RxJavaCreateIntervalRange();
}
/**
* 快速创建一个被观察者对象
*
* intervalRange()
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
*
* 发送的事件序列 = 从0开始、无限递增1的的整数序列
* 作用类似于interval(),但可指定发送的数据的数量
*
*/
private void RxJavaCreateIntervalRange() {
log("==========================RxJavaCreateIntervalRange分割start===============================");
// 参数说明:
Observable.intervalRange(
3, //事件序列起始点
10, //事件数量
2, //第1次事件延迟发送时间
1, //间隔时间数字
TimeUnit.SECONDS //时间单位
)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
log("onSubscribe连接");
}
@Override
public void onNext(Long aLong) {
log("接收到的事件:" + aLong);
}
@Override
public void onError(Throwable e) {
log("响应error");
}
@Override
public void onComplete() {
log("响应onComplete");
}
});
//结果
// ==========================RxJavaCreateIntervalRange分割start===============================
// onSubscribe连接
// ==========================RxJavaCreateIntervalRange分割end===============================
// 接收到的事件:0
// 响应onComplete
// 接收到的事件:3
// 接收到的事件:4
// 接收到的事件:5
// 接收到的事件:6
log("==========================RxJavaCreateIntervalRange分割end===============================");
}
/**
* 快速创建一个被观察者对象
*
* interval()作用
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:每隔指定时间 就发送 事件
*
* 发送的事件序列 = 从0开始、无限递增1的的整数序列
*
*/
private void RxJavaCreateInterval() {
log("==========================RxJavaCreateInterval分割start===============================");
// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3,1,TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
log("采用onSubscribe连接");
}
@Override
public void onNext(Long aLong) {
log("接收到的事件:" + aLong);
}
@Override
public void onError(Throwable e) {
log("响应error");
}
@Override
public void onComplete() {
log("响应onComplete");
}
});
//结果 每过三秒接收一个事件
// ==========================RxJavaCreateInterval分割start===============================
// 采用onSubscribe连接
// ==========================RxJavaCreateInterval分割end===============================
// 接收到的事件:0
// 响应onComplete
// 接收到的事件:0
// 接收到的事件:1
// 接收到的事件:2
// 接收到的事件:3
// 接收到的事件:4
log("==========================RxJavaCreateInterval分割end===============================");
}
/**
* 快速创建一个被观察者对象
*
* timer()作用
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
*
* 本质 = 延迟指定时间后,调用一次 onNext(0)
*
* 注:timer操作符默认运行在一个新线程上
* 也可自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
*
*/
private void RxJavaCreateTimer() {
// 该例子 = 延迟2s后,发送一个long类型数值
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
log("采用onSubscribe连接");
}
@Override
public void onNext(Long aLong) {
log("接收到的事件:" + aLong);
}
@Override
public void onError(Throwable e) {
log("响应onError");
}
@Override
public void onComplete() {
log("响应onComplete");
}
});
//结果
// 采用onSubscribe连接
// 接收到的事件:0
// 响应onComplete
log();
}
/**
* 延迟创建被观察者对象
*
* defer()作用
* 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
*
* 通过 Observable工厂方法创建被观察者对象(Observable)
* 每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
*
* 应用场景
* 动态创建被观察者对象(Observable) & 获取最新的Observable对象数据
*
*/
//1. 第1次对i赋值
Integer i = 10;
private void RxJavaCreateDefer() {
// 2. 通过defer 定义被观察者对象
// 注:此时被观察者对象还没创建
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(i);
}
});
//第2次对i赋值
i = 15;
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用onSubscribe连接");
}
@Override
public void onNext(Integer integer) {
log("接收到的值:" + integer);
}
@Override
public void onError(Throwable e) {
log("响应onError事件");
}
@Override
public void onComplete() {
log("响应onComplete事件");
}
});
//结果
// 开始采用onSubscribe连接
// 接收到的值:15
// 响应onComplete事件
//因为是在订阅时才创建,所以i值会取第2次的赋值
log();
}
/**
* 快速创建被观察者对象
* <p>
* fromIterable()作用
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:直接发送 传入的集合List数据
* <p>
* 会将数组中的数据转换为Observable对象
* <p>
* 应用场景
* 快速创建 被观察者对象(Observable) & 发送10个以上事件(集合形式)
* 集合元素遍历
*/
private void RxJavaQuitCreateFromIterable() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 4; i++) {
list.add(i);
}
// 2. 通过fromIterable()将集合中的对象 / 数据发送出去
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用subscribe()连接");
}
@Override
public void onNext(Integer integer) {
log("收到了事件:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("对onComplete事件作出响应");
}
});
//结果:
// 开始采用subscribe()连接
// 收到了事件:0
// 收到了事件:1
// 收到了事件:2
// 收到了事件:3
// 对onComplete事件作出响应
log();
//遍历集合
Observable.fromIterable(list)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用subscribe()连接");
}
@Override
public void onNext(Integer integer) {
log("集合中的元素:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("遍历完成");
}
});
//结果
// 开始采用subscribe()连接
// 集合中的元素:0
// 集合中的元素:1
// 集合中的元素:2
// 集合中的元素:3
// 遍历完成
log();
}
/**
* 快速创建被观察者对象
* <p>
* fromArray()作用
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:直接发送 传入的数组数据
* <p>
* 注 :会将数组中的数据转换为Observable对象
* <p>
* 应用场景
* <p>
* 快速创建 被观察者对象(Observable) & 发送10个以上事件(数组形式)
* 数组元素遍历
*/
private void RxJavaQuitCreateFromArray() {
// 1. 设置需要传入的数组
Integer[] items = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
// 2. 创建被观察者对象(Observable)时传入数组
// 在创建后就会将该数组转换成Observable & 发送该对象中的所有数据
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用subscribe()连接");
}
@Override
public void onNext(Integer integer) {
log("收到了事件:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("对Complete事件作出响应");
}
});
//结果
// 开始采用subscribe()连接
// 收到了事件:0
// 收到了事件:1
// 收到了事件:2
// 收到了事件:3
// 收到了事件:4
// 收到了事件:5
// 收到了事件:6
// 收到了事件:7
// 收到了事件:8
// 收到了事件:9
// 收到了事件:10
// 收到了事件:11
// 对Complete事件作出响应
log();
//遍历数组
Observable.fromArray(items)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("遍历数组");
}
@Override
public void onNext(Integer integer) {
log("数组中的元素:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("遍历结束");
}
});
//结果
// 遍历数组
// 数组中的元素:0
// 数组中的元素:1
// 数组中的元素:2
// 数组中的元素:3
// 数组中的元素:4
// 数组中的元素:5
// 数组中的元素:6
// 数组中的元素:7
// 数组中的元素:8
// 数组中的元素:9
// 数组中的元素:10
// 数组中的元素:11
// 遍历结束
log();
}
/**
* 快速创建被观察者对象
* just()作用
* 快速创建1个被观察者对象(Observable)
* 发送事件的特点:直接发送 传入的事件
* 注:最多只能发送10个参数
* <p>
* 应用场景
* 快速创建 被观察者对象(Observable) & 发送10个以下事件
*/
private void RxJavaQuitCreateJust() {
// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3, 4)
// 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用subscribe()连接");
}
@Override
public void onNext(Integer integer) {
log("收到了事件:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("对Complete事件作出响应");
}
});
//结果
// 开始采用subscribe()连接
// 收到了事件:1
// 收到了事件:2
// 收到了事件:3
// 收到了事件:4
// 对Complete事件作出响应
log();
}
/**
* RxJava基础操作符-create操作符
*/
private void RxJavaCreate() {
//1,通过create()创建被观察者对象Observable
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 传入参数: OnSubscribe 对象
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ,即观察者模式
/**
* 在重写的方法中定义要发送的事件
* @param emitter
* @throws Exception
*/
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//通过ObservableEmitter对象产生发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//至此 一个完整的被观察者对象就创建完毕
log();
}
/**
* 在具体使用的时候,一般采用链式调用来创建
*/
private void RxJavaCreateUse() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
log("开始采用subscribe()连接");
}
//默认最先调用重写的onSubscribe()
@Override
public void onNext(Integer integer) {
log("收到了事件:" + integer);
}
@Override
public void onError(Throwable e) {
log("对Error事件作出响应");
}
@Override
public void onComplete() {
log("对Complete事件作出响应");
}
});
//程序执行的结果
// 开始采用subscribe()连接
// 收到了事件:1
// 收到了事件:2
// 收到了事件:3
// 对Complete事件作出响应
log();
}
private void RxJava05() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
log("上游 发射--a");
emitter.onNext("上游 发射--a");
log("上游 发射--b");
emitter.onNext("上游 发射--b");
log("上游 发射--c");
emitter.onNext("上游 发射--c");
log("上游 发射--d");
emitter.onNext("上游 发射--d");
}
})
.subscribe(new Observer<String>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
log("下游 onSubscribe");
}
@Override
public void onNext(String s) {
if (s.equals("上游 发射--c")) {
mDisposable.dispose();
}
log("下游 onNext :" + s);
}
@Override
public void onError(Throwable e) {
log("下游 onError");
}
@Override
public void onComplete() {
log("下游 onComplete");
}
});
//程序执行结果
// 2019-04-29 22:48:39.534 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
// 2019-04-29 22:48:39.534 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--c
// 2019-04-29 22:48:39.537 11100-11100/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
log();
}
private void RxJava04() {
//创建上游Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
log("上游 发射--a");
emitter.onNext("上游 发射--a");
log("上游 发射--b");
emitter.onNext("上游 发射--b");
log("上游 发射--c");
emitter.onNext("上游 发射--c");
log("上游 发射--d");
emitter.onNext("上游 发射--d");
}
});
//创建下游Observer
Observer<String> observer = new Observer<String>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
log("下游 onSubscribe");
}
@Override
public void onNext(String s) {
if (s.equals("上游 发射--c")) {
mDisposable.dispose();
}
log("下游 onNext :" + s);
}
@Override
public void onError(Throwable e) {
log("下游 onError");
}
@Override
public void onComplete() {
log("下游 onComplete");
}
};
//连接上下游
observable.subscribe(observer);
//程序执行结果
// 2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
// 2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
// 2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
// 2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
// 2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
// 2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
// 2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--c
// 2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
log();
}
private void RxJava03() {
//创建上游Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
log("上游 发射--a");
emitter.onNext("上游 发射--a");
log("上游 发射--b");
emitter.onNext("上游 发射--b");
emitter.onError(new NullPointerException());
log("上游 发射--c");
emitter.onNext("上游 发射--c");
log("上游 发射--d");
emitter.onNext("上游 发射--d");
}
});
//创建下游Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("下游 onSubscribe");
}
@Override
public void onNext(String s) {
log("下游 onNext :" + s);
}
@Override
public void onError(Throwable e) {
log("下游 onError");
}
@Override
public void onComplete() {
log("下游 onComplete");
}
};
//连接上下游
observable.subscribe(observer);
//程序执行结果
// 2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 下游 onSubscribe
// 2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 上游 发射--a
// 2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--a
// 2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 上游 发射--b
// 2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--b
// 2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onError
// 2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--c
// 2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--d
log();
}
private void RxJava02() {
//创建上游Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
log("上游 发射--a");
emitter.onNext("上游 发射--a");
log("上游 发射--b");
emitter.onNext("上游 发射--b");
emitter.onComplete();
log("上游 发射--c");
emitter.onNext("上游 发射--c");
log("上游 发射--d");
emitter.onNext("上游 发射--d");
}
});
//创建下游Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
log("下游 onSubscribe");
}
@Override
public void onNext(String s) {
log("下游 onNext :" + s);
}
@Override
public void onError(Throwable e) {
log("下游 onError");
}
@Override
public void onComplete() {
log("下游 onComplete");
}
};
//连接上下游
observable.subscribe(observer);
//程序执行结果
// 2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
// 2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
// 2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
// 2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
// 2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
// 2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onComplete
// 2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
// 2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d
log();
}
private void RxJava01() {
//创建上游的Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("World");
emitter.onNext("!!!!!!");
emitter.onComplete();
}
});
//创建下游的Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete");
}
};
//连接上下游
observable.subscribe(observer);
//程序执行结果
// 2019-04-29 22:23:12.222 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onSubscribe
// 2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : Hello
// 2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : World
// 2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : !!!!!!
// 2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onComplete
log();
}
private void log(String msg) {
Log.e(TAG, msg);
}
private void log() {
Log.e(TAG, "----------------------------------华丽的分割线------------------------------------");
}
}
网友评论