版本依赖:
implementation "io.reactivex.rxjava2:rxjava:2.2.14"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
Observable.zip批量裁剪图片示例
/** Plain data holder describing one picture. */
class Picture {
    /** Local path of the picture. */
    public String localPath;

    /** Path of the cropped picture. */
    public String cutPath;

    /** Position of the picture in the sequence. */
    public int index;
}
// Demo input: three pictures whose local paths end in 0, 1 and 2.
List<Picture> pictures = new ArrayList<>();
for (int n = 0; n < 3; n++) {
    Picture item = new Picture();
    item.localPath = "www.baidu.com/picture/" + n;
    pictures.add(item);
}
// Crop all pictures in one batch: zip subscribes to every Observable returned by
// getObservableList() and calls the combiner once each of them has emitted its item.
Observable.zip(getObservableList(), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] objects) {
        // The combined value is not used: each source writes its result into the
        // shared pictures list, so a placeholder is enough to signal "all done".
        return "";
    }
})
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Object o) {
                // Every source has emitted at this point; dump the whole list.
                Log.e(TAG, JSON.toJSONString(pictures));
            }

            @Override
            public void onError(Throwable e) {
                // Report the failure instead of silently dropping it.
                Log.e(TAG, "Observable.zip picture cropping failed", e);
            }

            @Override
            public void onComplete() {
            }
        });
/**
 * Builds one Observable per entry of {@code pictures}.
 *
 * <p>Each Observable sleeps for one second to simulate cropping, writes
 * {@code cutPath} on the corresponding picture, emits that picture and completes.
 *
 * @return the Observables, in the same order as {@code pictures}
 */
List<Observable<Picture>> getObservableList() {
    List<Observable<Picture>> observables = new ArrayList<>();
    for (int i = 0; i < pictures.size(); i++) {
        final int finalI = i;
        Observable<Picture> observable = Observable.create(new ObservableOnSubscribe<Picture>() {
            @Override
            public void subscribe(ObservableEmitter<Picture> emitter) {
                try {
                    Thread.sleep(1000); // simulates the time spent cropping a picture
                } catch (InterruptedException e) {
                    // Disposing the chain interrupts the sleeping worker. Restore the
                    // flag and use tryOnError so that an already-disposed emitter does
                    // not turn this into an undeliverable exception.
                    Thread.currentThread().interrupt();
                    emitter.tryOnError(e);
                    return;
                }
                Picture picture = pictures.get(finalI);
                picture.cutPath = picture.localPath + "_cut";
                emitter.onNext(picture);
                emitter.onComplete();
            }
        })
                // Without this subscribeOn all sources run one after another.
                .subscribeOn(Schedulers.io());
        observables.add(observable);
    }
    return observables;
}
2019-11-05 10:36:35.692 30420-30420/? E/MainActivity: [{"cutPath":"www.baidu.com/picture/0_cut","localPath":"www.baidu.com/picture/0"},{"cutPath":"www.baidu.com/picture/1_cut","localPath":"www.baidu.com/picture/1"},{"cutPath":"www.baidu.com/picture/2_cut","localPath":"www.baidu.com/picture/2"}]
去除 getObservableList() 中每个 Observable 上的 .subscribeOn(Schedulers.io()) 将变成串行执行,输出3次log
2019-11-05 10:48:45.571 31906-31906/com.code.layoutanalyze E/MainActivity: [{"cutPath":"www.baidu.com/picture/0_cut","localPath":"www.baidu.com/picture/0"},{"localPath":"www.baidu.com/picture/1"},{"localPath":"www.baidu.com/picture/2"}]
2019-11-05 10:48:46.516 31906-31906/com.code.layoutanalyze E/MainActivity: [{"cutPath":"www.baidu.com/picture/0_cut","localPath":"www.baidu.com/picture/0"},{"cutPath":"www.baidu.com/picture/1_cut","localPath":"www.baidu.com/picture/1"},{"localPath":"www.baidu.com/picture/2"}]
2019-11-05 10:48:47.521 31906-31906/com.code.layoutanalyze E/MainActivity: [{"cutPath":"www.baidu.com/picture/0_cut","localPath":"www.baidu.com/picture/0"},{"cutPath":"www.baidu.com/picture/1_cut","localPath":"www.baidu.com/picture/1"},{"cutPath":"www.baidu.com/picture/2_cut","localPath":"www.baidu.com/picture/2"}]
Observable.flatMap变换数据结构并上传
/** Plain data holder describing one picture. */
class Picture {
    /** Local path of the picture. */
    public String localPath;

    /** Path of the cropped picture. */
    public String cutPath;

    /** Position of the picture in the sequence. */
    public int index;
}
/** Plain data holder describing one picture together with its server-side path. */
class PictureServer {
    /** Local path of the picture. */
    public String localPath;

    /** Path of the picture on the server. */
    public String serverPath;

