美文网首页Java技术干货Android开发那些事
RxJava 第一篇 - Single使用及示例

RxJava 第一篇 - Single使用及示例

作者: H3c | 来源:发表于2016-04-13 10:00 被阅读8601次

    本文开始介绍RxJava的使用,和别的RxJava文章不同的地方在于我们直接从实战开始介绍,不讲那么多的花拳绣腿。好多RxJava启蒙文章都是你抄我我抄你,抄来抄去也就那么几个场景,换到自己的项目中还是不会使用RxJava,本文另辟蹊径实战讲解。

    既然是另辟蹊径,那么肯定开场不能跟别人一样。什么Observable,什么Subscribe,什么Observe,什么Subscribe...这几个长得差不多的都是什么鬼?我们暂时统统不管,且请还没入门的小白同学们暂且忘记这些。我来告诉你们什么才是RxJava的启蒙。

    简单入门

    我们要知道的第一个概念叫做"Single",中文翻译叫一个或者单一的,也可以作为单身的意思,想必你就是单身吧....

    第二个概念叫做"just",在RxJava中Single的just支持传入任意参数。使用方法如:

    Single.just(T value);
    

    知道这段代码,就可以说RxJava你要入门了噢。你想想看我们为啥要用RxJava,主要是为了异步处理一些任务。通常情况的异步任务都是什么样子的呢?传入一些值,经过逻辑处理之后返回结果。

    以上代码中的just(T value),可以看作是传入参数。

    有了输入,如何输出呢?我们讲到了异步处理,那自然得有一个异步回调才行吧,这简直理所应当吧。所以这里介绍最重要的"subscribe"概念。有的同学会疑惑说“你不是说不讲subscribe的嘛”。嗯咳,不讲不行嘛,谁叫人家是最重要的“订阅概念”呢。我只是让你们学习之前先忘记那些让人混淆的概念,从新开始嘛。废话少说,看代码

    Single.just(T value).subscribe();
    

    例如:

    int addValue(int a, int b) {
        return a + b;
    }
    
    // ps:不管是否subscribe(),只要使用just(),addValue()都会执行
    Single.just(addValue(1, 2)).subscribe();
    

    异步任务知道如何执行了,那咱们该想办法拿到回调了!

    Single.just(addValue(1, 2)).subscribe(new SingleSubscriber<Integer>() {    
        @Override    
        public void onSuccess(Integer value) {
            // value = 3
        }
        @Override
        public void onError(Throwable error) {}
    });
    

    这里我们使用一个叫SingleSubscriber的对象接收回调,作为入门只要记住就好了。其中onSuccess(T value)会接收just(T value)的传入值,当addValue()方法抛异常的时候会自动调用onError()。

    通过以上的例子,我们学会了如何简单的传入参数,并且输出结果。但是这里我并不想误导大家,这里仅仅只是RxJava的开始,其中just()方法无论如何都只会在当前线程里执行。所以即使看上去有异步的过程,但其实这是个同步的过程!

    Single高阶

    接下来我们会很深入的讲解Single的用法了。主要介绍Single的操作符,用一星~三星表示重要程度,三星表示很常用,一星表示了解就行。按首字母顺序介绍。

    compose操作符[*]

    创建一个自定义的操作符,将某种范型的Single转换为另一种范型一般用不到,示例如下,将Integer的Single转为String Single。

    Single.just(addValue(1, 2))
            .compose(new Single.Transformer<Integer, String>() {
                @Override
                public Single<String> call(Single<Integer> integerSingle) {
                    return integerSingle.map(new Func1<Integer, String>() {
                        @Override
                        public String call(Integer integer) {                        
                            return String.valueOf(integer + 2);
                        }
                    });
                }
            })
            .subscribe(new SingleSubscriber<String>() {
                @Override
                public void onSuccess(String value) {
                    // value = 5
                }
                @Override
                public void onError(Throwable error) {}
            });
    

    这个例子就看起来很复杂了吧,不过不用担心,这个场景很少用到,这里只是简单介绍一下:之前的例子中我们创建了一个Integer的Single其中返回值是一加二的结果三,但是我们其实需要的结果并不是整型的三是字符串三又或是其他类型的对象,这个时候我们就需要结合map()操作符转换一下Single。map()操作符我们只会会讲,只要知道它可以用来转换类型就行。这里值得注意的是compose()方法可以指定线程运行,即可以指定Schedulers。如果不懂不要担心,以后还会介绍。

    concat操作符[*]

    用来连接多个Single和Observable发射的数据。

    Single.concat(Single.just(checkNetwork()), Single.just(checkMemory()), Single.just(doSth()))
    

    仅仅用来连接Single顺序执行的,比如顺序执行检查网络,检查内存,执行任务,注意:如果某个Single调用了onError()会导致被中断。

    create操作符[***]

    // 作用同Single.just(addValue(1, 2));
    Single.create(new Single.OnSubscribe<Integer>() {
        @Override
        public void call(SingleSubscriber<? super Integer> singleSubscriber) {
            singleSubscriber.onSuccess(addValue(1, 2));
        }
    });
    
    // 常见的示例,这是一个异步操作
    Single.create(new Single.OnSubscribe<Integer>() {
        @Override
        public void call(SingleSubscriber<? super Integer> singleSubscriber) {
            // 这里被指定在IO线程
            singleSubscriber.onSuccess(addValue(1, 2));
        }
    }).subscribeOn(Schedulers.io())// 指定运行在IO线程
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {            }
        @Override
        public void onError(Throwable e) {    }
        @Override
        public void onNext(Integer o) {  
            // o = 3
        }
    });
    

    值得注意的是之前我们使用的just()是一种特殊的create(),它不能指定Schedulers。只能在当前线程中运行,而create()可以指定Schedulers实现异步处理。且just()不管是否被subscribe()订阅均会被调用,而create()如果不被订阅是不会被调用的。所以我们通常可以用just()传递简单参数,而用create()处理复杂异步逻辑。

    error操作符[*]

    返回一个立即给订阅者发射错误通知的Single,一般用于调试,不常用。

    // 如人为让concat中断:
    Single.concat(s1, Single.error(new Throwable("error"), s2)).subscribe();
    

    flatMap操作符[*]

    flatMap<T,R>基本上等同于map(),唯一的区别在于flatMap的R一般用于返回Observable对象,这样随后的subscribe参数可以使用原始类型。详见代码

    Single.just(1).flatMap(new Func1<Integer, Single<String>>() {
        @Override
        public Single<String> call(Integer x) {
            return Single.just(x + "");
        }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
        @Override
        public void call(String s) {
            LogHelper.e("_flatMap:"+s);
        }});
    
    Single.just(1).map(new Func1<Integer, Single<String>>() {
        @Override
        public Single<String> call(Integer x) {
            return Single.just(x + "");
        }}).subscribe(new Action1<Single<String>>() {
        @Override
        public void call(Single<String> s) {// 注意这里返回值的区别
            LogHelper.e("_flatMap:"+s);
        }});
    

    一般map()是用于一对一的返回,而flatMap()用于一对0~多的返回。比如我们看下面这个例子:

    static Observable<List<String>> query() {
        List<String> s = Arrays.asList("Java", "Android", "Ruby", "Ios", "Swift"); 
        return Observable.just(s);
    }
    
    // 注意这里的参数是 query所返回的Observable的输出,并且返会一个Observable<String>
    query().flatMap(new Func1<List<String>, Observable<String>>() { 
        @Override 
        public Observable<String> call(List<String> strings) {
            //结合from处理 return Observable.from(strings);
           }}).subscribe(new Action1<String>() {
              @Override 
              public void call(String s) { 
                  System.out.println("_flatMap:"+s);
              }
    });
    

    输出:

    _flatMap:Java
    _flatMap:Android
    _flatMap:Ruby
    _flatMap:Ios
    _flatMap:Swift
    

    这里传入了一个List<String>,传出了多个String。而且应该多用于Observable,很少用在Single中,即使用也不如map()来的爽快,这里只做了解即可。

    flatMapObservable操作符[**]

    刚刚说到flatMap()和map()类似,区别在于flatMap可以返回多个值,而map只能返回一个。但在Single中flatMap只能返回Single,几乎等同于map实用性不高。而flatMapObservable就不同了,它支持将Single转化为Observable对象,可以返回多个值。下面这个例子介绍如何将Single转化为Observable。

    Single.just(1).flatMapObservable(new Func1<Integer, Observable<String>>() {
        @Override
        public Observable<String> call(Integer integer) {
            return Observable.just("H", "3", "c");
        }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
        @Override
        public void call(String s) {
            LogHelper.e("kk:"+s); 
       }
    });
    

    这里传入一个整型1,输出"H","3","c"三个字符串。

    from操作符[*]

    Single的from操作符仅允许传入一个java Future对象,由于java Future几乎很少使用,所以该操作符在Single中没什么实用意义。

    map操作符[***]

    Single.just(1).map(new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
            return "x";
        }}).subscribe(new Action1<String>() {// 注意这里返回值的区别
        @Override
        public void call(String s) {
            LogHelper.e("kk:"+s);
        }
    });
    

    map操作符之前有介绍过,用于类型一对一转换,比较简单。

    merge & mergeWith操作符[*]

    merge操作符类似于concat,他们的区别见下图


    concat imgconcat img
    merge imgmerge img

    subscribeOn操作符[***]

    用于指定异步任务的线程,常见的有:

    Schedulers.computation( );// 计算线程
    Schedulers.from(executor);// 自定义
    Schedulers.immediate();// 当前线程
    Schedulers.io();// io线程
    Schedulers.newThread();// 创建新线程
    Schedulers.trampoline();// 当前线程队列执行
    

    onErrorReturn操作符[***]

    相当于try catch中的return,具体意思就是当函数抛出错误的时候给出一个返回值,看代码:

    Single.create(new Single.OnSubscribe<Integer>() {
        @Override
        public void call(SingleSubscriber<? super Integer> singleSubscriber) {
            singleSubscriber.onError(new Throwable("x"));
        }}).onErrorReturn(new Func1<Throwable, Integer>() {
        @Override
        public Integer call(Throwable throwable) {
            return 2;
        }}).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer s) {
            LogHelper.e("kk:"+s);
        }
    });
    

    返回结果2

    observeOn

    指定回调所在线程

    // 常见的为,即Android UI线程
    AndroidSchedulers.mainThread();
    

    timeout操作符[***]

    超时操作操作,在指定时间内如果没有调用onSuccess()就判定为失败,且可支持失败的时候调用其他Single()

    toSingle操作符[*]

    将传入一个参数的Observable转换为Single

    Observable.just(1).toSingle();
    

    zip & zipWith操作符[**]

    如果说flatMap()是将一个Single变成多个的操作,那么zip刚刚相反,他可以将多个Single整合为一个

    Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer o, Integer o2) {
            LogHelper.e("A:" + o + "=" + o2);
            return null;
        }}).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            LogHelper.e("kk:"+s);
        }
    });
    

    RxJava Single的用法就全部讲完了,你学会了吗?如果还不会....那我告诉你Single的万金示例吧....

    Single.just(1)
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return integer + "";
                }
            })
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    // result
                }
            });
    

    用这个例子足以应付大部分的一入一输出的异步任务了。
    下篇我们会介绍Subject。

    相关文章

      网友评论

      • 厡子:有点疑问,简单入门中说:“当addValue()方法抛异常的时候会自动调用onError()”,例子中的代码是
        Single.just(addValue(1, 2)).subscribe(...);
        例子中是用addValue(1, 2)的返回值,放到just()中,为何它抛的异常不是到当前上下文,而是被这个single获取呢?如果是Single.create((a) -> {a.next(addValue(1, 2))}) 那我觉得还能理解
      • 五万年前走丢了:楼主这个自言自语的习惯非常好,我很喜欢.
      • 施瓦辛格777:讲得非常好,让我快速入门了。
      • captain991:个人感觉just也就是写写demo等简单例子的时候能用,就像subscribe的时候传Action似的,不抛异常没问题,一抛异常没写处理,只能崩溃,而且还只能在subscribe的线程运行,不能调度,太鸡肋了
      • MigrationUK:其实你这篇入门更难☺
      • jdsjlzx:public static void test1() {

        Single<String> single = Single.create(new OnSubscribe<String>() {

        @Override
        public void call(SingleSubscriber<? super String> subscriber) {
        subscriber.onSuccess("Tom");
        }
        });

        SingleSubscriber<String> singleSubscriber = new SingleSubscriber<String>() {

        @Override
        public void onSuccess(String value) {
        // TODO Auto-generated method stub
        System.out.println("onSuccess value = " + value);
        }

        @Override
        public void onError(Throwable error) {
        System.out.println("onError error : " + error);
        }
        };

        single.subscribe(singleSubscriber);

        System.out.println("singleSubscriber.isUnsubscribed() ? " + singleSubscriber.isUnsubscribed());

        }

        运行结果:

        onSuccess value = Tom
        singleSubscriber.isUnsubscribed() ? false

        官网说:Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关系终止。

        而代码运行后,为什么singleSubscriber没有取消订阅呢?
      • openGL小白: Single.just(1).subscribeOn(Schedulers.io()).map(new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
        Log.d("RxJava","Func1.thread.id"+Thread.currentThread().getId());
        try {
        Thread.sleep(1000);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        return ""+(integer+1);
        }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
        Log.d("RxJava","Action1.thread.id"+Thread.currentThread().getId());
        }
        });
        Log.d("RxJava","onClick1Event.after:"+Thread.currentThread().getId());
        //=================================================
        06-23 08:39:33.481 23706-23706/com.coolstar.rxjavatest D/RxJava: onClick1Event.after:1
        06-23 08:39:33.486 23706-26035/com.coolstar.rxjavatest D/RxJava: Func1.thread.id4202
        06-23 08:39:34.491 23706-26035/com.coolstar.rxjavatest D/RxJava: Action1.thread.id4202
        //==================================================
        为什么我写的observeOn(AndroidSchedulers.mainThread())没起作用?
        按作者原来的写法,Func在我这也是在1主UI线程中执行的。
        H3c:@coolstar1204 subscribeOn方法必须写在map()后observeOn()前。注意顺序
      • 989977a78105:'其中onSuccess(T value)会接收just(T value)的传入值',其中的'传入值'是不是应该为'返回值'?
      • 51f82b05f43f:用Single和直接用OObservable有什么优势和好处呢?我一直没明白这个问题,请指教
        reStart丶丶:Single 应该是 : 总是只发射一个值,或者一个错误通知,而不是发射一系列的值
        __huazhou:所以嘛...我建议作者你最好能把 Single 和 Observable 的区别说下,我开始还以为 Single 是你自己包装的类
        H3c:@brucewuu 简单啊,Single就是单一参数的Observable,可以处理大部分简单的场景。当然你想用Observable也行
      • Max____:private void demo() {
        Single.just(1).map(integer -> {
        showCurrentThread("map function call()");
        return integer + "";
        })
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
        // result
        showCurrentThread("onSuccess");
        });
        }

        private void showCurrentThread(String tag) {
        Log.i(tag, " thread = " + Thread.currentThread());
        }


        运行结果:
        I/map function call(): thread = Thread[main,5,main]
        I/onSuccess: thread = Thread[RxCachedThreadScheduler-1,5,main]


        是博主说的subscribOn和observeOn反了还是我哪里理解错了?
        H3c:@maodq 是的subscribeOn是任务线程, observeOn是回调线程。

      本文标题:RxJava 第一篇 - Single使用及示例

      本文链接:https://www.haomeiwen.com/subject/lckwlttx.html