参考链接:
http://www.jianshu.com/p/5e93c9101dc5
http://blog.csdn.net/qq_35064774/article/details/53045298
http://www.jianshu.com/p/240f1c8ebf9d
http://www.jianshu.com/p/464fa025229e
http://blog.csdn.net/qq_35064774/article/details/53057359
RxJava是一种响应式的编程。最近自己才学习下,感觉自己已经落后好多,下面是自己的简单总结。这边是对RxJava的使用介绍,对其编程思想上我这边较少介绍。因为自己对RxJava的学习较晚,对1版本,大家可以到别的博客上获取,这边主要是对2版本的介绍。
我们来大致分析下它的流程。主要分两块,上游发送数据,在下游获取数据,中间可以对数据进行操作,使用观察者模式。
我们直接上代码:
Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("sendOnNext1");
e.onNext("1");
System.out.println("sendOnNext2");
e.onNext("2");
System.out.println("sendOnComplete");
e.onComplete();
}
});
Observer<String> receiver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String value) {
System.out.println("onNext" + value);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
sender.subscribe(receiver);
输出:
image.png
Observable对象其实就是我们所说的“被观察者”,Observer对象为“观察者”,我们重写subscribe方法,通过onNext方法发送数据,在下游Observer的onNext方法中获取数据。其余方法如onSubscribe,onError,onComplete方法的调用流程见输出。
这个最简单的一个流程演示了。
在2.x中,Action1被重命名为Consumer,我们在使用Observable的时候,如不需要获取其余的流程状态,下游可以用Consumer替换,由此,代码可以如下:
Observable<String> sender = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
System.out.println("sendOnNext1");
e.onNext("1");
System.out.println("sendOnNext2");
e.onNext("2");
System.out.println("sendOnComplete");
e.onComplete();
}
});
Consumer<String> receiver = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept"+s);
}
};
sender.subscribe(receiver);
输出:
image.png在到2.x时,出现一个Flowable类,它与Observable不同的是,它支持背压,但这个不意味着MissingBackpressureException不会出现。Observable完全不支持背压。
代码如下:
Flowable<String> sender = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
System.out.println("sendOnNext1");
e.onNext("1");
System.out.println("sendOnNext2");
e.onNext("2");
System.out.println("sendOnComplete");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
Subscriber<String> receiver = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");
s.request(Long.MAX_VALUE); // 接收参数的数量
}
@Override
public void onNext(String s) {
System.out.println("onNext" + s);
}
@Override
public void onError(Throwable t) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
sender.subscribe(receiver);
这里Flowable为“被观察者”,Subscriber为“观察者”。
输出结果:
看起来与第一个代码十分相似。
我们在发送数据数量上如果有一定限制的话,在2.x中还有几个扩展类:Completable,Single,Maybe。
首先看Completable,代码如下:
Completable sender = Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
System.out.println("subscribe");
}
});
CompletableObserver receiver = new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
};
sender.subscribe(receiver);
输出如下:
image.png上游不发送数据。
在看Single:
Single<String> sender = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
System.out.println("subscribe");
e.onSuccess("1111");
}
});
SingleObserver receiver = new SingleObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onSuccess(Object value) {
System.out.println("onSuccess" + value);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
};
sender.subscribe(receiver);
输出如下:
image.png上游发送一条onSuccess数据。
最后看下Maybe:
Maybe sender = Maybe.create(new MaybeOnSubscribe() {
@Override
public void subscribe(MaybeEmitter e) throws Exception {
System.out.println("subscribe");
e.onSuccess("1111");
}
});
MaybeObserver receiver = new MaybeObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onSuccess(Object value) {
System.out.println("onSuccess"+value);
}
};
sender.subscribe(receiver);
输出如下:
image.png上游可发送一条或0条数据。
通过上面的学习,我们可以看到上游与下游的工作流程。有时我们需要对上游到下游的数据进行中间操作,这边我们来演示下RxJava的链式代码:
Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
List<User> users = getUsers();
e.onNext(users);
}
}).flatMap(new Function<List<User>, ObservableSource<User>>() {
@Override
public ObservableSource<User> apply(List<User> users) throws Exception {
return Observable.fromIterable(users);
}
}).filter(new Predicate<User>() {
@Override
public boolean test(User user) throws Exception {
return user.getId().equals("2") || user.getId().equals("5");
}
}).subscribe(new Consumer<User>() {
@Override
public void accept(User user) throws Exception {
System.out.println(user.getName());
}
});
public static List<User> getUsers(){
List<User> users = new ArrayList<>();
users.add(new User("user1","1"));
users.add(new User("user2","2"));
users.add(new User("user3","3"));
users.add(new User("user4","4"));
users.add(new User("user5","5"));
return users;
}
代码输出:
image.png是不是看起来很酷炫。关于代码flatMap,filter等等方法大家可以去网上收集。
同样,我们也可以试着用Flowable方法,同时用另一种发送数据源方式,代码如下:
Flowable.just(1, 2, 3)
.take(2) //取前两个数据
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("保存:" + integer);
}
})//中间操作
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
输出如下:
image.png
just或take方法同上,可以去网上查询使用方法。
到此,是我简单的对RxJava的代码的理解,如有不当之处,还请大家多多指教。
网友评论