一、函数响应式编程
RxJava的异步操作是通过扩展的观察者模式来实现的,它有4个角色:Observable、Observer、Subscriber、Subject,其中Observable和Observer通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。使用时需添加依赖:
- compile 'io.reactivex:rxandroid:1.2.1'
- compile 'io.reactivex:rxjava:1.1.6'
// Create the observer. Subscriber is parameterized with the element type (String)
// instead of being used as a raw type, so onNext receives a String directly.
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onStart() {
        // Invoked before any event is delivered; put preparatory work here.
        super.onStart();
    }

    @Override
    public void onNext(String s) {
        // A regular event.
    }

    @Override
    public void onError(Throwable e) {
        // The event sequence terminated with an error.
    }

    @Override
    public void onCompleted() {
        // The event sequence finished normally.
    }
};
// The most basic observer: a plain Observer<String> exposing the three event callbacks.
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String value) {
        // A regular event carrying a String.
    }

    @Override
    public void onError(Throwable error) {
        // The sequence ended with an error.
    }

    @Override
    public void onCompleted() {
        // The sequence ended normally.
    }
};
// Create the observable. OnSubscribe is typed so that call() receives the Subscriber
// that has just subscribed; events are pushed to that subscriber rather than to an
// unrelated variable from the enclosing scope.
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Emit one item, then signal normal completion.
        subscriber.onNext("wjx");
        subscriber.onCompleted();
    }
});
// just(): builds an Observable that emits the given items in order and then completes.
// (It creates the observable only; nothing is subscribed yet.)
Observable<String> observable = Observable.just("wjx", "wjx", "wjx");
// from(): alternative factory that emits every element of an array (or Iterable) in order.
String[] words = {"wjx", "wjx"};
Observable<String> observable = Observable.from(words);
//Subscribe订阅,即观察者订阅,参数是上面第一个创建的观察者
observable..subscribe(subscriber);
二、RxJava的不完整定义回调
Action0 ~ Action9 均继承自 Action,参数都是泛型,不同的是参数个数,如Action0没有参数,Action9有9个参数。
// Custom Actions: each one wraps a single callback.

// Handler for regular (onNext) events.
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String value) {
    }
};

// Handler for the error event.
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable error) {
    }
};

// Handler for the completion event; Action0 takes no argument.
Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
    }
};

// Partial-callback subscription. subscribe() has several overloads; this one takes
// the onNext, onError and onCompleted handlers.
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
// interval: an Observable that emits an integer sequence at a fixed time period.
Action1<Long> onTick = new Action1<Long>() {
    @Override
    public void call(Long tick) {
        // Invoked once every 3 seconds.
    }
};
Observable.interval(3, TimeUnit.SECONDS).subscribe(onTick);
// range: an Observable that emits a run of consecutive integers,
// here 5 values starting at 0.
Action1<Integer> onRangeValue = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // value is 0, 1, 2, 3, 4 in turn
    }
};
Observable.range(0, 5).subscribe(onRangeValue);
// repeat: replays the source sequence the given number of times.
Observable<Integer> repeated = Observable.range(0, 3).repeat(2);
repeated.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Receives 0, 1, 2 and then 0, 1, 2 again: six calls in total.
    }
});
// flatMap + cast: map every source item to its own Observable, flatten those into a
// single stream, then cast each flattened item to String.
final String host = "http://blog.csdn.net/";
List<String> names = new ArrayList<>();
names.add("wjx1");
names.add("wjx2");
names.add("wjx3");

Func1<String, Observable<?>> toUrl = new Func1<String, Observable<?>>() {
    @Override
    public Observable<?> call(String name) {
        // name is one element of the source list
        return Observable.just(host + name);
    }
};

Observable.from(names)
        .flatMap(toUrl)
        .cast(String.class)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String url) {
                // url is the host-prefixed string produced by the mapping function
                Log.e(TAG, "call: " + url);
            }
        });
// flatMapIterable: map every source item to an Iterable and flatten all of them into
// one stream. The function's result type is Iterable<Integer> (not Iterable<?>), so the
// flattened stream is Observable<Integer> and can be consumed by an Action1<Integer>.
Observable.just(1, 2, 3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> call(Integer integer) {
        // Wrap "item + 1" in a single-element list.
        List<Integer> list = new ArrayList<>();
        list.add(integer + 1);
        return list;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        // Receives the elements of the returned lists: 2, 3, 4.
    }
});
// buffer: gathers source items into lists of the given size and emits each list.
Action1<List<Integer>> onChunk = new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> chunk) {
        // With buffer(3) the six items arrive as [1, 2, 3] and then [4, 5, 6].
        for (Integer item : chunk) {
            // item walks through the values of the current chunk
        }
    }
};
Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(onChunk);
// groupBy: splits the source into GroupedObservables keyed by the selector's result.
Func1<String, String> keySelector = new Func1<String, String>() {
    @Override
    public String call(String item) {
        // The item itself serves as its group key.
        return item;
    }
};
Observable<GroupedObservable<String, String>> groupedObservableObservable =
        Observable.just("s1", "s2", "s3", "s4", "s5").groupBy(keySelector);
