概念
函数式编程就是一种编程范式,常见的编程范式有命令式编程 函数式编程 和逻辑式编程。。。常见的面向对象编程是一种命令式编程。。
响应式编程是一种面向数据流和变化传播的编程范式
RxJava 响应式的函数编程,采用的是观察者模式
观察者模式
观察者模式又被称之为发布-订阅模式,定义 对象间一对多的依赖关系,每当一个对象改变状态时,则所有依赖于塔的对象都会得到通知并被自动更新。
22222.png
在观察者模式中有如下角色。
• Subject:抽象主题(抽象被观察者)。抽象主题角色把所有观察者对象保存在一个集合里,每个主题 都可以有任意数量的观察者。抽象主题提供一个接口,可以增加和删除观察者对象。
• ConcreteSubject:具体主题(具体被观察者)。该角色将有关状态存入具体观察者对象,在具体主题 的内部状态发生改变时,给所有注册过的观察者发送通知。
• Observer:抽象观察者,是观察者的抽象类。它定义了一个更新接口,使得在得到主题更改通知时更 新自己。
• ConcrereObserver:具体观察者,实现抽象观察者定义的更新接口,以便在得到主题更改通知时更新 自身的状态。
观察者模式demo
公众号 订阅为例
抽象观察者
/**
* @author 付影影
* @desc 抽象观察者
* @date 2019/10/29
*/
public interface Observer {
/**
* 更新数据
* @param message
*/
public void update(String message);
}
具体观察者
/**
* @author 付影影
* @desc 具体观察者
* @date 2019/10/29
*/
public class WexinUser implements Observer {
private String userName;
public WexinUser(String userName) {
this.userName = userName;
}
@Override
public void update(String message) {
System.out.println(userName + "---" + message);
}
}
抽象被观察者
/**
* @author 付影影
* @desc 抽象 被观察者
* @date 2019/10/29
*/
public interface Subject {
/**
* 增加 订阅者
* @param observer
*/
public void attach(Observer observer);
/**
* 删除 订阅者
* @param observer
*/
public void detach(Observer observer);
/**
* 通知订阅者 更新信息
* @param message
*/
public void notify(String message);
}
具体 被观察者
/**
* @author 付影影
* @desc 具体 被观察者
* @date 2019/10/29
*/
public class SubscriptionSubject implements Subject {
private List<Observer> weixinUserList = new ArrayList<>();
@Override
public void attach(Observer observer) {
if (weixinUserList.indexOf(observer) == -1) {
weixinUserList.add(observer);
}
}
@Override
public void detach(Observer observer) {
if (weixinUserList.indexOf(observer) != -1) {
weixinUserList.remove(observer);
}
}
@Override
public void notify(String message) {
for (Observer observer : weixinUserList) {
observer.update(message);
}
}
}
客户端调用
/**
* @author 付影影
* @desc 客户端调用
* @date 2019/10/29
*/
public class Client {
public static void main(String[] args) {
Subject subscriptionSubject = new SubscriptionSubject();
//创建微信用户
WexinUser user1 = new WexinUser("付小影");
WexinUser user2 = new WexinUser("付小影子2");
WexinUser user3 = new WexinUser("付小影子3");
WexinUser user4 = new WexinUser("付小影子4");
WexinUser user5 = new WexinUser("付小影子5");
//订阅
subscriptionSubject.attach(user1);
subscriptionSubject.attach(user2);
subscriptionSubject.attach(user3);
subscriptionSubject.attach(user4);
subscriptionSubject.attach(user5);
//更新 数据 发送通知
subscriptionSubject.notify("hello 付小影子,不要纠结");
}
}
33333.png
RxJava的基本用法
1.创建Observer(观察者) 它决定事件触发的时候将有怎样的行为
public abstract class Subscriber<T> implements Observer<T>, Subscription
public interface Observer<T> {
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onCompleted();
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(Throwable e);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(T t);
}
其中onCompleted、onError和onNext是必须要实现的方法,其含义如下。
• onCompleted:事件队列完结。RxJava不仅把每个事件单独处理,其还会把它们看作一个队列。当不 会再有新的 onNext发出时,需要触发 onCompleted()方法作为完成标志。
• onError:事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终 止,不允许再有事件发出。
• onNext:普通的事件。将要处理的事件添加到事件队列中。
• onStart:它会在事件还未发送之前被调用,可以用于做一些准备工作。例如数据的清零或重置。这是 一个可选方法,默认情况下它的实现为空
2.创建 Observable(被观察者)
3.订阅
//1. 创建观察者对象
Subscriber<String> mSubscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("hello -" + s);
}
@Override
public void onStart() {
super.onStart();
}
};
//2. 创建被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("shadow");
subscriber.onCompleted();
}
});
//3. 订阅
observable.subscribe(mSubscriber);
RxJava 操作符
RxJava操作符的类型分为创建操作符、变换操作符、过滤操作符、组合操作符、错误处理操作符、辅 助操作符、条件和布尔操作符、算术和聚合操作符及连接操作符
创建操作符
create、just、from、defer、range、interval、start、repeat和timer
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("hh", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("hh", "onError");
}
@Override
public void onNext(String s) {
Log.d("hh", "onNext -- " + s);
}
});
String[] nameArray = new String[]{"1", "2", "4"};
Observable.from(nameArray).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("hh", "call -- " + s);
}
});
Observable.just(1, 2, 3, 4, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("hh", integer.toString());
}
});
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.d("hh", "每隔2秒调用一次");
}
});
Observable.range(0, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("hh", "发送指定范围的整数序列");
}
});
Observable.range(0, 3)
.repeat(5)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("hh", "重复五次 发送 指定范围的整数序列 ,0,1,2,3,4");
}
});
变换 操作符
变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射 出去。
变换操作符有map、flatMap、concatMap、switchMap、flatMapIterable、buffer、groupBy、cast、 window、scan
//map操作符通过指定一个Func对象,将Observable转换为一个新的Observable对象并发射,观察者将收 到新的Observable处理
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
}).map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
//进行数据变换
return Integer.valueOf(s);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("hh", "我是变换后的数据");
}
});
//flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据 平坦化地放进一个单独的 Observable
//flatMap的合并允许交叉,也就是说可能会交错地发送事件,最终结果的顺序可能并不是原始 Observable发送时的顺序
//concatMap操作符功能与flatMap操作符一致;不过,它解决了flatMap交叉问题,提供了一种能够把发射 的值连续在一起的函数,而不是合并它们
String[] nameArray = new String[]{"1", "2", "4"};
Observable.from(nameArray).flatMap(new Func1<String, Observable<Integer>>() {
@Override
public Observable<Integer> call(String s) {
return Observable.just(Integer.valueOf(s));
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer s) {
Log.d("hh", "call -- " + s);
}
});
//flatMapIterable操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了
Observable.just(1, 2, 3, 4, 5)
.flatMapIterable(new Func1<Integer, Iterable<String>>() {
@Override
public Iterable<String> call(Integer integer) {
List<String> mStrings = new ArrayList<>();
mStrings.add(String.valueOf(integer + 1));
return mStrings;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// 2,3,4,5,6
Log.d("hh", s);
}
});
//buffer操作符将源Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不 是一个一个发射
Observable.just(1, 2, 3, 4, 5)
.buffer(3)
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
//输出 1,2,3
Log.d("hh", "buffer - " + integers);
}
});
过滤操作符
过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。
过 滤操作符有filter、elementAt、distinct、skip、take、skipLast、takeLast、ignoreElements、throttleFirst、 sample、debounce和throttleWithTimeout等
//filter操作符是对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅 者
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//输出 3,4,5
Log.d("hh", "filter -- " + integer);
}
});
//elementAt操作符用来返回指定位置的数据
Observable.just(1, 2, 3, 4, 5)
.elementAt(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//输出 4
Log.d("hh", "elementAt - " + integer);
}
});
//distinct 操作符用来去重,其只允许还没有发射过的数据项通过
Observable.just(1, 2, 3, 3, 2, 1, 4)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//输出 1,2,3,4
Log.d("hh", "distinct -- " + integer);
}
});
//skip操作符将源Observable发射的数据过滤掉前n项;而take操作符则只取前n项;
// 另外还有skipLast和 takeLast操作符,则是从后面进行过滤操作
Observable.just(1, 2, 3, 4, 5, 6, 7)
.skip(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//输出 4,5,6,7
Log.d("hh", "skip -- " + integer);
}
});
组合操作符
组合操作符可以同时处理多个Observable来创建我们所需要的Observable。组合操作符有startWith、 merge、concat、zip、combineLastest、join和switch等
//startWith操作符会在源Observable发射的数据前面插上一些数据
Observable.just(1, 2, 3, 4)
.startWith(6, 7, 8)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
// 输出 6,7,8,1,2,3,4
Log.d("hh", "startWith -- " + integer);
}
});
//merge操作符将多个Observable合并到一个Observable中进行发射,merge可能会让合并的Observable发射 的数据交错。
Observable observable1 = Observable.just(1, 2, 3);
Observable observable2 = Observable.just(4, 5, 6);
Observable.merge(observable1, observable2)
.subscribe(new Action1() {
@Override
public void call(Object o) {
//输出结果不确定 1,2,3,4,5,6 或者 4,5,6,1,2,3
Log.d("hh", "merge -- " + o);
}
});
//concat 将多个 Obserbavle 发射的数据进行合并发射。
// concat 严格按照顺序发射数据,前一个Observable没发射 完成是不会发射后一个Observable的数据的
Observable.concat(observable1, observable2)
.subscribe(new Action1() {
@Override
public void call(Object o) {
//输出 1,2,3,4,5,6
Log.d("hh", "merge -- " + o);
}
});
//zip操作符合并两个或者多个Observable发射出的数据项,根据指定的函数变换它们,并发射一个新值。
Observable observable3 = Observable.just("a", "b", "v");
Observable.zip(observable1, observable3, new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1() {
@Override
public void call(Object o) {
// 输出1a 2b 3v
Log.d("hh", "zip -- " + o);
}
});
辅助操作符
辅助操作符可以帮助我们更加方便地处理 Observable。辅助操作符包括 delay、DO、subscribeOn、 observeOn、timeout、materialize、dematerialize、timeInterval、timestamp和to等。在这里介绍delay、Do、 subscribeOn、observeOn和timeout。
Do系列操作符就是为原始Observable的生命周期事件注册一个回调,当Observable的某个事件发生时就 会调用这些回调。RxJava中有很多Do系列操作符,如下所示。
• doOnEach:为 Observable注册这样一个回调,当Observable每发射一项数据时就会调用它一次,包括 onNext、onError和 onCompleted。
• doOnNext:只有执行onNext的时候会被调用。
• doOnSubscribe:当观察者订阅Observable时就会被调用。
• doOnUnsubscribe:当观察者取消订阅Observable时就会被调用;Observable通过onError或者 onCompleted结束时,会取消订阅所有的Subscriber。
• doOnCompleted:当Observable 正常终止调用onCompleted时会被调用。
• doOnError:当Observable 异常终止调用onError时会被调用。
• doOnTerminate:当Observable 终止(无论是正常终止还是异常终止)之前会被调用。
• finallyDo:当Observable 终止(无论是正常终止还是异常终止)之后会被调用。
subscribeOn、observeOn subscribeOn操作符用于指定Observable自身在哪个线程上运行。如果Observable需要执行耗时操作,一 般可以让其在新开的一个子线程上运行。observerOn用来指定Observer所运行的线程,也就是发射出的数据 在哪个线程上使用。一般情况下会指定在主线程中运行,这样就可以修改UI
//timeout 如果原始 Observable 过了指定的一段时长没有发射任何数据,
// timeout 操作符会以一个onError通知终止 这个Observable,或者继续执行一个备用的Observable
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("3");
subscriber.onCompleted();
}
//如果Observable在3s这段时长没有发射数据,就会切换到Observable.just("1", "2")
}).timeout(3, TimeUnit.SECONDS, Observable.just("1", "2"))
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// 输出结果 1,2,1,2
Log.d("hh", "timeout---" + s);
}
});
错误操作符
RxJava在错误出现的时候就会调用Subscriber的onError方法将错误分发出去,由Subscriber自己来处理错 误。但是如果每个 Subscriber 都处理一遍的话,工作量就有点大了,这时候可以使用错误处理操作符。
错误 处理操作符有catch和 retry。
1.catch
catch操作符拦截原始Observable的onError通知,将它替换为其他数据项或数据序列,让产生的 Observable能够正常终止或者根本不终止。RxJava将catch实现为以下 3个不同的操作符。
• onErrorReturn:Observable遇到错误时返回原有Observable行为的备用Observable,备用Observable会忽 略原有Observable的onError调用,不会将错误传递给观察者。作为替代,它会发射一个特殊的项并调用观察 者的onCompleted方法。
• onErrorResumeNext:Observable遇到错误时返回原有Observable行为的备用Observable,备用 Observable会忽略原有Observable的onError调用,不会将错误传递给观察者。作为替代,它会发射备用 Observable的数据。
• onExceptionResumeNext:它和onErrorResumeNext类似。不同的是,如果onError收到的Throwable不是 一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onError(new Throwable());
subscriber.onNext("3");
subscriber.onCompleted();
}
//如果Observable在3s这段时长没有发射数据,就会切换到Observable.just("1", "2")
}).onErrorReturn(new Func1<Throwable, String>() {
@Override
public String call(Throwable throwable) {
return "6";
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("hh","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("hh","onError");
}
@Override
public void onNext(String s) {
Log.d("hh","onNext -- "+s);
}
});
//运行结果为 onNext -- 1,onNext --2,onNext --6,onCompleted
retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次 机会无错误地完成其数据序列。
retry总是传递onNext通知给观察者,由于重新订阅,这可能会造成数据项 重复。
RxJava 中的实现为retry和retryWhen。
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onError(new Throwable());
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
//如果Observable在3s这段时长没有发射数据,就会切换到Observable.just("1", "2")
}).retry(2).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d("hh", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("hh", "onError");
}
@Override
public void onNext(String s) {
Log.d("hh", "onNext -- " + s);
}
});
//运行结果为 onNext -- 1,onNext --1,onNext --2,onError
条件操作符和布尔操作符
条件操作符和布尔操作符可用于根据条件发射或变换Observable,或者对它们做布尔运算。
布尔操作符有all、contains、isEmpty、exists和sequenceEqual
all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。 这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件。如果全部都满 足则返回true,否则就返回false。
contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。如果包含该数据,会返回 true;
如果源Observable已经结束了却还没有发射这个数据,则返回false。
isEmpty操作符用来判断源 Observable 是否发射过数据。如果发射过该数据,就会返回 false;
如果源Observable已经结束了却还没有发 射这个数据,则返回true
Observable.just(1, 2, 3)
.all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
Log.d("hh", "call -- " + integer);
return integer > 0;
}
}).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
Log.d("hh", "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("hh", "onError");
}
@Override
public void onNext(Boolean aBoolean) {
Log.d("hh", "onNext -- " + aBoolean);
}
});
//输出 call -- 1 ,call -- 2,call -- 3,onNext -- true,onCompleted
条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil和takeWhile等
转换操作符
转换操作符用来将 Observable 转换为另一个对象或数据结构,转换操作符有 toList、toSortedList、 toMap、toMultiMap、getIterator和nest等
//toList操作符将发射多项数据且为每一项数据调用onNext方法的Observable发射的多项数据组合成一个 List,
// 然后调用一次onNext方法传递整个列表
Observable.just(1,2,3).toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for(Integer integers1 :integers){
//输出 call -- 1,call -- 2,call -- 3
Log.d("hh","call -- "+integers1);
}
}
});
//toSortedList操作符类似于toList操作符;不同的是,它会对产生的列表排序,默认是自然升序。
// 如果发 射的数据项没有实现Comparable接口,会抛出一个异常。
// 当然,若发射的数据项没有实现Comparable接口, 可以使用toSortedList(Func2)变体,
// 其传递的函数参数Func2会作用于比较两个数据项。
Observable.just(1,4,5,2,6,3).toSortedList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for(Integer integers1 :integers){
//输出 call -- 1,call -- 2,call -- 3,call -- 4,call -- 5,call -- 6
Log.d("hh","call -- "+integers1);
}
}
});
//toMap操作符收集原始Observable发射的所有数据项到一个Map(默认是HashMap),然后发射这个 Map。
// 你可以提供一个用于生成Map的key的函数,也可以提供一个函数转换数据项到Map存储的值(默认 数据项本身就是值)
RxJava的线程控制
内置的Scheduler 如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的。
如果我们想切换线程,就需要 使用Scheduler。
RxJava 已经内置了如下5个Scheduler。
• Schedulers.immediate():直接在当前线程运行,它是timeout、timeInterval和timestamp操作符的默认 调度器。
• Schedulers.newThread():总是启用新线程,并在新线程执行操作。
• Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模 式和 newThread()差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的 线程,因此多数情况下 io() 比 newThread() 更有效率。
• Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler使用固定线程 池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O操作的等待时间会浪费 CPU。它 是 buffer、debounce、delay、interval、sample 和 skip操作符的默认调度器。
• Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以 用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中的每一个任务。它是repeat 和retry操作符默认的调度器。
另外,RxAndroid也提供了一个常用的Scheduler:
• AndroidSchedulers.mainThread()—RxAndroid库中提供的Scheduler,它指定的操作在主线程中运 行。
- 感觉有点累了,耐心似乎快用完了。。。找个自己喜欢的也喜欢自己的怎么这么难。。。很明显感觉到他的平淡和无感。。。
网友评论