美文网首页
RxJava学习笔记

RxJava学习笔记

作者: lycknight | 来源:发表于2019-01-15 14:45 被阅读0次

    RxJava

    Rxjava的GitHub官网上是这样介绍rxjava的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programes by using observable sequences(RxJava是一个基于Reactive Extensions的JVM实现框架,它使用观察者模式的做法,将异步和基于事件的编程很好的结合起来),虽然是简短的一句话,但是很清晰的解释了RxJava,其中有三个关键字:观察者异步事件

    为什么要使用RxJava

    总结来说RxJava主要有以下几点优点:

    1. 异步(作用):可以自由切换线程
    2. 观察者模式(模式)
    3. 响应式编程(结构)
    4. 逻辑简单(逻辑简洁):减少回调嵌套

    响应式编程

    在某种程度上,这并不是新东西。事件总线(Event buses)或常见的单机事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义的操作。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和监听事件数据流。

    简洁编程

    假设有这样一个需求:给定一个图片目录,需要将这个目录下的所有图片显示到对应的imageview空间上,由于读取图片是一个比较耗时的过程,需要新启动一个线程来获得bitmap类型的图片,然后在主线程上显示图片,我们平时的做法是:

     new Thread() {
                @Override
                public void run() {
                    super.run();
                    for (File folder : files) {
                        File[] files = folder.listFiles();
                        for (File file : files) {
                            if (file.getName().endsWith(".png")) {
                                final Bitmap bitmap = getBitmapFromFile(file);
                                MainActivity.this.runOnUiThread(new Runnable() {
                                    @Override
                                    public void run() {
                                        imageCollectorView.addImage(bitmap);
                                    }
                                });
                            }
                        }
                    }
                }
            }.start();
    
    

    但是如果是RxJava2,实现的方式是这样的:

    Observable.fromArray(files)
                    .flatMap(new Function<File, ObservableSource<File>>() {
                        @Override
                        public ObservableSource<File> apply(@NonNull File file) throws Exception {
                            return Observable.fromArray(file.listFiles());
                        }
                    })
                    .filter(new Predicate<File>() {
                        @Override
                        public boolean test(@NonNull File file) throws Exception {
                            return file.getName().endsWith(".png");
                        }
                    })
                    .map(new Func1<File, Bitmap>() {
                        @Override
                        public Bitmap call(File file) {
                            return getBitmapFromFile(file);
                        }
                    })
                    .subscribeOn(rx.schedulers.Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(@NonNull Object o) throws Exception {
                            if (o instanceof  Bitmap){
                                imageView.addImage(o);
                            }
                        }
                    });
    

    从代码上看,的确是RxJava的代码变多了,但是从逻辑来说,RxJava的实现是从上到下的链式调用,没有任何嵌套,这在逻辑的间接性上是具有优势的。

    基本概念

    RxJava使用的是观察者模式来实现的,观察者模式对于我们来说应该是比较熟悉的,我们在日常开发中经常使用到的,例如,setOnClickListener(listener)就是一个观察者模式的应用。

    • Observable: 被观察者
    • Observer:观察者

    基本用法

    Observable(被观察者)的创建

    • 使用create(),最基本的创建方式:
     Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {
                    e.onNext("next1");//发送字符串"next1"
                    e.onNext("next2");//发送字符串"next2"
                    e.onComplete();//发送完成
                }
            });
    
    • 使用just(),将为你创建一个ObservableJust对象,并且依次发送just中的参数。
    Observable.just("just1","just2");//依次发送“just1”和“just2”
    
    • 使用fromXXX(),遍历集合,发送每个item。
    List<String> list = new ArrayList<>();
            list.add("from1");
            list.add("from2");
            list.add("from3");
            Observable.fromIterable(list);//遍历发送list的每个item。
            String[] strings = new String[]{"from1","from2","from3"};
            Observable.fromArray(strings);//遍历发送strings的每个item。
    
    • 使用defer(),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
     Observable.defer(new Callable<ObservableSource<?>>() {
                @Override
                public ObservableSource<?> call() throws Exception {
                    return Observable.just("defer");
                }
            });
    
    • 使用interval(),创建一个按固定时间间隔发送整数序列的Observable,类似于计数器
    Observable.interval(1, TimeUnit.SECONDS);//每个一秒发送一个数字
    
    • 使用range(),创建一个发送特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数。
    Observable.rang(20,5);//发送20,21,22,23,24
    
    • 使用timer(),创建一个Observable,它在一个给定的延迟发射一个特殊值,用做定时器。
    Observable.timer(4,TimeUnit.SECONDS);//4秒后发射一个值
    
    • 使用repeat(),创建一个重复发射特定数据的Observable
    Observable.just("repeat").repeat(3);//重复发射3次
    

    Observer(观察者)的创建

     Observer observer = new Observer() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    
                }
    
                @Override
                public void onNext(@NonNull Object o) {
    
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    

    创建了Observer之后,就可以将Observable与其关联起来

    Observable.subscribe(observer);//订阅
    

    线程控制

    如果订阅时不特别指定线程,那么就会在订阅时所在的线程产生事件。如果需要切换线程,就需要用到Scheduler。

    • Schedulers.computation():计算所使用的Scheduler。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU
    • Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThrea()差不多,区别在于io()的内部实现是一个无上限的线程池,可以用重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
    • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
    • Schedulers.single():在一个固定的线程中(非主线程),保证顺序性。
    • Schedulers.trampoline():直接在当前线程中运行,相当于不指定线程。
    • AndroidSchedulers.mainThread():指定的操作将在Android主线程运行。

    有了这几个Scheduler,就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。

    • subscribeOn():指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程。
    • observeOn():指定Subscriber所运行的线程。

    举个例子,在I/O线程发送事件,在主线程接收事件

    Observable.fromArray("from1","from2","from3")
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            Log.i("knight","S: "+ s);
                        }
                    });
    

    变换

    RxJava提供了对事件序列进行变换的支持,也就是核心的功能之一。所谓的变化,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列。

    • map():map就是将传入的数据类型,经过一些处理,转换为你需要的数据类型,例如,将输入的类型为Int类型,需要在接受事件的时候的数据类型为String。
    Observable.fromArray(1,2,3)// 输入类型 Integer
                    .subscribeOn(Schedulers.io())
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(@NonNull Integer s) throws Exception { // 参数类型 Integer
                            Log.i("knight","thread: "+ Thread.currentThread().getName());
                            return String.valueOf(s);// 返回类型 String
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {// 参数类型 String
                            Log.i("knight","thread: "+ Thread.currentThread().getName());
                            Log.i("knight","S: "+ s);
                        }
                    });
    
    • flatMap():将list集合,逐个发送给观察者,flatMap()map()有一个相同点:它也是把传入的参数转化之后返回另一个对象。不同点:flatMap()中返回的是个Observable对象,并且这个Observable对象并不是被直接发送到了Observer的回调方法中。
      假如有这样一个需求:需要得到公司每个部门的每个员工的姓名,不用flatMap()
     List<Dept> depts = new ArrayList<>();
            Observable.fromIterable(depts)
                    .subscribe(new Consumer<Dept>() {
                        @Override
                        public void accept(@NonNull Dept dept) throws Exception {
                            List<String> persons = dept.getPerson();
                            for (String person : persons) {
                                System.out.print(person);
                            }
                        }
                    });
    

    使用flatMap()

    List<Dept> depts = new ArrayList<>();
            Observable.fromIterable(depts).flatMap(new Function<Dept, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull Dept dept) throws Exception {
                    return Observable.fromIterable(dept.getPerson());
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    System.out.print(s);
                }
            });
    

    相关文章

      网友评论

          本文标题:RxJava学习笔记

          本文链接:https://www.haomeiwen.com/subject/wukydqtx.html