给初学者的RxJava2.0教程(二)

作者: Season_zlc | 来源:发表于2016-12-06 11:16 被阅读63367次

Outline

[TOC]

前言

上一节教程讲解了最基本的RxJava2的使用, 在本节中, 我们将学习RxJava强大的线程控制.

正题

还是以之前的例子, 两根水管:

RxJava

正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.

怎么去理解呢, 以Android为例, 一个Activity的所有动作默认都是在主线程中运行的, 比如我们在onCreate中打出当前线程的名字:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Log.d(TAG, Thread.currentThread().getName());
    }

结果便是:

D/TAG: main

回到RxJava中, 当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件.

当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件, 来看段代码:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     
                                                                          
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         
                                                                                                
    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          
                                                                                                
    observable.subscribe(consumer);                                                             
}                                                                                                                                                                   

在主线程中分别创建上游和下游, 然后将他们连接在一起, 同时分别打印出它们所在的线程, 运行结果为:

D/TAG: Observable thread is : main
D/TAG: emit 1                     
D/TAG: Observer thread is :main   
D/TAG: onNext: 1                  

这就验证了刚才所说, 上下游默认是在同一个线程工作.

这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI, 用图片来描述就是下面这个图片:

thread.png

在这个图中, 我们用黄色水管表示子线程, 深蓝色水管表示主线程.

要达到这个目的, 我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     
                                                                                                
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         
                                                                                                
    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          
                                                                                                
    observable.subscribeOn(Schedulers.newThread())                                              
            .observeOn(AndroidSchedulers.mainThread())                                          
            .subscribe(consumer);                                                               
}                                                                                               

还是刚才的例子, 只不过我们太添加了一点东西, 先来看看运行结果:

 D/TAG: Observable thread is : RxNewThreadScheduler-2  
 D/TAG: emit 1                                         
 D/TAG: Observer thread is :main                       
 D/TAG: onNext: 1                                      

可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

和上一段代码相比,这段代码只不过是增加了两行代码:

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())   

作为一个初学者的入门教程, 并不会贴出一大堆源码来分析, 因此只需要让大家记住几个要点, 已达到如何正确的去使用这个目的才是我们的目标.

简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.

多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.

多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.

举个例子:

 observable.subscribeOn(Schedulers.newThread())     
         .subscribeOn(Schedulers.io())              
         .observeOn(AndroidSchedulers.mainThread()) 
         .observeOn(Schedulers.io())                
         .subscribe(consumer);                      

这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为:

D/TAG: Observable thread is : RxNewThreadScheduler-3
D/TAG: emit 1                                       
D/TAG: Observer thread is :RxCachedThreadScheduler-1
D/TAG: onNext: 1                                    

可以看到, 上游虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler 线程中, 而下游则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.

为了更清晰的看到下游的线程切换过程, 我们加点log:

     observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);                                                

我们在下游线程切换之后, 把当前的线程打印出来, 运行结果:

D/TAG: Observable thread is : RxNewThreadScheduler-1                                             
D/TAG: emit 1                                                                                    
D/TAG: After observeOn(mainThread), current thread is: main                                      
D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2                        
D/TAG: Observer thread is :RxCachedThreadScheduler-2                                             
D/TAG: onNext: 1                                 

可以看到, 每调用一次observeOn() 线程便会切换一次, 因此如果我们有类似的需求时, 便可知道如何处理了.

在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

实践

对于我们Android开发人员来说, 经常会将一些耗时的操作放在后台, 比如网络请求或者读写文件,操作数据库等等,等到操作完成之后回到主线程去更新UI, 有了上面的这些基础, 那么现在我们就可以轻松的去做到这样一些操作.

下面来举几个常用的场景.

网络请求

Android中有名的网络请求库就那么几个, Retrofit能够从中脱颖而出很大原因就是因为它支持RxJava的方式来调用, 下面简单讲解一下它的基本用法.

要使用Retrofit,先添加Gradle配置:

    //retrofit
    compile 'com.squareup.retrofit2:retrofit:2.1.0'
    //Gson converter
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'
    //RxJava2 Adapter
    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    //okhttp
    compile 'com.squareup.okhttp3:okhttp:3.4.1'
    compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'