    /** Position of the picture in the sequence. */
    public int index;
}
/**
 * Creates a {@code PictureServer} carrying the local path and index of the given
 * picture. {@code serverPath} is left unset.
 *
 * @param picture the picture to copy from
 * @return a new {@code PictureServer}
 */
private PictureServer transform(Picture picture) {
    PictureServer converted = new PictureServer();
    converted.index = picture.index;
    converted.localPath = picture.localPath;
    return converted;
}
// Demo input: three pictures, each tagged with its position in the list.
for (int n = 0; n < 3; n++) {
    Picture item = new Picture();
    item.index = n;
    item.localPath = "www.baidu.com/picture/" + n;
    pictures.add(item);
}
// Convert every Picture into a PictureServer and "upload" it, then collect all
// results into one list that is delivered on the main thread.
Observable.fromIterable(pictures)
        .flatMap(new Function<Picture, ObservableSource<PictureServer>>() {
            @Override
            public ObservableSource<PictureServer> apply(final Picture picture) throws Exception {
                return Observable.create(new ObservableOnSubscribe<PictureServer>() {
                    @Override
                    public void subscribe(ObservableEmitter<PictureServer> emitter) throws Exception {
                        PictureServer pictureServer = transform(picture); // convert the data
                        try {
                            Thread.sleep(1000); // simulates the picture upload
                        } catch (InterruptedException e) {
                            // Disposing the chain interrupts the sleeping worker. Restore
                            // the flag and use tryOnError so that an already-disposed
                            // emitter does not produce an undeliverable exception.
                            Thread.currentThread().interrupt();
                            emitter.tryOnError(e);
                            return;
                        }
                        pictureServer.serverPath = picture.localPath + "_server";
                        emitter.onNext(pictureServer);
                        emitter.onComplete();
                    }
                    // Each inner Observable gets its own io worker so uploads overlap.
                }).subscribeOn(Schedulers.io());
            }
        })
        .toList()
        .toObservable()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<PictureServer>>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(List<PictureServer> objects) {
                Log.e(TAG, JSON.toJSONString(objects));
            }

            @Override
            public void onError(Throwable e) {
                // Report the failure instead of silently dropping it.
                Log.e(TAG, "Observable.flatMap picture upload failed", e);
            }

            @Override
            public void onComplete() {
            }
        });
2019-11-05 11:08:05.778 32476-32476/com.code.layoutanalyze E/MainActivity: [{"index":0,"localPath":"www.baidu.com/picture/0","serverPath":"www.baidu.com/picture/0_server"},{"index":1,"localPath":"www.baidu.com/picture/1","serverPath":"www.baidu.com/picture/1_server"},{"index":2,"localPath":"www.baidu.com/picture/2","serverPath":"www.baidu.com/picture/2_server"}]
将 flatMap 改成 concatMap,或者去除内层 Observable 上的 subscribeOn(Schedulers.io()),将变成串行上传。
RxJava自定义线程池
在一些低配机器上,如果过多进行并发很有可能出现OOM事件,需要根据手机配置或者内存状况自定义并发线程数量。
// Custom thread pool: a Scheduler backed by a fixed pool of 2 threads.
// NOTE(review): nothing in this snippet shuts the executor down — confirm it is
// released elsewhere.
final Scheduler schedulers = Schedulers.from(Executors.newFixedThreadPool(2));
// Same conversion/upload chain as the flatMap example, but each inner Observable is
// subscribed on the custom scheduler. The remainder of the chain is not shown here.
Observable.fromIterable(pictures).flatMap(new Function<Picture, ObservableSource<PictureServer>>() {
@Override
public ObservableSource<PictureServer> apply(final Picture picture) throws Exception {
Observable<PictureServer> observable = Observable.create(new ObservableOnSubscribe<PictureServer>() {
@Override
public void subscribe(ObservableEmitter<PictureServer> emitter) throws Exception {
PictureServer pictureServer = transform(picture); // convert the data
Thread.sleep(1000); // simulates the picture upload
// Log the picture index once its simulated upload has finished.
Log.e(TAG,picture.index+"");
pictureServer.serverPath = picture.localPath + "_server";
emitter.onNext(pictureServer);
emitter.onComplete();
}
}).subscribeOn(schedulers);// use the custom thread pool
return observable;
}
})
2019-11-05 11:20:58.804 2753-2787/com.code.layoutanalyze E/MainActivity: 0
2019-11-05 11:20:58.804 2753-2788/com.code.layoutanalyze E/MainActivity: 1
2019-11-05 11:20:59.804 2753-2788/com.code.layoutanalyze E/MainActivity: 2
2019-11-05 11:20:59.868 2753-2753/com.code.layoutanalyze E/MainActivity: [{"index":0,"localPath":"www.baidu.com/picture/0","serverPath":"www.baidu.com/picture/0_server"},{"index":1,"localPath":"www.baidu.com/picture/1","serverPath":"www.baidu.com/picture/1_server"},{"index":2,"localPath":"www.baidu.com/picture/2","serverPath":"www.baidu.com/picture/2_server"}]
网友评论