三、过滤操作符
// filter: only items for which the predicate returns true are passed downstream.
Observable.just(1, 2, 3, 4)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 2;
            }
        })
        // Nothing is emitted until someone subscribes, so subscribe as the
        // other operator examples do.
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                // Receives 3 and 4.
            }
        });
// elementAt: emits only the item at the given zero-based index.
Action1<Integer> onThirdItem = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Index 2 of 1, 2, 3, 4 is the value 3.
    }
};
Observable.just(1, 2, 3, 4).elementAt(2).subscribe(onThirdItem);
// distinct: drops items that have already been emitted.
Action1<Integer> onDistinctItem = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // All four inputs differ, so 1, 2, 3, 4 are all delivered.
    }
};
Observable.just(1, 2, 3, 4).distinct().subscribe(onDistinctItem);
// skip: discards the first N items and emits the rest.
Action1<Integer> onRemaining = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // After skipping two items, 3 and 4 are delivered.
    }
};
Observable.just(1, 2, 3, 4).skip(2).subscribe(onRemaining);
// take: emits only the first N items.
Action1<Integer> onTaken = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Only the first two items, 1 and 2, are delivered.
    }
};
Observable.just(1, 2, 3, 4).take(2).subscribe(onTaken);
// ignoreElements: suppresses every item and forwards only the terminal notification.
Observer<Integer> completionOnly = new Observer<Integer>() {
    @Override
    public void onNext(Integer value) {
        // Not expected to be called: all items are ignored.
    }

    @Override
    public void onError(Throwable error) {
    }

    @Override
    public void onCompleted() {
    }
};
Observable.just(1, 2, 3, 4).ignoreElements().subscribe(completionOnly);
// throttleFirst: within each time window (200 ms here) only the first item is emitted.
Observable<Integer> throttleFirstSource = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Placeholder source: emits nothing.
    }
});
throttleFirstSource
        .throttleFirst(200, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
            }
        });
// throttleWithTimeout: an item is emitted only if the given time (200 ms here) passes
// without the source producing another item.
Observable<Integer> throttleWithTimeoutSource = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Placeholder source: emits nothing.
    }
});
throttleWithTimeoutSource
        .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
            }
        });
四、组合操作符
// startWith: emits the given items before the source's own items.
Action1<Integer> onPrefixed = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Delivered order: 1, 2, 3, 4, 5.
    }
};
Observable.just(3, 4, 5).startWith(1, 2).subscribe(onPrefixed);
// merge: combines the emissions of several Observables into one stream; because the
// first source is subscribed on the io scheduler, items from the two may interleave.
Observable<Integer> observable2 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable3 = Observable.just(4, 5, 6);
Action1<Integer> onMerged = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
    }
};
Observable.merge(observable2, observable3).subscribe(onMerged);
// concat: emits all items of the first Observable, then all items of the second,
// strictly in that order.
Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Action1<Integer> onConcatenated = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Delivered order: 1, 2, 3, 4, 5, 6.
    }
};
Observable.concat(observable1, observable2).subscribe(onConcatenated);
// zip: pairs the n-th item of each source and emits the result of the combiner function.
// zip requires that combiner; there is no overload taking just two Observables.
Observable<Integer> observable1 = Observable.just(1, 2, 3)
        .subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        // Combine each pair by adding the two values.
        return integer + integer2;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        // Receives 5, 7, 9 (1+4, 2+5, 3+6).
    }
});
// zip with an explicit combiner: the Func2 receives one item from each source and
// its return value becomes the emitted item.
Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Func2<Integer, Integer, Object> pairCombiner = new Func2<Integer, Integer, Object>() {
    @Override
    public Object call(Integer first, Integer second) {
        // Placeholder combiner: returns null for every pair.
        return null;
    }
};
Observable.zip(observable1, observable2, pairCombiner).subscribe(new Action1<Object>() {
    @Override
    public void call(Object combined) {
    }
});
// combineLatest: whenever either source emits, combines the most recent item of each
// source through the given function and emits the result.
Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Func2<Integer, Integer, Object> latestCombiner = new Func2<Integer, Integer, Object>() {
    @Override
    public Object call(Integer latestFirst, Integer latestSecond) {
        // Placeholder combiner: returns null for every combination.
        return null;
    }
};
Observable.combineLatest(observable1, observable2, latestCombiner).subscribe(new Action1<Object>() {
    @Override
    public void call(Object combined) {
    }
});
五、辅助操作符
// delay: shifts every emission of the source forward in time (2 seconds here).
Observable<Long> delayedSource = Observable.create(new Observable.OnSubscribe<Long>() {
    @Override
    public void call(Subscriber<? super Long> subscriber) {
        // Placeholder source: emits nothing.
    }
});
delayedSource.delay(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
    @Override
    public void call(Long value) {
    }
});
// doOnNext: registers a side-effect callback that runs for each item before the
// item reaches the subscriber.
Action1<Integer> peek = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Runs for 1 and then 2, ahead of the subscriber's onNext.
    }
};
Observable.just(1, 2)
        .doOnNext(peek)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer value) {
            }

            @Override
            public void onError(Throwable error) {
            }

            @Override
            public void onCompleted() {
            }
        });