随后定义Api接口:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}

接着创建一个Retrofit客户端:

private static Retrofit create() {
            OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
            builder.readTimeout(10, TimeUnit.SECONDS);
            builder.connectTimeout(9, TimeUnit.SECONDS);

            if (BuildConfig.DEBUG) {
                HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                builder.addInterceptor(interceptor);
            }

            return new Retrofit.Builder().baseUrl(ENDPOINT)
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
}

发起请求就很简单了:

        Api api = retrofit.create(Api.class);
        api.login(request)
             .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
             .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求结果
            .subscribe(new Observer<LoginResponse>() {
                @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(LoginResponse value) {}

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete() {
                    Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
                }
            });

看似很完美, 但我们忽略了一点, 如果在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了, 怎么办呢, 上一节我们说到了Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.

那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.

读写数据库

上面说了网络请求的例子, 接下来再看看读写数据库, 读写数据库也算一个耗时的操作, 因此我们也最好放在IO线程里去进行, 这个例子就比较简单, 直接上代码:

public Observable<List<Record>> readAllRecords() {
        return Observable.create(new ObservableOnSubscribe<List<Record>>() {
            @Override
            public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                Cursor cursor = null;
                try {
                    cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                    List<Record> result = new ArrayList<>();
                    while (cursor.moveToNext()) {
                        result.add(Db.Record.read(cursor));
                    }
                    emitter.onNext(result);
                    emitter.onComplete();
                } finally {
                    if (cursor != null) {
                        cursor.close();
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

好了本次的教程就到这里吧, 后面的教程将会教大家如何使用RxJava中强大的操作符. 通过使用这些操作符可以很轻松的做到各种吊炸天的效果. 敬请期待.

相关文章

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • Rxjava2

    Season_zl给初学者的RxJava2.0教程 ObservableEmitter emitter 1....

  • RxJava整理

    给初学者的RxJava2.0教程 ObservableEmitter 上游可以发送无限个onNext, 下游也可以...

  • RxJava 3.x系列(一)简介及其Observable订阅

    优先参考:给初学者的RxJava2.0教程 1-10系列 RxJava 3.x系列(二)操作符 & 背压(Back...

  • test RxJava

    参考自: 给初学者的RxJava2.0教程(一) http://www.jianshu.com/p/464fa02...

  • Rxjava介绍<1>

    Rxjava github地址给初学者的RxJava2.0教程------水管系列手把手教你使用 RxJava 2...

  • Android

    大佬们的传送门: Season_zlc RxJava2 : 1.给初学者的RxJava2.0教程(一)

网友评论

  • 光羽隼:牛批
  • 永远改不完的bug的Coder:2018.10.16 第二篇打卡~
  • 鲨鱼尤德曼:如果不是用链式.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) ,而是用observable.subscribeOn(Schedulers.newThread());
    observable.observeOn(AndroidSchedulers.mainThread()) ;
    线程都是主线程
  • 罗密欧与猪过夜126:写得不错,看了好几遍了!!!:sunglasses:
  • doudousha:写的很通俗易懂!看了就是豁然开朗的感觉!
  • sakurekid:好评
  • whd_Alive:写的讲真通俗易懂,看了一堆教程,要么是出现一堆名词,要么大段大段分析源码,楼主的这个系列真是很适合我这种小白学习,多谢
  • 断金岁月:楼主 那个CompositeDisposable是内置的 有什么方法能得到这个容器啊 你提了一下 也没说怎么获取
    Season_zlc:直接new一个对象就行了呀:joy:
  • 杨晓是大V:写的真的很赞
  • a18274eec0a2:定义接口和创建客户端的时候,你少贴了东西啊!对于小白的我……懵逼了
    26992c6cd4c4:米兔 直接懵逼了
    Jay_Lwp:定义四个空的类LoginRequest等等...
  • e1ddbd1bfac4:这个教程应该被定为 中文官方教程,写的太好了
  • fulai_xy:请问doOnNext是运行在上游还是下游的??
    e67494fc13df:肯定是下游啊
    BugFree张瑞:下游啊,该方法用来在消费者真正处理事件之前做一下其他处理
  • b8a354b32d9e:CompositeDisposable从哪里获取呢?
    吕氏春秋i:直接new CompositeDisposable mDisposable = new CompositeDisposable();
    mDisposable.add( do something... );
  • 聆世冷暖:很有帮助
  • 千江明月:很容易懂,写的不错
  • 小新哥的大梦想:楼主大大,身边好多朋友推荐你这个RxJava2系列入门,真的是循序渐进,很容易理解,非常感谢!!!楼主有没有打算把RxJava2+Retrofit2+Okhttp3+Mvp写个项目开源下呢,想学习下你的封装思想。。:relaxed:
    小新哥的大梦想:最近打算先看完你这个系列再做个总结,楼主有微信公众号吗?
  • nyzs_n9001:其实我更想知道,retrofit和Rxjava一起用了之后,不关注retrofit请求到的返回数据吗?只在onComplete()和onError()里就能知道是否"登录成功"吗?不是服务器返回 成功码 才成功登录吗?
    2746e6b709f0:作者这只是模拟情景吧,这个toast只是例子而已,真正的登陆逻辑当然得你自己通过返回数据来判断了
  • b88be24d43a8:声明接口
    public interface Api {
    @post
    Observable<LoginResponse> login(@Body LoginRequest request);

    @post
    Observable<RegisterResponse> register(@Body RegisterRequest request);
    }

    调用方法
    api.login(new LoginRequest())
    .subscribeOn(Schedulers.io()) //在IO线程进行网络请求
    .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
    .subscribe(new Observer<LoginResponse>() {

    @Override
    public void onSubscribe(Disposable d) {
    mCd = new CompositeDisposable();
    mCd.add(d);
    }

    @Override
    public void onNext(LoginResponse value) {
    }

    @Override
    public void onError(Throwable e) {
    Toast.makeText(LoginActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
    }

    @Override
    public void onComplete() {
    Toast.makeText(LoginActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
    }
    });

    break;
    }

    报错
    java.lang.IllegalArgumentException: Missing either @post URL or @URL parameter.
    for method Api.login
    为什么@Body 报错 , @Url就可以呢
    nyzs_n9001:其实我更想知道,retrofit和Rxjava一起用了之后,不关注retrofit请求到的返回数据吗?只在onComplete()和onError()里就能知道是否"登录成功"吗?不是服务器返回 成功码 才成功登录吗?
  • 23c26d4259b4:谢谢作者的有心编辑,真的是看过最好的RxJava系列的文章,很用心,谢谢你!
  • c61089d34a4c:请问
    @get
    Observable<RegisterResponse> register(@Body RegisterRequest request);
    里的RegisterResponse 是什么
  • 4b24b4c0fd7e:挺不错的,作者对RxJava线程调度原理进行了浅显易懂的讲解。
  • x耶律:我忍不住的给你一个赞:smiley:
  • ec2612df5cb8:感谢楼主分享:heart:
  • 我是龙俊:写的很好啊 循序渐进 楼主棒棒哒💯:+1: :+1: :+1:
  • Carson带你学安卓:请问一下,AndroidSchedulers调度器不是属于RxAndroid的么?能来个人解答下么
    叨码:是
  • 9d13dac0b326:教程一960个喜欢了,教程二就339个了,说明坚持下来的人少了,我就要坚持跟着您老人家学习下去,在此膜拜大佬!!!
    winelx:因为直接关注作者那还用一篇篇点,很麻烦的\(^o^)/~
  • 安久哲丶_1ab2:支持大神,写的很不错!
  • 被时光移动的城:看完文章,想给你你生猴子
  • smile_30ba:因为数据格式是固定的,我把网络请求封装在在BaseActivity里了,网络请求时CompositeDisposable.add(),在baseActivity的ondestory()方法时CompositeDisposable.clear() ,这样会不会有问题,比如后面打开的页面关闭,会把前面页面还没请求完的也给关掉了:pray:
    Season_zlc:不会
  • 七七就是七七:楼主你有试过demo吗,floatMap加了delay运行,不会打印出任何东西,还有就是不加delay的时候flatMap打印出来也并不是无序的
  • bf2770eeec88:不错不错,收藏了。

    推荐下,分库分表中间件 Sharding-JDBC 源码解析 17 篇:http://www.yunai.me/categories/Sharding-JDBC/?jianshu&401


  • bd180dab6513:你写的文章深入浅出,丝丝入扣,难得好文,楼主有公众号没有?
  • 安卓猿:建议楼主把这个Rx系列的 所有 链接全部贴在 文章 头部
  • 顶级工程师闯天涯:非常棒,发个评论支持一下...
  • Suppender:写的非常好,是我看的最好理解的Rxjava.
    Season_zlc:@Suppender 过奖了:sunglasses:
  • 6520af87e802:楼主 我是小菜鸟一枚 这是我看到关于Rxjava讲解最通俗易懂的文章,不知道楼主是否能看见我的评论,如果看到的话请加下我QQ 995472572 有关Rxjava方面的知识想请教下你,谢谢!
  • 918772ccfba1:吊炸天!我只能用这个词语来形容了
  • bfe22fb00f77:“多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.”
    关于这一段的表述有一点的不恰当的地方,多次调用subscribeOn都是有效的。只是最终会取决于顶层的那个subscribeOn。
    在两个相邻的subscribeOn中间插入一个doOnSubscribe输入当前线程就可以测试出来。
    两个subscribeOn的使用,常用于网络请求,一个用于主线程显示loading,一个是从主线程切换回异步做网络请求。
  • X1a0Yu_:这一篇讲的没有第一篇细,但是还是谢谢分享
  • 6c3fd4cc938f:写的太好了👍
  • Wish_xy:膜拜一下作者大大,写的很棒,简洁清晰,很容易理解学习!
  • eb9cb9744c2b:看到目前为止,最通俗易懂,最详细的RxJava的教程,继续拜读
  • 仁昌居士:还是刚才的例子, 只不过我们太添加了一点东西, 先来看看运行结果: 多了个“太”字。

    还有请教大佬一下,我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可。能不能代码举例一下,如何保存的。
    谢谢:smile:
  • 永远向着诗和远方:强烈推荐楼主在文章结尾加一下上一篇和下一篇的链接
  • code_solve:这个博文的风格看起来真舒服。。。
  • laksg2009:Retrofit完全没用过,这篇就这里看不懂.其他都能明白
  • 我只需要:楼主很威武,太牛掰了
  • 蚂蚁上树卡:compositesubscription 写错了,应该是这个
  • 蚂蚁上树卡:CompositeSubscribtion的使用在哪里呢?请问作者
  • 27efec53a72d:谢谢作者!我会转发给给多人来学习
  • 391c7ed5c5e5:恩 写的很详细 挺:+1:
  • aca16273a847:“那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.”

    这一段没有讲的太清楚,我看了下源码。 add/clear皆不是静态方法。你的意思是Activity里面创建一个CompositeDisposable(容器功能的)对象,将发起网络请求的下游回调引用都保留在CompositeDisposable对象中, 在activity的生命周期 destory时候调用CompositeDisposable对象的clear方法?

  • memorycjj:现在最新添加adapter的方式 compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' ,额外不需要添加retrofit,okhttp的支持,自己添加下converter就可以了,麻烦楼主更新下,文章写的很棒 。
  • 有没有口罩给我一个:其实rxjava中线程的切换如果flatmap和observeer 同一个线程的话它会这样执行,flatmap→observer直到flatmap结束为止;如果不是同一个线程的话,会先把flatmap的事件全部执行完在执行observer,我这样理解对吗?
    有没有口罩给我一个: @ThirKing 我明白了
  • 82d047e884b2: .subscribeOn().observeOn()同时用,运行后没反应,只用subscribeOn(),emitter,onnext都在线程池执行;如果只用observeOn(),则emitter执行在主线程,onnext执行在线程池,是因为我用的最新版本(2.1.0)的原因?
  • wscjy:作者内容确实讲得很好,让初学者看得舒服,就是本文开头用Consumer有点不知所云
    nyzs_n9001:第一篇里结尾的时候有讲到:smile:
  • 林宥之:Retrofit本身已经做了线程切换了吧?实际应用中使用Retrofit+RxJava还需要subscribeOn(Schedulers.io()) 吗?
  • 倔强的炉包:膜拜大神
  • Gunter1993:相当给力的Rj2教程,就是标题起了一二三什么的,不太方便今后查阅
  • 良哥哥好帅:mark 楼主写得相当棒,受益匪浅
  • d3552222d151:强势围观
  • FynnJason:对观察者和被观察者一直理解比较懵。比如警察和小偷,灯泡和开关,上游和下游,他们到底谁是观察者,谁是被观察者呢?请作者指点一下。谢谢:relaxed:
    Season_zlc:这个问题可以从单词意思来看, Observeable这个单词的意思是可观察的, Observer单词的意思是观察者, 关系就很明确了
  • 68ed210f942b:Observable.create(new ObservableOnSubscribe<String>() {
    @Override

    public void subscribe(ObservableEmitter<String> e) throws Exception {
    e.onNext("hello");
    System.out.println(Thread.currentThread().getName());
    }
    }).subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.newThread())
    .subscribe(new Consumer<String>() {
    @Override
    public void accept(String mS) throws Exception {
    System.out.println(mS);
    System.out.println(Thread.currentThread().getName());
    }
    });
    大佬为什么这个什么都不输出
    68ed210f942b: @Season_zlc 嗯啊,对的谢谢
    Season_zlc:你是在Java 程序里跑的吧, 在主线程里加上Thread.sleep()延时, 否则主线程立马就结束了所以不会看到输出.
  • 流星留步:网络请求未完成时候,退出当前activity并未发生崩溃,而是发生的内存泄漏,activity没有被释放掉,onComplete里面的代码正常执行了,我的好像是这样的,和博主的有点差别啊?
    流星留步:产生了内存泄漏之后,我在退出Activity onDestroy中调用Disposable进行切断水管,但是切断水果之后就报错了。
    05-03 11:04:20.571 24773-25101/com.majian.rxjavam E/AndroidRuntime: FATAL EXCEPTION: RxNewThreadScheduler-1
    Process: com.majian.rxjavam, PID: 24773
    io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
  • lzhHappy:API 接口里面的 @get(或者其它请求方式)必须要有 Url 地址,所传的字符串不能为空,否则会异常提示,baseUrl 后面没有地址的话可以加个/或者.字符串。
    @get 请求方式是没法使用@Body的
    741a982bae87:"如果在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了" 这是什么原因?
    Veken_Fly:把baseurl后面的参数截取一段,就像我们公司,上传图片地址跟普通网络请求的地址不一样,所以只取前面一部分的,后面的用常量保存,在get和post里面再引用
    b88be24d43a8:@get 和@post 都用不了@Body , 不知道为啥
  • 译森:为什么不能单独使用doOnNext(),一定在结尾使用subscribe()?
    有没有口罩给我一个: @译森 因为doOnNext()是这样的:在onNext()之前执行
  • 一起睡个觉:RxJava执行完成之后 会自动取消订阅吗
    Season_zlc: 不会
  • 我才是张雷:确实,看完后想跳转下一章或者上一章不方便,不过不妨碍文章的精彩,感谢
  • FredKang:CompositeSubscription和CompositeDisposable感觉差不多
  • 旧人旧城旧心:看了很多rxjava的文章,你这篇讲解的很好!不像一些文章从头到尾一下就讲解完了,容易让人晕头晕脑。你这分开讲解,很让人舒服!! 支持一下!!!
    d4cc34632821:@要啥自行车c 愣头青,莽夫!这位同学是真,滴,皮!
    cc25b6aa133b:@小呢个李 我反手就是一个大招
    73ece15c815f:这就很舒服了,哈哈。。莫名跑偏
  • 暴躁的心:真心写的不错,作为一个小白我都可以理解了。
  • X1a0Yu_:好文章,楼主厉害:fist:
  • BillBian:一直感觉事件流向图,会觉得是从3-1,而不是1-3。是不是我理解的有问题:fearful:
    BillBian:@He埋心 怎样理解的?
    BillBian:@He埋心 好吧:stuck_out_tongue_winking_eye:
  • Xdjm:这章写的非常好:+1:
  • zhangdong0921:为什么我用你的标签定义接口的请求方法报错呢
    Multiple @body method annotations found.
    然后改成了
    @get("appstore/history/query?")
    Observable<HistoryInfo> gethis(@Query("key") String key, @Query("day") String day);
    这样就可以请求了
    zhangdong0921:@岁月无痕灬灬 base URL是自己在shareSDK上面拿个公共接口,都可以申请的,
    我的是 http://apicloud.mob.com/appstore/history/query appkey:1babdcc496ce8
    岁月无痕灬灬:@zhangdong0921 你知道baseurl么
  • aee30df11e01:写的很好
  • 77324d9b47bb:史上最好的rxjava2文章,没有之一
  • 左手木亽:一直以为多次obsereron切换也只有一次效果。😁😁
  • 勇敢的少年啊:思路清晰明澈👍
  • efe81c2fc5c4:好像 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.这里写错了,应该是CompositeDisposable.dispose();,当然我没验证,错了不要打我:smile:
    sun_yangfei:@Season_zlc CompositeDisposable.dispose(); 和clar() 两个有什么区别?
    sun_yangfei:CompositeDisposable.dispose(); 和clar() 两个有什么区别?
    Season_zlc:很不幸你错了:v:
  • 6e242ece1811:谢谢分享
  • YMindar:建议大神给这个系列做个侧边导航的条目,方便直接切换
  • Small_Cake:同样的请求连接为什么Volley就可以,Retrofit就报签名失败呢。作者遇到过吗?
    Small_Cake:
    interface RetrofitInteface {
    @post("doLogin")
    Observable<JSONObject> login(@QueryMap TreeMap<String,String> map);
    }
  • 4dd273fef994:学习到了,rxjava2 感觉和rxjava1 有很大区别啊。老记不清 observable ,observe,Subscriber还有一些类似的东东怎么破啊
    MigrationUK: @4dd273fef994 还有ObservableOnSubscribe
  • 67c15ed34d3c:楼主,代码看起来只有一半呐,有些函数的参数看不全
  • 927fd5b2da21::joy: 我能转载吗?刚好想学Rx
    Season_zlc: @Jelansty 转转转,不转不是中国人😂
  • 无极小屋:教程1的地址呢
    无极小屋: @Season_zlc 就不点😂
    Season_zlc: @角落里的箱 点击头像进主页😂
  • e798ad090fa7:图文并茂,言简意赅,受益匪浅,谢谢您。
  • yangzteL:感谢楼主
  • fulai_xy:很好
  • fa16e00eeb9c:楼主牛,支持楼主
  • 欢乐的乐:最近项目在用Android Clean架构,里面有用到RxJava2,看了你的文章收益良多,以前不明白架构里面RestApi是干嘛的,看了例子内心大喊一声soga~~~。真的很谢谢你,今天下午接着看下去,嘻嘻。
    yangzteL:之前在找Rxjava 2 的文章,今天才发现我想找的在这里,感谢博主
    Season_zlc: @欢乐的乐 不客气😀
  • 巴黎没有摩天轮Li:这是继扔物线以后,我见过最好的文章了!赞!!!!!没毛病!!!!
    Season_zlc: @巴黎没有摩天轮Li 😄
  • 蜕变_99d2:建议楼主在文章结尾加一下上一篇和下一篇的链接,看起来方便:grin:
    kee_zlg:关注作者账号,里面可以看到很多东西
    aa1bfec397db:支持
    George吴逸云:哥们这个建议很好,楼主加上上下篇的连接吧,文章写得这么好,一定会有很多人来拜读的:smile:
  • 9146a69e7d04:最后数据库操作是不是没有subscribe ?
    Holly_dd20:@jblz625同学说得对
    jblz625:最后读取数据库的例子 ,方法返回体是一个Observable对象,subscribe 方法可以在 调用方法拿到返回体之后再调用吧.
    nyzs_n9001:看的好仔细,给你一个么么哒:kissing_heart:
  • d8184ca3c970:多谢楼主的文章,希望楼主建个rxjava2的群,这样方便大家交流,随时好请教大家。
  • itchenqi188:很好 ,拜读了

本文标题:给初学者的RxJava2.0教程(二)

本文链接:https://www.haomeiwen.com/subject/gdqlmttx.html