Android拾萃 - RxJava2 Transformation Operators and Their Demos


Author: 三也视界 | Published: 2017-10-21 22:00

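    1. List of transformation operators

    Operator Description
    buffer() Periodically collects items emitted by the Observable into a collection and emits these collections as bundles, instead of emitting the items one at a time
    map() Transforms the sequence emitted by the Observable by applying a function to each item
    flatMap() , concatMap() , flatMapIterable() Transforms each item emitted by the Observable into an Observable (or an Iterable), then flattens what those emit into a single Observable
    switchMap() Transforms each item emitted by the Observable into an Observable, then emits only the items of the most recently created of those Observables
    scan() Applies an accumulator function to each item emitted by the Observable, in sequence, and emits each successive result
    groupBy() Splits the Observable into a set of Observables, grouping the items of the original Observable by key; each of these Observables emits a different group of items
    window() Periodically splits the items from the Observable into Observable windows and emits these windows, instead of emitting one item at a time
    cast() Casts every item emitted by the Observable to the specified type before emitting it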
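To make two of the rows above concrete, here is a minimal plain-JDK sketch of what scan() (without a seed) and buffer(count) compute on a finite list. It is only a model of the semantics, not RxJava's implementation; the class name `OperatorSemanticsSketch` and its two helper methods are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class OperatorSemanticsSketch {

    // scan: emit the first item as-is, then every intermediate accumulated value.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : items) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    // buffer(count): bundle the items into lists of at most `count` elements.
    static <T> List<List<T>> buffer(List<T> items, int count) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += count) {
            int end = Math.min(i + count, items.size());
            out.add(new ArrayList<>(items.subList(i, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(scan(source, Integer::sum)); // [1, 3, 6, 10, 15]
        System.out.println(buffer(source, 2));          // [[1, 2], [3, 4], [5]]
    }
}
```

In RxJava 2 the corresponding calls would be scan(BiFunction) and buffer(int) on an Observable; the sketch only mirrors what they emit for a short, finite source.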

    2. Transformation operators

    map: a direct transformation of the emitted item. map() is a one-to-one conversion.

    map.png
    // The item type also changes, from int (a resource id) to Bitmap
            Observable.just(R.drawable.ic_launcher_round).map(new Function<Integer, Bitmap>() {
                @Override
                public Bitmap apply(@NonNull Integer id) throws Exception {
                    return BitmapFactory.decodeResource(getResources(), id);
                }
            })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Bitmap>() {
                        @Override
                        public void accept(Bitmap bitmap) throws Exception {
                            showBitmap(bitmap);
                        }
                    });
    
    
    private void showBitmap(Bitmap bitmap) {
            imageView.setImageBitmap(bitmap);
        }
    

    The code above decodes the drawable resource into a Bitmap and shows it directly in the ImageView.

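    flatMap()
    As we know, from/just emit their items one by one. flatMap changes this behaviour: when subscribed, it maps each item to its own Observable, gathers everything those Observables emit into a single Observable, and then delivers the items through onNext one at a time (the relative order is not controlled).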

    flatMap.png

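    In effect, flatMap creates many Observables (upstream pipes) that all pour data into one downstream Observable, and there is no way to control whose data reaches the downstream pipe first.
    The diagram below comes from the web; it will be removed on request.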


    flatmap_水管.png
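The water-pipe picture can be modelled with plain JDK threads. The sketch below is an analogy, not how RxJava implements flatMap: each source gets its own thread (an upstream pipe) and all of them push into one shared queue (the downstream pipe). `MergePipesSketch` and `mergeAll` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class MergePipesSketch {

    // One thread per source ("upstream pipe"); all pour into one queue ("downstream pipe").
    static List<String> mergeAll(List<String> sources, int itemsPerSource) {
        ConcurrentLinkedQueue<String> downstream = new ConcurrentLinkedQueue<>();
        List<Thread> pipes = new ArrayList<>();
        for (String source : sources) {
            Thread pipe = new Thread(() -> {
                for (int i = 0; i < itemsPerSource; i++) {
                    downstream.add(source + "#" + i);
                }
            });
            pipes.add(pipe);
            pipe.start();
        }
        for (Thread pipe : pipes) {
            try {
                pipe.join(); // wait until this pipe has finished emitting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a pipe", e);
            }
        }
        return new ArrayList<>(downstream);
    }

    public static void main(String[] args) {
        // The interleaving between sources may differ from run to run.
        System.out.println(mergeAll(Arrays.asList("baidu", "google", "bing"), 3));
    }
}
```

Items from the same source keep their relative order, but nothing fixes the order between sources, which is the property the diagram describes.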

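    I used to get the IP through the static call InetAddress.getByName(domain).getHostAddress(), but this time the domain names would not resolve and I do not know why. If you know, please tell me, thanks. (Judging by the UnknownHostException messages in the log further down, a likely cause is that the strings passed in are full URLs such as http://www.baidu.com/, whereas InetAddress expects a bare host name such as www.baidu.com.) In the end I used the code below to collect all the InetAddress objects: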

    // Resolves `domain` to its addresses. If resolution fails, falls back to
    // the addresses of this device's own network interfaces.
    private Collection<InetAddress> getServerIP(String domain) {
        Collection<InetAddress> myServer = new ArrayList<InetAddress>();
        List<InetAddress> array = null;
        try {
            array = Arrays.asList(InetAddress.getAllByName(domain));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    
        if (array == null || array.isEmpty()) {
            // Fallback: enumerate every address of every local network interface.
            try {
                Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
                while (networkInterfaces.hasMoreElements()) {
                    NetworkInterface networkInterface = networkInterfaces.nextElement();
                    Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                    while (inetAddresses.hasMoreElements()) {
                        myServer.add(inetAddresses.nextElement());
                    }
                }
            } catch (SocketException e) {
                e.printStackTrace();
            }
        } else {
            myServer.addAll(array);
        }
        return myServer;
    }
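A short aside on the resolution failure. The UnknownHostException messages in the log later in this post quote the full URL (for example "http://www.baidu.com/") as the host, so one plausible explanation is that InetAddress was given a URL instead of a host name. Assuming that is the cause, a minimal sketch of extracting the host with java.net.URI could look like this; `HostFromUrlSketch` and `hostOf` are hypothetical names, not part of the original demo.

```java
import java.net.URI;

class HostFromUrlSketch {

    // Returns the host part of a URL string, or the input unchanged
    // when the string has no host component (e.g. it already is a host name).
    static String hostOf(String urlOrHost) {
        String host = URI.create(urlOrHost).getHost();
        return host != null ? host : urlOrHost;
    }

    public static void main(String[] args) {
        System.out.println(hostOf("http://www.baidu.com/")); // www.baidu.com
        System.out.println(hostOf("https://www.bing.com/")); // www.bing.com
        System.out.println(hostOf("www.google.com"));        // www.google.com
    }
}
```

Under that assumption, passing hostOf(domain) rather than domain to InetAddress.getAllByName would hand the resolver a bare host name. Whether the lookup then succeeds still depends on the device's network and DNS setup.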
    

    OK, back to the topic above. We know that flatMap creates many ObservableSources, and the code makes this clear: every call to createIpObservable returns a new ObservableSource.

     private ObservableSource<?> createIpObservable(final String s) {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> subscriber) throws Exception {
                    Collection<InetAddress> array = getServerIP(s);
                    for (InetAddress inetAddress : array){
                        if (!TextUtils.isEmpty(inetAddress.getHostAddress())) {
                            String ip = inetAddress.getHostAddress();
                            subscriber.onNext(ip);
                            Log.d(TAG, "Emit Data -> " + s+" : " +ip);
                        }
                    }
                    subscriber.onComplete();
                }
            }).subscribeOn(Schedulers.newThread());
        }
    

    Here is the flatMap code:

     private void flatMap(){
            Observable.just(
    
                    "http://www.baidu.com/",
                    "http://www.google.com/",
                    "https://www.bing.com/").flatMap(new Function<String, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull String s) throws Exception {
                    return createIpObservable(s);
                }
            })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) throws Exception {
                            Log.i(TAG, o.toString());
                        }
                    });
        }
    

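    The output is shown below. The first way of getting the IP (InetAddress.getAllByName) threw errors. The log also shows that the results did not arrive in the order baidu, google, bing, which matches the water-pipe explanation above rather well.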

    10-21 17:27:52.008 16057-19602/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "https://www.bing.com/": No address associated with hostname
    10-21 17:27:52.018 16057-19598/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "http://www.baidu.com/": No address associated with hostname
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "http://www.google.com/": No address associated with hostname
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$18.subscribe(TransformOperateExampleActivity.java:318)
    10-21 17:27:52.018 16057-19599/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$18.subscribe(TransformOperateExampleActivity.java:318)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 17:27:52.028 16057-19602/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$18.subscribe(TransformOperateExampleActivity.java:318)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 17:27:52.028 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:     at   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at libcore.io.Posix.getaddrinfo(Native Method)   at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 17:27:52.038 16057-19602/com.example.philos.rxjavademo W/System.err:  at 
    10-21 17:27:52.038 16057-19602/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 17:27:52.038 16057-19602/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 17:27:52.038 16057-19602/com.example.philos.rxjavademo W/System.err:  ... 17 more
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err: java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 17:27:52.038 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:     at   at libcore.io.Posix.getaddrinfo(Native Method)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 17:27:52.038 16057-19598/com.example.philos.rxjavademo W/System.err:  ... 17 more
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:     at libcore.io.Posix.getaddrinfo(Native Method)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 17:27:52.048 16057-19599/com.example.philos.rxjavademo W/System.err:  ... 17 more
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 17:27:52.068 16057-19599/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : ::1%1
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 17:27:52.068 16057-19602/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : ::1%1
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 17:27:52.068 16057-19599/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 127.0.0.1
    10-21 17:27:52.068 16057-19599/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 17:27:52.068 16057-19599/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 192.168.1.13
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.068 16057-19599/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 10.0.2.15
    10-21 17:27:52.068 16057-19598/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : ::1%1
    10-21 17:27:52.068 16057-19598/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 127.0.0.1
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 17:27:52.068 16057-19598/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.068 16057-19598/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 192.168.1.13
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    10-21 17:27:52.068 16057-19598/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 10.0.2.15
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 17:27:52.068 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    10-21 17:27:52.078 16057-19602/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 127.0.0.1
    10-21 17:27:52.078 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.078 16057-19602/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 17:27:52.078 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 17:27:52.078 16057-19602/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 192.168.1.13
    10-21 17:27:52.078 16057-16057/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    10-21 17:27:52.078 16057-19602/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 10.0.2.15
    

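    concatMap is very similar to flatMap, except that its pipes are no longer connected to the downstream in parallel. They are joined in series, in order, before feeding the downstream, so the order of reception is guaranteed to match the order of emission.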

    concatMap.png
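The series connection can be modelled in a few lines of plain Java. This is a sketch of the ordering guarantee only, not RxJava's concatMap: each inner source is drained completely, in input order, before the next one is touched. `ConcatPipesSketch` and its `concatMap` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConcatPipesSketch {

    // Map each item to an inner list and append the inner lists strictly in input order.
    static <T, R> List<R> concatMap(List<T> items, Function<T, List<R>> mapper) {
        List<R> downstream = new ArrayList<>();
        for (T item : items) {
            downstream.addAll(mapper.apply(item)); // finish this pipe before the next
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<String> result = concatMap(
                Arrays.asList("baidu", "google", "bing"),
                name -> Arrays.asList(name + "#0", name + "#1"));
        System.out.println(result);
        // [baidu#0, baidu#1, google#0, google#1, bing#0, bing#1]
    }
}
```

The price of the guaranteed order is that a slow inner source holds up everything queued behind it.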

    For comparison, we reuse the IP lookup example from above. The code is as follows:

    private void concatMap(){
            Observable.fromArray(Arrays.asList(
                    "http://www.baidu.com/",
                    "http://www.google.com/",
                    "https://www.bing.com/")).concatMap(new Function<List<String>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull List<String> strings) throws Exception {
                    return createIpObservable(strings);
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) throws Exception {
                            Log.i(TAG, o.toString());
                        }
                    });
    
        }
    

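    The code above uses fromArray, so the helper function below differs slightly as well. Because Arrays.asList(...) is passed to fromArray as a single argument, the Observable emits the whole List<String> as one item, which is why createIpObservable takes a List<String> here. It is also worth taking a look at fromArray and just.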

    
     private ObservableSource<?> createIpObservable(final List<String> urls) {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> subscriber) throws Exception {
                    for (String s : urls) {
                        Collection<InetAddress> array = getServerIP(s);
                        for (InetAddress inetAddress : array){
                            if (!TextUtils.isEmpty(inetAddress.getHostAddress())) {
                                String ip = inetAddress.getHostAddress();
                                subscriber.onNext(ip);
                                Log.d(TAG, "Emit Data -> " + s+" : " +ip);
                            }
                        }
                    }
                    subscriber.onComplete();
                }
            }).subscribeOn(Schedulers.newThread());
        }
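Why the concatMap example receives a List<String> rather than individual strings comes down to Java varargs. fromArray is declared with a varargs parameter (T... items), so a List passed as the only argument counts as one item. The sketch below shows the same effect with a hypothetical stand-in method, `itemCount`, that is not part of RxJava.

```java
import java.util.Arrays;
import java.util.List;

class VarargsSketch {

    // Hypothetical stand-in for a varargs factory: reports how many items it was given.
    @SafeVarargs
    static <T> int itemCount(T... items) {
        return items.length;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList("a", "b", "c");
        System.out.println(itemCount("a", "b", "c")); // 3: three separate items
        System.out.println(itemCount(urls));          // 1: the whole List is a single item
    }
}
```

To emit the elements of a list one by one in RxJava 2, Observable.fromIterable(list) is the usual choice.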
    

    Running the concatMap example prints the following:

    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "http://www.baidu.com/": No address associated with hostname
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$20.subscribe(TransformOperateExampleActivity.java:370)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 18:03:44.158 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.Posix.getaddrinfo(Native Method)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 18:03:44.168 3869-4593/com.example.philos.rxjavademo W/System.err:    ... 17 more
    10-21 18:03:44.218 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : ::1%1
    10-21 18:03:44.218 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 127.0.0.1
    10-21 18:03:44.218 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:03:44.218 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 192.168.1.13
    10-21 18:03:44.218 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.baidu.com/ : 10.0.2.15
    10-21 18:03:44.218 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 18:03:44.218 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 18:03:44.218 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:03:44.218 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 18:03:44.218 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    10-21 18:04:00.568 3869-4593/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "http://www.google.com/": No address associated with hostname
    10-21 18:04:00.568 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 18:04:00.568 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 18:04:00.568 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$20.subscribe(TransformOperateExampleActivity.java:370)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 18:04:00.578 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.Posix.getaddrinfo(Native Method)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 18:04:00.588 3869-4593/com.example.philos.rxjavademo W/System.err:    ... 17 more
    10-21 18:04:00.608 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : ::1%1
    10-21 18:04:00.608 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 127.0.0.1
    10-21 18:04:00.608 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:04:00.608 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 192.168.1.13
    10-21 18:04:00.608 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> http://www.google.com/ : 10.0.2.15
    10-21 18:04:00.608 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 18:04:00.608 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 18:04:00.608 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:04:00.608 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 18:04:00.608 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err: java.net.UnknownHostException: Unable to resolve host "https://www.bing.com/": No address associated with hostname
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:424)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByNameImpl(InetAddress.java:236)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.getAllByName(InetAddress.java:214)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.getServerIP(TransformOperateExampleActivity.java:340)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity.access$700(TransformOperateExampleActivity.java:50)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at com.example.philos.rxjavademo.operate.TransformOperateExampleActivity$20.subscribe(TransformOperateExampleActivity.java:370)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Observable.subscribe(Observable.java:10903)
    10-21 18:04:17.038 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
    10-21 18:04:17.048 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.lang.Thread.run(Thread.java:841)
    10-21 18:04:17.058 3869-4593/com.example.philos.rxjavademo W/System.err: Caused by: libcore.io.GaiException: getaddrinfo failed: EAI_NODATA (No address associated with hostname)
    10-21 18:04:17.058 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.Posix.getaddrinfo(Native Method)
    10-21 18:04:17.058 3869-4593/com.example.philos.rxjavademo W/System.err:     at libcore.io.ForwardingOs.getaddrinfo(ForwardingOs.java:61)
    10-21 18:04:17.058 3869-4593/com.example.philos.rxjavademo W/System.err:     at java.net.InetAddress.lookupHostByName(InetAddress.java:405)
    10-21 18:04:17.058 3869-4593/com.example.philos.rxjavademo W/System.err:    ... 17 more
    10-21 18:04:17.078 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : ::1%1
    10-21 18:04:17.078 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ::1%1
    10-21 18:04:17.078 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 127.0.0.1
    10-21 18:04:17.078 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:04:17.078 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 192.168.1.13
    10-21 18:04:17.078 3869-4593/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> https://www.bing.com/ : 10.0.2.15
    10-21 18:04:17.078 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 127.0.0.1
    10-21 18:04:17.078 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: fe80::e1d:afff:fec5:96c3%wlan0
    10-21 18:04:17.078 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 192.168.1.13
    10-21 18:04:17.078 3869-3869/com.example.philos.rxjavademo I/TransformOperateExampleActivity: 10.0.2.15
    

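    flatMapIterable
    flatMapIterable() is almost identical to flatMap(): both turn one item into several (in the diagram, each circle becomes two diamonds). The difference is that the function passed to flatMap converts each item into a new Observable, whereas the function passed to flatMapIterable converts each item into a new Iterable.
    It is handy for more complex data: the operator takes each item emitted by the source Observable, maps it to an Iterable, and then passes the elements of that Iterable one by one to the next operator or to the Observer.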

    flatMapIterable.png

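    We won't examine emission order here (see flatMap for that); since an Iterable is consumed synchronously, the results come out in source order anyway. Here is a simple example that maps each of "B", "U" and "G" to a list of ASCII codes through a getIntegers helper. Judging by the output below, the helper returns the codes of the two characters that follow each letter.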

    private void flatMapIterable(){
            Observable.just("B", "U", "G").flatMapIterable(new Function<String, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> apply(@NonNull String s) throws Exception {
                    return getIntegers(s);
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "ASCII 码:" + integer);
                }
            });
    
        }
    

    The output is as follows:

    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:67
    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:68
    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:86
    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:87
    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:72
    10-21 20:35:20.151 25291-25291/com.example.philos.rxjavademo I/TransformOperateExampleActivity: ASCII 码:73
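
    The getIntegers helper is not shown in the listing above. The following is only a minimal sketch that is consistent with the log, assuming the helper returns the codes of the two characters after the first character of its input. The class name AsciiSketch is made up for illustration, and the real helper may well look different.

```java
import java.util.Arrays;
import java.util.List;

final class AsciiSketch {
    // Hypothetical stand-in for the article's getIntegers helper:
    // returns the character codes of the two characters after s.charAt(0).
    static List<Integer> getIntegers(String s) {
        int code = s.charAt(0);
        return Arrays.asList(code + 1, code + 2);
    }

    public static void main(String[] args) {
        for (String s : new String[] {"B", "U", "G"}) {
            for (int code : getIntegers(s)) {
                System.out.println("ASCII code: " + code);
            }
        }
    }
}
```

    With this helper, "B" (66) maps to 67 and 68, which matches the first two log lines.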
    
    

    switchMap

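    switchMap() is much like flatMap(), with one difference: whenever the source Observable emits a new item, switchMap unsubscribes from and stops watching the Observable generated from the previous item, and starts watching the one generated from the current item.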

    switchMap.png
    private void  switchMap(){
            Observable.just(
                    "http://www.baidu.com/",
                    "http://www.google.com/",
                    "https://www.bing.com/").switchMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(@NonNull String s) throws Exception {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return Observable.just(s).subscribeOn(Schedulers.newThread());
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.i(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    Log.i(TAG, "onNext:" + s);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.i(TAG, "onError " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete()");
                }
            });
        }
    

    Output:

    10-21 20:42:47.721 32149-32149/com.example.philos.rxjavademo I/TransformOperateExampleActivity: onSubscribe
    10-21 20:42:53.751 32149-32424/com.example.philos.rxjavademo I/TransformOperateExampleActivity: onNext:https://www.bing.com/
    10-21 20:42:53.751 32149-32424/com.example.philos.rxjavademo I/TransformOperateExampleActivity: onComplete()
    

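    As you can see, the Observables created for the first two URLs were cancelled before they could deliver anything, so only the last URL reached the Observer.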
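
    The "latest wins" behaviour can be modelled in plain Java without RxJava. The sketch below is an illustration only, not how RxJava implements switchMap (RxJava disposes the previous inner subscription instead of checking a counter). LatestWinsSketch, start and delivered are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class LatestWinsSketch {
    private final AtomicLong latest = new AtomicLong();
    final List<String> delivered = new ArrayList<>();

    // Called for every upstream item: creates a new "inner" task and
    // invalidates all earlier tasks by bumping the counter.
    Runnable start(String item) {
        long id = latest.incrementAndGet();
        return () -> {
            // A task superseded by a newer item delivers nothing.
            if (id == latest.get()) {
                delivered.add(item);
            }
        };
    }

    public static void main(String[] args) {
        LatestWinsSketch sketch = new LatestWinsSketch();
        List<Runnable> pending = new ArrayList<>();
        String[] urls = {"http://www.baidu.com/", "http://www.google.com/", "https://www.bing.com/"};
        for (String url : urls) {
            pending.add(sketch.start(url));
        }
        // The inner tasks finish only after all three items have arrived.
        pending.forEach(Runnable::run);
        System.out.println(sketch.delivered);
    }
}
```

    Because all three items arrive before any inner task finishes, only the task for the last URL is still current when it runs, mirroring the log above.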

    scan

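    scan() applies a function to the items of a sequence: each result is emitted and is also fed back as the first argument when the function is applied to the next item, somewhat like a recursive accumulation (the first item has no predecessor, so it is emitted unchanged).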

    scan.jpg
    private void scan(){
            Observable.just(1, 2, 3, 4, 5).scan(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                    return integer + integer2;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "Emit Data -> " + integer);
                }
            });
        }
    

    The output is as follows:

    10-21 20:54:32.891 10353-10353/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> 1
    10-21 20:54:32.891 10353-10353/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> 3
    10-21 20:54:32.891 10353-10353/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> 6
    10-21 20:54:32.891 10353-10353/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> 10
    10-21 20:54:32.891 10353-10353/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Emit Data -> 15
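
    To make the accumulation explicit, here is a plain-Java model of what scan computes for a finite list. It is a sketch for illustration only (ScanSketch is a made-up name) and does not use RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanSketch {
    // Emits the first item unchanged, then accumulator(previousResult, item) for every later item.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [1, 3, 6, 10, 15], the same running sums as the log above.
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), Integer::sum));
    }
}
```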
    

    buffer

    buffer collects a certain number of items first and then delivers them together, as a list, to the next operator or to the Observer.

    buffer.png
     Observable.just("A", "B", "C")
                    .buffer(2)
                    .subscribe(new Consumer<List<String>>() {
                        @Override
                        public void accept(List<String> strings) throws Exception {
                            Log.i(TAG, strings + " == buffer");
                        }
                    });
    
    10-21 21:02:40.801 18367-18367/com.example.philos.rxjavademo I/TransformOperateExampleActivity: [A, B] == buffer
    10-21 21:02:40.811 18367-18367/com.example.philos.rxjavademo I/TransformOperateExampleActivity: [C] == buffer
    

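    buffer(int count, int skip) // buffer count items at a time and start a new buffer every skip items, in order; if skip < count the buffers overlap, if skip > count the items in between are dropped
    buffer(long timespan, long timeshift, TimeUnit unit)
    buffer(long timespan, TimeUnit unit) // every timespan, emit the items collected during that period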

    buffer.jpg

    I have verified that the result of the code below matches the diagram above.

    Observable.just("A", "B", "C", "E", "F", "G")
                    .buffer(2,3)
                    .subscribe(new Consumer<List<String>>() {
                        @Override
                        public void accept(List<String> strings) throws Exception {
                            Log.i(TAG, strings + " == buffer");
                        }
                    });
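
    The output of buffer(2, 3) is not listed here, so the following plain-Java model shows how count and skip interact. It is a sketch of the semantics under the assumption that a new buffer starts every skip items and partial buffers are flushed at the end. BufferSketch is a made-up name and this is not RxJava's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

final class BufferSketch {
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> out = new ArrayList<>();
        Deque<List<T>> open = new ArrayDeque<>();
        int index = 0;
        for (T item : source) {
            if (index++ % skip == 0) {
                open.addLast(new ArrayList<T>()); // start a new buffer every `skip` items
            }
            Iterator<List<T>> it = open.iterator();
            while (it.hasNext()) {
                List<T> buf = it.next();
                buf.add(item);
                if (buf.size() == count) {        // a full buffer is emitted and closed
                    it.remove();
                    out.add(buf);
                }
            }
        }
        out.addAll(open);                         // partial buffers are flushed at the end
        return out;
    }

    public static void main(String[] args) {
        // Prints [[A, B], [E, F]]: C and G fall between buffers and are dropped.
        System.out.println(buffer(Arrays.asList("A", "B", "C", "E", "F", "G"), 2, 3));
    }
}
```

    Calling buffer(Arrays.asList("A", "B", "C"), 2, 2) in this model gives [[A, B], [C]], which agrees with the buffer(2) log shown earlier.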
    

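    The examples above don't show what buffer is good for, so let's look at a practical one.

    Here is a requirement we have run into in development: a map needs to show the avatars of nearby people as markers. To improve clustering and the overall experience, we usually fetch the avatars and related information first and then add everything to the map in one go.
    The pseudocode is as follows: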

     // Pseudocode: LatLng, loadAvatar() and the map calls stand in for the real map SDK.
     ArrayList<LatLng> latLngs = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                LatLng latLng = new LatLng();
                latLng.setLat(24 + i);
                latLng.setLng(110 + i);
                latLngs.add(latLng);
            }
    
            Observable
                    // fromIterable emits each LatLng separately; fromArray(latLngs) would emit the whole list as a single item
                    .fromIterable(latLngs).map(new Function<LatLng, Object>() {
                @Override
                public Object apply(@NonNull LatLng latLng) throws Exception {
                    // pseudocode
                    Bitmap bitmap = loadAvatar();
    
    //                BitmapDescriptor bitDes = BitmapDescriptorFactory.fromBitmap(bitmap);
    //                OverlayOptions option = new MarkerOptions().position(latLng).icon(bitDes);
                    // Build the map marker. RxJava 2 rejects null items, so a placeholder is returned here;
                    // real code would return the OverlayOptions built above.
                    Object option = bitmap;
                    return option;
                }
            }).buffer(latLngs.size()) // collect everything in one go
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.computation()) // Schedulers.computation() is meant for computation work; it is also the default scheduler of many RxJava operators: buffer(), debounce(), delay(), interval(), sample(), skip().
                    .subscribe(new Consumer<List<Object>>() {
                        @Override
                        public void accept(List<Object> objects) throws Exception {
    //                        baiduMap.addOverlays(overlayOptionses); // add them all to the map
                        }
                    });
    
    

    groupBy

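    groupBy() groups items by a key: it splits the source Observable into an Observable that emits several GroupedObservables, one per key, and each of these smaller Observables then emits the items that belong to its group. Once there is a subscription, each GroupedObservable starts buffering its items. It is similar to GROUP BY in SQL: it gathers the items that match the same key.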

    groupby.png
    Observable.just("A","AD","BC","AB","C","D").groupBy(new Function<String, Boolean>() {
                @Override
                public Boolean apply(@NonNull String s) throws Exception {
                    return s.contains("A");
                }
            }).subscribe(new Consumer<GroupedObservable<Boolean, String>>() {
                @Override
                public void accept(GroupedObservable<Boolean, String> booleanStringGroupedObservable) throws Exception {
                    Log.i(TAG, booleanStringGroupedObservable.getKey() + "====" + booleanStringGroupedObservable.toString());
                    booleanStringGroupedObservable.subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, booleanStringGroupedObservable.getKey() + "包含的数据: " + s);
                        }
                    });
                }
            });
    

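    Given the condition s.contains("A"), all items are split into two GroupedObservables: those that contain "A" and those that don't. The line Log.i(TAG, booleanStringGroupedObservable.getKey() + "====" + booleanStringGroupedObservable.toString()); is printed only twice, which confirms this. The items of the two GroupedObservables are interleaved in the log rather than printed group by group, because each item is forwarded to its group's onNext as soon as the source emits it; subscribing does not reorder them.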

    10-21 21:23:42.381 6448-6448/com.example.philos.rxjavademo I/TransformOperateExampleActivity: true====io.reactivex.internal.operators.observable.ObservableGroupBy$GroupedUnicast@42e4f390
    10-21 21:23:42.381 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: true包含的数据: A
    10-21 21:23:42.381 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: true包含的数据: AD
    10-21 21:23:42.381 6448-6448/com.example.philos.rxjavademo I/TransformOperateExampleActivity: false====io.reactivex.internal.operators.observable.ObservableGroupBy$GroupedUnicast@42e50120
    10-21 21:23:42.391 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: false包含的数据: BC
    10-21 21:23:42.391 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: true包含的数据: AB
    10-21 21:23:42.391 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: false包含的数据: C
    10-21 21:23:42.391 6448-6448/com.example.philos.rxjavademo D/TransformOperateExampleActivity: false包含的数据: D
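
    For comparison with the SQL analogy, the same grouping can be done on a finished collection with the JDK's Collectors.groupingBy. This is only a collection-based sketch (GroupBySketch is a made-up name): unlike groupBy in RxJava it waits for all items and returns complete lists, so the interleaving seen in the log does not appear.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupBySketch {
    // Groups the strings by whether they contain "A"; LinkedHashMap keeps keys in first-seen order.
    static Map<Boolean, List<String>> groupByContainsA(List<String> source) {
        return source.stream().collect(
                Collectors.groupingBy(s -> s.contains("A"), LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Prints {true=[A, AD, AB], false=[BC, C, D]}
        System.out.println(groupByContainsA(Arrays.asList("A", "AD", "BC", "AB", "C", "D")));
    }
}
```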
    

    window

    The window operator is similar to the buffer operator, but it emits Observables instead of lists.

    window.png

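    // Every n seconds, emit the items collected during that period together instead of emitting each item as soon as it arrives (only the first 10 items are sent here). This can be used to reduce the number of API requests.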

    window_skip.jpg
    Observable.interval(1, TimeUnit.SECONDS).take(10)
                    .window(3, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Observable<Long>>() {
                        @Override
                        public void accept(Observable<Long> observable) throws Exception {
                            Log.d(TAG, "Sub Divide begin....");
                            textView.append("Sub Divide begin ....");
                            textView.append(" ....... ");
                            observable
                                    .subscribeOn(Schedulers.io())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribe(new Consumer<Long>() {
                                        @Override
                                        public void accept(Long value) {
                                            Log.d(TAG, "Next:" + value);
                                            textView.append("Next:" + value);
                                            textView.append(" ....... ");
                                        }
                                    });
                        }
                    });
    

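    interval emits one item per second, take(10) keeps the first 10 items, and window(3, TimeUnit.SECONDS) opens a new window (a new Observable) every 3 seconds. This can be seen from the time between the Log.d(TAG, "Sub Divide begin...."); lines, which are logged each time accept is entered.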

    10-21 21:34:18.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Sub Divide begin....
    10-21 21:34:19.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:0
    10-21 21:34:20.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:1
    10-21 21:34:21.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Sub Divide begin....
    10-21 21:34:21.071 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:2
    10-21 21:34:22.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:3
    10-21 21:34:23.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:4
    10-21 21:34:24.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Sub Divide begin....
    10-21 21:34:24.081 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:5
    10-21 21:34:25.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:6
    10-21 21:34:26.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:7
    10-21 21:34:27.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Sub Divide begin....
    10-21 21:34:27.091 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:8
    10-21 21:34:27.101 18257-18265/com.example.philos.rxjavademo I/dalvikvm: Total arena pages for JIT: 11
    10-21 21:34:27.101 18257-18265/com.example.philos.rxjavademo I/dalvikvm: Total arena pages for JIT: 12
    10-21 21:34:28.051 18257-18257/com.example.philos.rxjavademo D/TransformOperateExampleActivity: Next:9
    
    
    device-2017-10-21-213514.png
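
    The grouping in the log can be reproduced with a small plain-Java model that assigns each tick to a time window. It is a sketch under two assumptions taken from the log above: value v is emitted after (v + 1) periods, and a tick that falls exactly on a window boundary lands in the newly opened window (in a real run this is a race between two timers). WindowSketch is a made-up name.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSketch {
    // ticks: number of interval values; periodSec: interval period; spanSec: window length.
    static List<List<Long>> windows(int ticks, long periodSec, long spanSec) {
        List<List<Long>> out = new ArrayList<>();
        for (long v = 0; v < ticks; v++) {
            long emittedAt = (v + 1) * periodSec;     // interval emits value v after (v + 1) periods
            int window = (int) (emittedAt / spanSec); // assumption: boundary ticks go to the new window
            while (out.size() <= window) {
                out.add(new ArrayList<Long>());
            }
            out.get(window).add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [[0, 1], [2, 3, 4], [5, 6, 7], [8, 9]]
        System.out.println(windows(10, 1, 3));
    }
}
```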

    cast
    cast converts every item emitted by the Observable to the specified type before emitting it. It is a specialized version of the map() operator.

    cast.png
    public void cast() {
            List<Person> list = new ArrayList<>();
            for (int i = 0; i < 30; i++) {
                if (i / 11 == 0) { // integer division: true only for i = 0..10
                    list.add(new TaiJian("i" + i));
                } else {
                    list.add(new Male(true, "i" + i));
                }
    
            }
            for (Person person : list) {
                Observable.just(person).cast(Male.class).subscribe(new Observer<Male>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        Log.i(TAG, "onSubscribe");
                    }
    
                    @Override
                    public void onNext(@NonNull Male male) {
                        Log.i(TAG, "name:" + male.name + "\n 我是男人:" + male.isMan);
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.i(TAG, "onError: " + e.getMessage());
                    }
    
                    @Override
                    public void onComplete() {
                        Log.i(TAG, "onComplete");
                    }
                });
            }
    
        }
    
        class Person {
            String name;
    
            public Person(String name) {
                this.name = name;
            }
        }
    
        class Male extends Person {
            public boolean isMan;
    
            public Male(boolean isMan, String name) {
                super(name);
                this.isMan = isMan;
            }
        }
    
        class TaiJian extends Male {
    
            public TaiJian(String name) {
                super(false, name);
            }
        }
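
    In the example above every item is a Male or a TaiJian (a subclass of Male), so the cast always succeeds. The plain-Java sketch below shows the per-item check that, as far as I know, cast(Male.class) performs through Class.cast, and what happens when an item is only a Person. CastSketch and toMale are made-up names, and the classes are trimmed copies of the ones above.

```java
final class CastSketch {
    static class Person {
        final String name;
        Person(String name) { this.name = name; }
    }

    static class Male extends Person {
        final boolean isMan;
        Male(boolean isMan, String name) { super(name); this.isMan = isMan; }
    }

    // The check applied to each item: returns the item typed as Male or throws ClassCastException.
    static Male toMale(Person p) {
        return Male.class.cast(p);
    }

    public static void main(String[] args) {
        System.out.println(toMale(new Male(true, "i1")).name);
        try {
            toMale(new Person("i0"));
        } catch (ClassCastException e) {
            // In an Observable chain this exception would be delivered to onError.
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```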
    

    That covers all the transformation operators. The next chapter looks at the filtering operators.