// subscribeOn / observeOn: subscribeOn selects the scheduler the source runs on,
// observeOn selects the scheduler on which the subscriber receives events.
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});
Observable<Integer> onMainThread = observable
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());
onMainThread.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Delivered via the Android main-thread scheduler.
    }
});
// timeout: if the source stays silent for the given time (200 ms here), switch to the
// fallback Observable passed as the last argument.
Observable<Integer> silentSource = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Emits nothing, so the timeout elapses.
    }
});
Observable<Integer> observable =
        silentSource.timeout(200, TimeUnit.MILLISECONDS, Observable.just(10, 11));
observable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Receives the fallback items 10 and 11.
    }
});
六、错误处理操作符
// catch (onErrorReturn): intercepts an error from the source and replaces it with a
// single fallback item followed by normal completion.
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Signal an error so that onErrorReturn has something to intercept;
        // a source that only completes never triggers the handler.
        subscriber.onError(new RuntimeException("source failed"));
    }
}).onErrorReturn(new Func1<Throwable, Integer>() {
    @Override
    public Integer call(Throwable throwable) {
        // Fallback value emitted in place of the error.
        return 8;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        // Called after the fallback value has been delivered.
    }

    @Override
    public void onError(Throwable e) {
        // Not expected: the error was replaced upstream.
    }

    @Override
    public void onNext(Integer integer) {
        // Receives the fallback value 8.
    }
});
// retry: when the source signals an error, resubscribe to it up to the given number
// of times before passing the error on to the subscriber.
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Fail on every subscription so that retry actually has work to do;
        // a source that only completes is never retried.
        subscriber.onError(new RuntimeException("source failed"));
    }
}).retry(2).subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        // Reached once the initial attempt and both retries have failed.
    }

    @Override
    public void onNext(Integer integer) {
    }
});
七、条件操作符和布尔操作符
// all: emits a single Boolean telling whether every source item satisfies the predicate.
Func1<Integer, Boolean> belowThree = new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer value) {
        return value < 3;
    }
};
Observable.just(1, 2, 3, 4)
        .all(belowThree)
        .subscribe(new Subscriber<Boolean>() {
            @Override
            public void onNext(Boolean allMatch) {
                // false here: 3 and 4 are not below 3.
            }

            @Override
            public void onError(Throwable error) {
            }

            @Override
            public void onCompleted() {
            }
        });
// contains: emits a single Boolean telling whether the source emits the given item.
Action1<Boolean> onContains = new Action1<Boolean>() {
    @Override
    public void call(Boolean found) {
        // true here: 1 is among the emitted items.
    }
};
Observable.just(1, 2, 3).contains(1).subscribe(onContains);
// isEmpty: emits a single Boolean telling whether the source completes without
// emitting any item.
Observable.just(1, 2, 3).isEmpty()
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                // false here: the source emits three items.
            }
        });
// amb: of the given Observables, mirrors only the one that emits (or terminates) first.
Action1<Integer> onWinner = new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // The first source is delayed by 2 seconds, so the second one is mirrored.
    }
};
Observable.amb(
        Observable.just(1, 2, 3).delay(2, TimeUnit.SECONDS),
        Observable.just(4, 5, 6))
        .subscribe(onWinner);
// defaultIfEmpty: if the source completes without emitting anything, emit the given
// default item instead.
Observable<Integer> emptySource = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Completes immediately without emitting any item.
        subscriber.onCompleted();
    }
});
emptySource.defaultIfEmpty(3).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer value) {
        // Receives the default value 3.
    }
});
八、转换操作符
// toList: collects all source items into one List and emits that list.
Action1<List<Integer>> onList = new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> items) {
        // items is [1, 2, 3]
    }
};
Observable.just(1, 2, 3).toList().subscribe(onList);
// toSortedList: like toList, but the emitted list is sorted in natural order.
Action1<List<Integer>> onSorted = new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> sortedItems) {
        // sortedItems is [1, 2, 3]
    }
};
Observable.just(3, 1, 2).toSortedList().subscribe(onSorted);
// toMap: collects all source items into one Map, keyed by the selector's result.
Func1<String, String> identityKey = new Func1<String, String>() {
    @Override
    public String call(String item) {
        // Each item is used as its own key.
        return item;
    }
};
Observable.just("s1", "s2", "s3").toMap(identityKey).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> itemsByKey) {
    }
});
网友评论