给初学者的RxJava2.0教程(二)

作者: Season_zlc | 来源:发表于2016-12-06 11:16 被阅读63367次

    Outline

    [TOC]

    前言

    上一节教程讲解了最基本的RxJava2的使用, 在本节中, 我们将学习RxJava强大的线程控制.

    正题

    还是以之前的例子, 两根水管:

    RxJava

    正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.

    怎么去理解呢, 以Android为例, 一个Activity的所有动作默认都是在主线程中运行的, 比如我们在onCreate中打出当前线程的名字:

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            Log.d(TAG, Thread.currentThread().getName());
        }
    

    结果便是:

    D/TAG: main
    

    回到RxJava中, 当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件.

    当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件, 来看段代码:

    @Override                                                                                       
    protected void onCreate(Bundle savedInstanceState) {                                            
        super.onCreate(savedInstanceState);                                                         
        setContentView(R.layout.activity_main);                                                     
                                                                              
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
            @Override                                                                               
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
                Log.d(TAG, "emit 1");                                                               
                emitter.onNext(1);                                                                  
            }                                                                                       
        });                                                                                         
                                                                                                    
        Consumer<Integer> consumer = new Consumer<Integer>() {                                      
            @Override                                                                               
            public void accept(Integer integer) throws Exception {                                  
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
                Log.d(TAG, "onNext: " + integer);                                                   
            }                                                                                       
        };                                                                                          
                                                                                                    
        observable.subscribe(consumer);                                                             
    }                                                                                                                                                                   
    

    在主线程中分别创建上游和下游, 然后将他们连接在一起, 同时分别打印出它们所在的线程, 运行结果为:

    D/TAG: Observable thread is : main
    D/TAG: emit 1                     
    D/TAG: Observer thread is :main   
    D/TAG: onNext: 1                  
    

    这就验证了刚才所说, 上下游默认是在同一个线程工作.

    这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI, 用图片来描述就是下面这个图片:

    thread.png

    在这个图中, 我们用黄色水管表示子线程, 深蓝色水管表示主线程.

    要达到这个目的, 我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:

    @Override                                                                                       
    protected void onCreate(Bundle savedInstanceState) {                                            
        super.onCreate(savedInstanceState);                                                         
        setContentView(R.layout.activity_main);                                                     
                                                                                                    
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
            @Override                                                                               
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
                Log.d(TAG, "emit 1");                                                               
                emitter.onNext(1);                                                                  
            }                                                                                       
        });                                                                                         
                                                                                                    
        Consumer<Integer> consumer = new Consumer<Integer>() {                                      
            @Override                                                                               
            public void accept(Integer integer) throws Exception {                                  
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
                Log.d(TAG, "onNext: " + integer);                                                   
            }                                                                                       
        };                                                                                          
                                                                                                    
        observable.subscribeOn(Schedulers.newThread())                                              
                .observeOn(AndroidSchedulers.mainThread())                                          
                .subscribe(consumer);                                                               
    }                                                                                               
    

    还是刚才的例子, 只不过我们太添加了一点东西, 先来看看运行结果:

     D/TAG: Observable thread is : RxNewThreadScheduler-2  
     D/TAG: emit 1                                         
     D/TAG: Observer thread is :main                       
     D/TAG: onNext: 1                                      
    

    可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

    和上一段代码相比,这段代码只不过是增加了两行代码:

    .subscribeOn(Schedulers.newThread())                                              
    .observeOn(AndroidSchedulers.mainThread())   
    

    作为一个初学者的入门教程, 并不会贴出一大堆源码来分析, 因此只需要让大家记住几个要点, 已达到如何正确的去使用这个目的才是我们的目标.

    简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.

    多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.

    多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.

    举个例子:

     observable.subscribeOn(Schedulers.newThread())     
             .subscribeOn(Schedulers.io())              
             .observeOn(AndroidSchedulers.mainThread()) 
             .observeOn(Schedulers.io())                
             .subscribe(consumer);                      
    

    这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为:

    D/TAG: Observable thread is : RxNewThreadScheduler-3
    D/TAG: emit 1                                       
    D/TAG: Observer thread is :RxCachedThreadScheduler-1
    D/TAG: onNext: 1                                    
    

    可以看到, 上游虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler 线程中, 而下游则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.

    为了更清晰的看到下游的线程切换过程, 我们加点log:

         observable.subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                        }
                    })
                    .observeOn(Schedulers.io())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
                        }
                    })
                    .subscribe(consumer);                                                
    

    我们在下游线程切换之后, 把当前的线程打印出来, 运行结果:

    D/TAG: Observable thread is : RxNewThreadScheduler-1                                             
    D/TAG: emit 1                                                                                    
    D/TAG: After observeOn(mainThread), current thread is: main                                      
    D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2                        
    D/TAG: Observer thread is :RxCachedThreadScheduler-2                                             
    D/TAG: onNext: 1                                 
    

    可以看到, 每调用一次observeOn() 线程便会切换一次, 因此如果我们有类似的需求时, 便可知道如何处理了.

    在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    • Schedulers.newThread() 代表一个常规的新线程
    • AndroidSchedulers.mainThread() 代表Android的主线程

    这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

    实践

    对于我们Android开发人员来说, 经常会将一些耗时的操作放在后台, 比如网络请求或者读写文件,操作数据库等等,等到操作完成之后回到主线程去更新UI, 有了上面的这些基础, 那么现在我们就可以轻松的去做到这样一些操作.

    下面来举几个常用的场景.

    网络请求

    Android中有名的网络请求库就那么几个, Retrofit能够从中脱颖而出很大原因就是因为它支持RxJava的方式来调用, 下面简单讲解一下它的基本用法.

    要使用Retrofit,先添加Gradle配置:

        //retrofit
        compile 'com.squareup.retrofit2:retrofit:2.1.0'
        //Gson converter
        compile 'com.squareup.retrofit2:converter-gson:2.1.0'
        //RxJava2 Adapter
        compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
        //okhttp
        compile 'com.squareup.okhttp3:okhttp:3.4.1'
        compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'
    

    随后定义Api接口:

    public interface Api {
        @GET
        Observable<LoginResponse> login(@Body LoginRequest request);
    
        @GET
        Observable<RegisterResponse> register(@Body RegisterRequest request);
    }
    

    接着创建一个Retrofit客户端:

    private static Retrofit create() {
                OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
                builder.readTimeout(10, TimeUnit.SECONDS);
                builder.connectTimeout(9, TimeUnit.SECONDS);
    
                if (BuildConfig.DEBUG) {
                    HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                    interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                    builder.addInterceptor(interceptor);
                }
    
                return new Retrofit.Builder().baseUrl(ENDPOINT)
                        .client(builder.build())
                        .addConverterFactory(GsonConverterFactory.create())
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                        .build();
    }
    

    发起请求就很简单了:

            Api api = retrofit.create(Api.class);
            api.login(request)
                 .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
                 .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求结果
                .subscribe(new Observer<LoginResponse>() {
                    @Override
                    public void onSubscribe(Disposable d) {}
    
                    @Override
                    public void onNext(LoginResponse value) {}
    
                    @Override
                    public void onError(Throwable e) {
                        Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
                    }
    
                    @Override
                    public void onComplete() {
                        Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
                    }
                });
    

    看似很完美, 但我们忽略了一点, 如果在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了, 怎么办呢, 上一节我们说到了Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.

    那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.

    读写数据库

    上面说了网络请求的例子, 接下来再看看读写数据库, 读写数据库也算一个耗时的操作, 因此我们也最好放在IO线程里去进行, 这个例子就比较简单, 直接上代码:

    public Observable<List<Record>> readAllRecords() {
            return Observable.create(new ObservableOnSubscribe<List<Record>>() {
                @Override
                public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                    Cursor cursor = null;
                    try {
                        cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                        List<Record> result = new ArrayList<>();
                        while (cursor.moveToNext()) {
                            result.add(Db.Record.read(cursor));
                        }
                        emitter.onNext(result);
                        emitter.onComplete();
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                        }
                    }
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    

    好了本次的教程就到这里吧, 后面的教程将会教大家如何使用RxJava中强大的操作符. 通过使用这些操作符可以很轻松的做到各种吊炸天的效果. 敬请期待.

    相关文章

      网友评论

      • 光羽隼:牛批
      • 永远改不完的bug的Coder:2018.10.16 第二篇打卡~
      • 鲨鱼尤德曼:如果不是用链式.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) ,而是用observable.subscribeOn(Schedulers.newThread());
        observable.observeOn(AndroidSchedulers.mainThread()) ;
        线程都是主线程
      • 罗密欧与猪过夜126:写得不错,看了好几遍了!!!:sunglasses:
      • doudousha:写的很通俗易懂!看了就是豁然开朗的感觉!
      • sakurekid:好评
      • whd_Alive:写的讲真通俗易懂,看了一堆教程,要么是出现一堆名词,要么大段大段分析源码,楼主的这个系列真是很适合我这种小白学习,多谢
      • 断金岁月:楼主 那个CompositeDisposable是内置的 有什么方法能得到这个容器啊 你提了一下 也没说怎么获取
        Season_zlc:直接new一个对象就行了呀:joy:
      • 杨晓是大V:写的真的很赞
      • Mubly:定义接口和创建客户端的时候,你少贴了东西啊!对于小白的我……懵逼了
        慎独_337c:米兔 直接懵逼了
        Jay_Lwp:定义四个空的类LoginRequest等等...
      • e1ddbd1bfac4:这个教程应该被定为 中文官方教程,写的太好了
      • fulai_xy:请问doOnNext是运行在上游还是下游的??
        e67494fc13df:肯定是下游啊
        BugFree张瑞:下游啊,该方法用来在消费者真正处理事件之前做一下其他处理
      • b8a354b32d9e:CompositeDisposable从哪里获取呢?
        吕氏春秋i:直接new CompositeDisposable mDisposable = new CompositeDisposable();
        mDisposable.add( do something... );
      • 聆世冷暖:很有帮助
      • 千江明月:很容易懂,写的不错
      • 小新哥的大梦想:楼主大大,身边好多朋友推荐你这个RxJava2系列入门,真的是循序渐进,很容易理解,非常感谢!!!楼主有没有打算把RxJava2+Retrofit2+Okhttp3+Mvp写个项目开源下呢,想学习下你的封装思想。。:relaxed:
        小新哥的大梦想:最近打算先看完你这个系列再做个总结,楼主有微信公众号吗?
      • nyzs_n9001:其实我更想知道,retrofit和Rxjava一起用了之后,不关注retrofit请求到的返回数据吗?只在onComplete()和onError()里就能知道是否"登录成功"吗?不是服务器返回 成功码 才成功登录吗?
        2746e6b709f0:作者这只是模拟情景吧,这个toast只是例子而已,真正的登陆逻辑当然得你自己通过返回数据来判断了
      • b88be24d43a8:声明接口
        public interface Api {
        @post
        Observable<LoginResponse> login(@Body LoginRequest request);

        @post
        Observable<RegisterResponse> register(@Body RegisterRequest request);
        }

        调用方法
        api.login(new LoginRequest())
        .subscribeOn(Schedulers.io()) //在IO线程进行网络请求
        .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
        .subscribe(new Observer<LoginResponse>() {

        @Override
        public void onSubscribe(Disposable d) {
        mCd = new CompositeDisposable();
        mCd.add(d);
        }

        @Override
        public void onNext(LoginResponse value) {
        }

        @Override
        public void onError(Throwable e) {
        Toast.makeText(LoginActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onComplete() {
        Toast.makeText(LoginActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
        }
        });

        break;
        }

        报错
        java.lang.IllegalArgumentException: Missing either @post URL or @URL parameter.
        for method Api.login
        为什么@Body 报错 , @Url就可以呢
        nyzs_n9001:其实我更想知道,retrofit和Rxjava一起用了之后,不关注retrofit请求到的返回数据吗?只在onComplete()和onError()里就能知道是否"登录成功"吗?不是服务器返回 成功码 才成功登录吗?
      • 23c26d4259b4:谢谢作者的有心编辑,真的是看过最好的RxJava系列的文章,很用心,谢谢你!
      • c61089d34a4c:请问
        @get
        Observable<RegisterResponse> register(@Body RegisterRequest request);
        里的RegisterResponse 是什么
      • 4b24b4c0fd7e:挺不错的,作者对RxJava线程调度原理进行了浅显易懂的讲解。
      • x耶律:我忍不住的给你一个赞:smiley:
      • ec2612df5cb8:感谢楼主分享:heart:
      • 我是龙俊:写的很好啊 循序渐进 楼主棒棒哒💯:+1: :+1: :+1:
      • Carson带你学安卓:请问一下,AndroidSchedulers调度器不是属于RxAndroid的么?能来个人解答下么
        叨码:是
      • 9d13dac0b326:教程一960个喜欢了,教程二就339个了,说明坚持下来的人少了,我就要坚持跟着您老人家学习下去,在此膜拜大佬!!!
        winelx:因为直接关注作者那还用一篇篇点,很麻烦的\(^o^)/~
      • 安久哲丶_1ab2:支持大神,写的很不错!
      • 被时光移动的城:看完文章,想给你你生猴子
      • smile_30ba:因为数据格式是固定的,我把网络请求封装在在BaseActivity里了,网络请求时CompositeDisposable.add(),在baseActivity的ondestory()方法时CompositeDisposable.clear() ,这样会不会有问题,比如后面打开的页面关闭,会把前面页面还没请求完的也给关掉了:pray:
        Season_zlc:不会
      • 七七就是七七:楼主你有试过demo吗,floatMap加了delay运行,不会打印出任何东西,还有就是不加delay的时候flatMap打印出来也并不是无序的
      • bf2770eeec88:不错不错,收藏了。

        推荐下,分库分表中间件 Sharding-JDBC 源码解析 17 篇:http://www.yunai.me/categories/Sharding-JDBC/?jianshu&401


      • bd180dab6513:你写的文章深入浅出,丝丝入扣,难得好文,楼主有公众号没有?
      • 安卓猿:建议楼主把这个Rx系列的 所有 链接全部贴在 文章 头部
      • 顶级工程师闯天涯:非常棒,发个评论支持一下...
      • Suppender:写的非常好,是我看的最好理解的Rxjava.
        Season_zlc:@Suppender 过奖了:sunglasses:
      • 6520af87e802:楼主 我是小菜鸟一枚 这是我看到关于Rxjava讲解最通俗易懂的文章,不知道楼主是否能看见我的评论,如果看到的话请加下我QQ 995472572 有关Rxjava方面的知识想请教下你,谢谢!
      • 918772ccfba1:吊炸天!我只能用这个词语来形容了
      • bfe22fb00f77:“多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.”
        关于这一段的表述有一点的不恰当的地方,多次调用subscribeOn都是有效的。只是最终会取决于顶层的那个subscribeOn。
        在两个相邻的subscribeOn中间插入一个doOnSubscribe输入当前线程就可以测试出来。
        两个subscribeOn的使用,常用于网络请求,一个用于主线程显示loading,一个是从主线程切换回异步做网络请求。
      • X1a0Yu_:这一篇讲的没有第一篇细,但是还是谢谢分享
      • FF4081颜色的大野牛:写的太好了👍
      • Wish_xy:膜拜一下作者大大,写的很棒,简洁清晰,很容易理解学习!
      • 吃瓜小码农:看到目前为止,最通俗易懂,最详细的RxJava的教程,继续拜读
      • 仁昌居士:还是刚才的例子, 只不过我们太添加了一点东西, 先来看看运行结果: 多了个“太”字。

        还有请教大佬一下,我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可。能不能代码举例一下,如何保存的。
        谢谢:smile:
      • 永远向着诗和远方:强烈推荐楼主在文章结尾加一下上一篇和下一篇的链接
      • code_solve:这个博文的风格看起来真舒服。。。
      • laksg2009:Retrofit完全没用过,这篇就这里看不懂.其他都能明白
      • 我只需要:楼主很威武,太牛掰了
      • 蚂蚁上树卡:compositesubscription 写错了,应该是这个
      • 蚂蚁上树卡:CompositeSubscribtion的使用在哪里呢?请问作者
      • 27efec53a72d:谢谢作者!我会转发给给多人来学习
      • 391c7ed5c5e5:恩 写的很详细 挺:+1:
      • aca16273a847:“那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.”

        这一段没有讲的太清楚,我看了下源码。 add/clear皆不是静态方法。你的意思是Activity里面创建一个CompositeDisposable(容器功能的)对象,将发起网络请求的下游回调引用都保留在CompositeDisposable对象中, 在activity的生命周期 destory时候调用CompositeDisposable对象的clear方法?

      • memorycjj:现在最新添加adapter的方式 compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' ,额外不需要添加retrofit,okhttp的支持,自己添加下converter就可以了,麻烦楼主更新下,文章写的很棒 。
      • 有没有口罩给我一个:其实rxjava中线程的切换如果flatmap和observeer 同一个线程的话它会这样执行,flatmap→observer直到flatmap结束为止;如果不是同一个线程的话,会先把flatmap的事件全部执行完在执行observer,我这样理解对吗?
        有没有口罩给我一个: @ThirKing 我明白了
      • 82d047e884b2: .subscribeOn().observeOn()同时用,运行后没反应,只用subscribeOn(),emitter,onnext都在线程池执行;如果只用observeOn(),则emitter执行在主线程,onnext执行在线程池,是因为我用的最新版本(2.1.0)的原因?
      • wscjy:作者内容确实讲得很好,让初学者看得舒服,就是本文开头用Consumer有点不知所云
        nyzs_n9001:第一篇里结尾的时候有讲到:smile:
      • 林宥之:Retrofit本身已经做了线程切换了吧?实际应用中使用Retrofit+RxJava还需要subscribeOn(Schedulers.io()) 吗?
      • 倔强的炉包:膜拜大神
      • Gunter1993:相当给力的Rj2教程,就是标题起了一二三什么的,不太方便今后查阅
      • 良哥哥好帅:mark 楼主写得相当棒,受益匪浅
      • d3552222d151:强势围观
      • FynnJason:对观察者和被观察者一直理解比较懵。比如警察和小偷,灯泡和开关,上游和下游,他们到底谁是观察者,谁是被观察者呢?请作者指点一下。谢谢:relaxed:
        Season_zlc:这个问题可以从单词意思来看, Observeable这个单词的意思是可观察的, Observer单词的意思是观察者, 关系就很明确了
      • 68ed210f942b:Observable.create(new ObservableOnSubscribe<String>() {
        @Override

        public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("hello");
        System.out.println(Thread.currentThread().getName());
        }
        }).subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
        @Override
        public void accept(String mS) throws Exception {
        System.out.println(mS);
        System.out.println(Thread.currentThread().getName());
        }
        });
        大佬为什么这个什么都不输出
        68ed210f942b: @Season_zlc 嗯啊,对的谢谢
        Season_zlc:你是在Java 程序里跑的吧, 在主线程里加上Thread.sleep()延时, 否则主线程立马就结束了所以不会看到输出.
      • 流星留步:网络请求未完成时候,退出当前activity并未发生崩溃,而是发生的内存泄漏,activity没有被释放掉,onComplete里面的代码正常执行了,我的好像是这样的,和博主的有点差别啊?
        流星留步:产生了内存泄漏之后,我在退出Activity onDestroy中调用Disposable进行切断水管,但是切断水果之后就报错了。
        05-03 11:04:20.571 24773-25101/com.majian.rxjavam E/AndroidRuntime: FATAL EXCEPTION: RxNewThreadScheduler-1
        Process: com.majian.rxjavam, PID: 24773
        io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
      • lzhHappy:API 接口里面的 @get(或者其它请求方式)必须要有 Url 地址,所传的字符串不能为空,否则会异常提示,baseUrl 后面没有地址的话可以加个/或者.字符串。
        @get 请求方式是没法使用@Body的
        741a982bae87:"如果在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了" 这是什么原因?
        Veken_Fly:把baseurl后面的参数截取一段,就像我们公司,上传图片地址跟普通网络请求的地址不一样,所以只取前面一部分的,后面的用常量保存,在get和post里面再引用
        b88be24d43a8:@get 和@post 都用不了@Body , 不知道为啥
      • 译森:为什么不能单独使用doOnNext(),一定在结尾使用subscribe()?
        有没有口罩给我一个: @译森 因为doOnNext()是这样的:在onNext()之前执行
      • 一起睡个觉:RxJava执行完成之后 会自动取消订阅吗
        Season_zlc: 不会
      • 我才是张雷:确实,看完后想跳转下一章或者上一章不方便,不过不妨碍文章的精彩,感谢
      • FredKang:CompositeSubscription和CompositeDisposable感觉差不多
      • 旧人旧城旧心:看了很多rxjava的文章,你这篇讲解的很好!不像一些文章从头到尾一下就讲解完了,容易让人晕头晕脑。你这分开讲解,很让人舒服!! 支持一下!!!
        d4cc34632821:@要啥自行车c 愣头青,莽夫!这位同学是真,滴,皮!
        cc25b6aa133b:@小呢个李 我反手就是一个大招
        73ece15c815f:这就很舒服了,哈哈。。莫名跑偏
      • 暴躁的心:真心写的不错,作为一个小白我都可以理解了。
      • X1a0Yu_:好文章,楼主厉害:fist:
      • BillBian:一直感觉事件流向图,会觉得是从3-1,而不是1-3。是不是我理解的有问题:fearful:
        BillBian:@He埋心 怎样理解的?
        BillBian:@He埋心 好吧:stuck_out_tongue_winking_eye:
      • Xdjm:这章写的非常好:+1:
      • zhangdong0921:为什么我用你的标签定义接口的请求方法报错呢
        Multiple @body method annotations found.
        然后改成了
        @get("appstore/history/query?")
        Observable<HistoryInfo> gethis(@Query("key") String key, @Query("day") String day);
        这样就可以请求了
        zhangdong0921:@岁月无痕灬灬 base URL是自己在shareSDK上面拿个公共接口,都可以申请的,
        我的是 http://apicloud.mob.com/appstore/history/query appkey:1babdcc496ce8
        岁月无痕灬灬:@zhangdong0921 你知道baseurl么
      • aee30df11e01:写的很好
      • 77324d9b47bb:史上最好的rxjava2文章,没有之一
      • 左手木亽:一直以为多次obsereron切换也只有一次效果。😁😁
      • 勇敢的少年啊:思路清晰明澈👍
      • efe81c2fc5c4:好像 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.这里写错了,应该是CompositeDisposable.dispose();,当然我没验证,错了不要打我:smile:
        sun_yangfei:@Season_zlc CompositeDisposable.dispose(); 和clar() 两个有什么区别?
        sun_yangfei:CompositeDisposable.dispose(); 和clar() 两个有什么区别?
        Season_zlc:很不幸你错了:v:
      • 白色乌鸦_160e:谢谢分享
      • YMindar:建议大神给这个系列做个侧边导航的条目,方便直接切换
      • Small_Cake:同样的请求连接为什么Volley就可以,Retrofit就报签名失败呢。作者遇到过吗?
        Small_Cake:
        interface RetrofitInteface {
        @post("doLogin")
        Observable<JSONObject> login(@QueryMap TreeMap<String,String> map);
        }
      • 4dd273fef994:学习到了,rxjava2 感觉和rxjava1 有很大区别啊。老记不清 observable ,observe,Subscriber还有一些类似的东东怎么破啊
        MigrationUK: @4dd273fef994 还有ObservableOnSubscribe
      • 67c15ed34d3c:楼主,代码看起来只有一半呐,有些函数的参数看不全
      • 927fd5b2da21::joy: 我能转载吗?刚好想学Rx
        Season_zlc: @Jelansty 转转转,不转不是中国人😂
      • 无极小屋:教程1的地址呢
        无极小屋: @Season_zlc 就不点😂
        Season_zlc: @角落里的箱 点击头像进主页😂
      • 南大:图文并茂,言简意赅,受益匪浅,谢谢您。
      • yangzteL:感谢楼主
      • fulai_xy:很好
      • fa16e00eeb9c:楼主牛,支持楼主
      • 欢乐的乐:最近项目在用Android Clean架构,里面有用到RxJava2,看了你的文章收益良多,以前不明白架构里面RestApi是干嘛的,看了例子内心大喊一声soga~~~。真的很谢谢你,今天下午接着看下去,嘻嘻。
        yangzteL:之前在找Rxjava 2 的文章,今天才发现我想找的在这里,感谢博主
        Season_zlc: @欢乐的乐 不客气😀
      • 巴黎没有摩天轮Li:这是继扔物线以后,我见过最好的文章了!赞!!!!!没毛病!!!!
        Season_zlc: @巴黎没有摩天轮Li 😄
      • 蜕变_99d2:建议楼主在文章结尾加一下上一篇和下一篇的链接,看起来方便:grin:
        kee_zlg:关注作者账号,里面可以看到很多东西
        aa1bfec397db:支持
        George吴逸云:哥们这个建议很好,楼主加上上下篇的连接吧,文章写得这么好,一定会有很多人来拜读的:smile:
      • 9146a69e7d04:最后数据库操作是不是没有subscribe ?
        Holly_dd20:@jblz625同学说得对
        jblz625:最后读取数据库的例子 ,方法返回体是一个Observable对象,subscribe 方法可以在 调用方法拿到返回体之后再调用吧.
        nyzs_n9001:看的好仔细,给你一个么么哒:kissing_heart:
      • d8184ca3c970:多谢楼主的文章,希望楼主建个rxjava2的群,这样方便大家交流,随时好请教大家。
      • itchenqi188:很好 ,拜读了

      本文标题:给初学者的RxJava2.0教程(二)

      本文链接:https://www.haomeiwen.com/subject/gdqlmttx.html