美文网首页
Android RxJava 学习笔记

Android RxJava 学习笔记

作者: EmMper | 来源:发表于2019-01-04 19:59 被阅读0次

    参考文章:给 Android 开发者的 RxJava 详解,本文的部分内容参考自该文章。
    注:下文中demo部分代码采用Kotlin语言编写,RxJava源码部分为Java语言,RxJava lib版本为2.0。

    RxJava是什么

    • RxJava在GitHub上的自我介绍为:"a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库);
    • 可理解为:一个实现异步操作的库。

    RxJava的优势

    • 简化逻辑(链式调用,无嵌套)
    • 代码示例:
    // 将某文件夹下的所有png文件转化为bitmap并进行渲染
    Observable.fromArray(*folders) // 观察文件夹数组
            .flatMap { Observable.fromArray(*it.listFiles()) } // 转换为文件夹中的文件
            .filter { it.name.endsWith(".png") } // 筛选其中的png文件
            .map { getBitmapFromFile(it) } // 将file转换为bitmap
            .subscribeOn(Schedulers.io()) // 事件产生在io线程
            .observeOn(AndroidSchedulers.mainThread()) // 事件消费在主线程
            .subscribe { imageCollectorView.addImage(bitmap) }// 消费事件,渲染bitmap
    

    如果你对RxJava不是很熟悉,看上面这段代码就会有一种“虽然不知道你在说啥,但是感觉很屌的样子”的感觉。没错,就是要让你感受到它很diao,然后,我来让你看懂这段代码到底是在干嘛。

    API介绍

    概念

    • 异步实现原理:观察者模式
    • RxJava的观察者模式包含以下四个概念:
      • 被观察者:Observable
      • 观察者:Observer
      • 订阅:subscribe
      • 事件
    • RxJava将每个事件单独处理,并把所有事件作为一个队列。
    • RxJava事件回调:
      • onNext():普通事件;
      • onComplete(): 事件队列完结。当不回再有新的OnNext()发出时,触发onComplete()
      • onError():事件队列异常。当事件队列中的事件处理过程中出现异常时,触发onError(),并终止队列。
      • 在同一个事件队列中onComplete()onError()只会有一个触发,并且触发后队列结束。

    基本使用

    创建Observer

    Observer即观察者,决定了事件触发时的行为。RxJava中的Observer接口的实现方式:

    val observer: Observer<String> = object : Observer<String> {
        override fun onComplete() {
            // do something..
        }
    
        override fun onSubscribe(d: Disposable) {
            // do something..
        }
    
        override fun onNext(t: String) {
            // do something..
        }
    
        override fun onError(e: Throwable) {
            // do something..
        }
    }
    
    创建Observable

    Observable即被观察者,决定了什么时候触发事件以及触发怎样的事件。RxJava使用create()方法创建Observable,并定义其触发规则:

    val observable: Observable<String> = Observable.create<String>(object : ObservableOnSubscribe<String> {
        override fun subscribe(emitter: ObservableEmitter<String>) {
            emitter.onNext("Hello")
            emitter.onNext("Hi")
            emitter.onComplete()
        }
    })
    // Kotlin语言中,推荐使用Lambda表达式将类型为接口的参数缩进为方法块
    // 如下所示,两种代码实际上是一样的
    val observable: Observable<String> = Observable.create<String> { it ->
        it.onNext("Hello")
        it.onNext("Hi")
        it.onComplete()
    }
    

    这里传入了一个ObservableOnSubscribe对象作为参数,当Observable被订阅的时候,ObservableOnSubscribesubscribe()方法会被调用,然后依次触发两次onNext()方法和一次onCompleted()
    基于create()方法,RxJava还提供了其他用来快捷创建时间队列的方法:

    • just(T...):将传入的参数依次发送:
    val observable: Observable<String> = Observable.just("Hello", "Hi")
    // 会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onCompleted();
    
    • fromArray/fromIterable/fromCallable/fromFuture/fromPublisher:将传入的数组或Iterable等的item依次发送出来:
    val stringArr = arrayOf("Hello", "Hi")
    val observable = Observable.fromArray(*stringArr)
    // 会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onCompleted();
    
    Subscribe:订阅

    创建了ObservableObserver之后,在用subscribe()方法为它们之间建立订阅关系。代码如下:

    observable.subscribe(observer)
    

    这里RxJava为了流式 API 的设计,subscribe()方法看起来是被观察者订阅了观察者,而非普通的观察者模式中观察者订阅了被观察者。

    RxJava还提供了以下subscribe()方法的重载:

    observable.subscribe { 
        // onNext
    }
    observable.subscribe ({
        // onNext
    }, {
        // onError
    })
    observable.subscribe ({
        // onNext
    }, {
        // onError
    }, {
        // onComplete
    })
    
    • 实例:打印输出学生名称
    val students = arrayOf(Student("李明"), Student("黄翔"))
    Observable.fromArray(*students)
            .subscribe { it:Student! ->
                Log.i(TAG, it.name)
            }
    // 输出结果为:
    // 李明
    // 黄翔
    

    在以上代码中,事件默认在同一个线程(UI线程)产生以及消费。观察者模式通常是处理异步操作时使用的。要用RxJava实现异步,就要用到RxJava中的Scheduler(后面会详细讲)。
    我们可以发现,当事件序列中的的操作对象为多个对象或者一个数组时,就好像是对操作对象进行了循环,而循环的时间复杂度就是该操作对象中对象的个数,或者数组的长度,循环内执行的操作就是事件序列中各事件中所执行的操作。听起来很抽象,直接来看以下代码。其实操作对象为单个对象时,也可以看作是在进行时间复杂度为1的一次循环。

    // 将上一段代码转化为循环语法
    val students = arrayOf(Student("李明"), Student("黄翔"))
    for(item in students) {
        Log.i(TAG, item.name)
    }
    // 输出结果为:
    // 李明
    // 黄翔
    

    那为啥不直接用循环语法而要用RxJava呢?仅仅在这样一个例子中,看起来循环语法似乎是更为简洁,但是,如果是多层循环嵌套,或是要进行线程切换,RxJava的流式结构无疑更为清晰。后文中会涉及到相关功能。

    线程调度(Scheduler)
    • Scheduler的使用
      RxJava的线程调度主要通过Scheduler实现。如以下代码:
    Observable.just("Hello", "Hi")
            .subscribeOn(Schedulers.newThread()) // 事件产生在子线程
            .observeOn(AndroidSchedulers.mainThread()) // 事件消费在主线程
            .subscribe { it: String! ->
                Toast.makeText(context, it, Toast.LENGTH_SHORT).show()
            }
    

    以上代码即为简单的线程调度使用实例。在RxJava中,为使用者提供了以下几种Scheduler

    1. Schedulers.immediate():当前线程,不会进行线程转换。RxJava中默认的Scheduler
    2. Schedulers.newThread():启用新线程。
    3. Schedulers.io():进行I/O操作的线程。拥有一个无上限的线程池,可复用空线程,比newThread()更有效率。
    4. Schedulers.computation():进行计算(CPU密集型计算)操作的线程。拥有固定大小的线程池,数量为CPU核数。
    5. AndroidSchedulers.mainThread():Android特有的主线程。

    在RxJava中,通过subscribeOn()observeOn()两个方法,根据传入的Scheduler参数进行线程切换。subscribeOn()指定了被观察者Observable运行所在的线程,即ObservableOnSubscribesubscribe()方法执行时所处的线程,observeOn()指定了Observer中的事件所在的线程。若为指定,则默认为当前线程,不会切换
    需要注意的是,一个Observable队列中,调用多次subscribeOn()方法,Observable会运行在第一次调用的subscribeOn()方法中;而observeOn()方法可多次调用,一次observeOn()方法被调用,会将队列中之后的事件所处的线程切换为传入的Scheduler的线程,直到调用下一个observeOn()被调用或者直到队列结束。
    Talk is cheep, show me the code:

    // 将若干字符串打印、存库并显示toast
    Observable.just("Hello", "Hi")
            .subscribeOn(Schedulers.newThread()) // 新建线程
            .subscribeOn(AndroidSchedulers.mainThread()) // 主线程
            .doOnNext { it: String! ->
                Log.i(TAG, it) // 在刚才新建的线程中输出
            }
            .observeOn(Schedulers.io()) // 切换至io线程
            .doOnNext { it: String! ->
                StringDao.getInstance().saveString(it) // 在io线程进行数据库存储操作
            }
            .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程
            .subscribe { it: String! ->
                Toast.makeText(context, it, Toast.LENGTH_SHORT).show() // 在主线程操作view层,显示吐司
            }
    
    • 几个特殊的方法:
    1. doOnSubscribe():在subscribe()发生时执行

    根据上文可知,队列中多次调用subscribeOn()方法似乎只有第一次调用时起作用,那么调用多次subscribeOn()方法看起来是个没什么用而且很蠢的做法,但是,有一种例外:doOnSubscribe()。默认情况下,该方法执行在subscribe()发生的线程;若在调用doOnSubscribe()方法之后再次调用subscribeOn()的话,则doOnSubscribe()会执行在其之后第一次调用的subscribeOn()所指定的线程:

    Observable.just("Hello", "Hi")
            .subscribeOn(Schedulers.newThread()) // 新建线程
            .doOnSubscribe{ it: String! ->
                Log.i(TAG, it) // 在主线程中输出
            }
            .subscribeOn(AndroidSchedulers.mainThread()) // 主线程
            .subscribeOn(Schedulers.io())
            .doOnNext { it: String! ->
                Log.i(TAG, it) // 在新建的线程中输出
            }
            .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程
            .subscribe { it: String! ->
                Toast.makeText(context, it, Toast.LENGTH_SHORT).show() // 在主线程操作view层,显示吐司
            }
    
    1. doOnNext():onNext()事件触发时调用
    2. doOnComplete():onComplete()事件触发时调用
    3. doOnError():onError()事件触发时调用
    4. doFinally():Observable队列结束时调用

    以上方法都可以通过observeOn()方法指定线程,以doOnNext()为例:

    Observable.just("Hello", "Hi")
            .subscribeOn(Schedulers.newThread()) // 新建线程
            .subscribeOn(AndroidSchedulers.mainThread()) // 主线程
            .doOnNext { it: String! ->
                // onNext()事件触发时,在新建的线程中输出
                Log.i(TAG, it) 
            }
            .observeOn(Schedulers.io()) // 切换至io线程
            .doOnNext { it: String! ->
                // onNext()事件触发时,在io线程进行数据库存储操作
                StringDao.getInstance().saveString(it) 
            }
            .observeOn(AndroidSchedulers.mainThread()) // 切换至主线程
            .subscribe { it: String! ->
                // Observer中的onNext()事件,在主线程操作view层,显示吐司
                Toast.makeText(context, it, Toast.LENGTH_SHORT).show()
            }
    
    变换

    所谓变换,是指RxJava能将事件序列的操作对象变换为其他对象的功能。该功能十分强大,是RxJava的核心共呢功能之一。

    • map():
    // 将json转化为对象后存库
    val json: String = studentJson  
    Observable.just(json)
        .map(object : Function<String, Student> {
            override fun apply(t: String): Student {
                // 将student的json字符串转换为student对象
                return Gson().fromJson(t, Student::class.java)
            }
        })
        .observeOn(Schedulers.io())
        .subscribe(object : Consumer<Student> {
            // 操作对象变为student
            override fun accept(t: Student?) {
                // onNext
                // 将student对象存库
                StudentDao.saveStudent(t)
            }
        })
    

    这段代码可用lambda表达式简化为:

    val json: String = studentJson
    Observable.just(json)
        .map { it: String ->
            Gson().fromJson(it, Student::class.java)
        }
        .observeOn(Schedulers.io())
        .subscribe { it: Student ->
            // onNext
            StudentDao.saveStudent(it)
        }
    

    map()方法,传入一个Function接口对象。在该接口的apply()方法中,return转换后的对象。此后该事件序列的操作对象,都会变为转换后的对象。这是RxJava中最常用的变换方法。

    • flatMap():
      map()方法类似,flatMap()方法传进一个Function接口对象,并在该接口的apply()方法中,返回一个转换后的Observable对象,即返回一个新的事件序列。代码如下:
    val json: String = studentJson
    Observable.just(json)
        .flatMap(object : Function<String, Observable<Student>> {
            override fun apply(t: String): Observable<Student> {
                return Observable.just(Gson().fromJson(t, Student::class.java))
            }
        })
        .subscribe(object : Consumer<Student> {
            override fun accept(t: Student?) {
                StudentDao.saveStudent(t)
            }
        })
    
    // 用lambda表达式可简化为:
    Observable.just(json)
        .flatMap{ it : String ->
            Observable.just(Gson().fromJson(it, Student::class.java))
        }
        .subscribe{ it : Student ->
            StudentDao.saveStudent(it)
        }
    

    看到这你可能会疑惑,那这样不就和map()一样了吗,只是返回的对象不同,用法其实都一样,看起来还比map()方法更麻烦。没错,因为flatMap()根本不是这么用的,虽然这样写也没有问题。接下来详细说明二者的使用场景之间的区别。
    前文说过Observable的操作对象如果是多个,或者是数组类型,则事件序列中的操作就像是对所有操作对象进行了一次循环,而flatMap()方法,则是对该事件序列进行了嵌套循环。也就是说,map()方法的转换只能是一对一(不会增加循环的层数)的,而flatMap()方法的转换是一对多(会增加循环的层数)的。示例代码如下:

    // 将所有学生的所有课程打印输出:
    val students = arrayOf(Student("李明"), Student("黄翔"))
    Observable.fromArray(*students)
        .flatMap{ it : Student ->
            // courses的类型为ArrayList<Course>
            // Java中ArrayList.toArray()方法的返回值为Object[]
            Observable.fromArray(*it.courses.toArray())
        }
        .map { it : Any ->
            // Kotlin中的Any就是Java中的Object
            // 将Object强转为Course
            it as Course
        }
        .subscribe { it : Course ->
            Log.i(TAG, it.name)
        }
    

    如果对比循环语法,则应该是这样:

    val students = arrayOf(Student("李明"), Student("黄翔"))
    for (student in students) {
        for(course in student.courses) {
            Log.i(TAG, course.name)
        } 
    }
    

    你可能又会问了,这看起来比循环语法要复杂得多嘛。当然,这段代码是双层循环,并且循环嵌套之间没有其他复杂逻辑。但是在实际应用中就不一定是这样了,循环可能有多层,而嵌套之间也可能包含复杂的逻辑,比如线程切换。在这种情况下,每一层嵌套的缩进,都会降低代码的可读性。而用RxJava看起来代码行数很多,可是逻辑自上而下很清晰,代码可读性很高。

    筛选(filter)

    filter()类似于if语法(不是if-else语法),传入一个Predicate接口对象,并在接口test()方法的返回值中,将需要过滤掉的返回false,需要留下的返回true。比如如下示例:

    // 输出学生中所有男生的姓名
    Observable.fromArray(*students)
        .filter(object : Predicate<Student> {
            override fun test(t: Student): Boolean {
                return t.sex.equals("男")
            }
        })
        .subscribe(object : Consumer<Student> {
            override fun accept(t: Student?) {
                Log.i(TAG, t?.name)
            }
        })
    
    // 用lambda表达式可简化为
    Observable.fromArray(*students)
        .filter { it : Student ->
            it.sex.equals("男")
        }
        .subscribe{ it : Student ->
            Log.i(TAG, it.name)
        }
    
    // 用if语法可写为
    Observable.fromArray(*students)
        .subscribe{ it : Student ->
            if(it.sex.equals("男")) {
                Log.i(TAG, it.name)
            }
        }
    

    使用场景

    由于RxJava是一个实现异步操作的库,而且实现原理是通过观察者模式。所以,理论上,RxJava可以用于任何异步操作或观察者模式的使用场景,只是需要添加对RxJava的支持,不然可能无法直接使用。

    与Retrofit结合使用

    • 什么是Retrofit?

    Retrofit官网对自己的描述是:Type-safe HTTP client for Android and Java by Square, Inc.即:由Square公司出品的用于Android和Java的类型安全的HTTP客户端。对于Android而言,就是一个网络请求库。
    若对Retrofit不了解的可以跳过这部分内容,想要了解的也可以自行了解。

    • 如何与RxJava结合使用

    Retrofit添加对RxJava的支持,可以将原本的相应回调Callback以转换为Observable的形式。具体如下:
    使用Callback(以下代码为已对Retrofit二次封装后的代码):

    // 在ApiService接口中定义api
    @FormUrlEncoded
    @POST("/user")
    fun getUser(@Field("userid") uid: Int) : Call<UserResponseModel>
    
    // 使用api
    val call = ApiService.getInstance().getUser(userId)
    call.enqueue(object : Callback<UserResponseModel> {
        override fun onFailure(call: Call<UserResponseModel>, t: Throwable) {
                // 请求失败回调
            }
    
        override fun onResponse(call: Call<UserResponseModel>, response: Response<UserResponseModel>) {
                // 请求成功回调
            }
    })
    

    使用Observable

    // 创建Retrofit实例时添加对RxJava的支持
    val retrofit = Retrofit.Builder()
                        .baseUrl(baseUrl)
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 适配RxJava2.x版本
                        .build()
    
    // 在ApiService接口中定义Api时将返回类型改为Observable
    @FormUrlEncoded
    @POST("/user")
    fun getUser(@Field("userid") uid: Int) : Observable<UserResponseModel>
    
    // 使用Observable进行api请求
    ApiService.getInstance().getUser(userId)
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({ it : UserResponseModel ->
                            // onNext,成功回调
                        }, { it : Throwable ->
                            // onError,失败回调
                        })
    

    这样就可以使用Observable处理网络请求了,更可以通过flatMap()方法进行请求链的调用,比如:

    // 在ApiService接口中定义Api
    // 登录
    @FormUrlEncoded
    @POST("/user")
    fun getUser(@Field("userid") uid : Int) : Observable<UserResponseModel>
    // 获取token
    @FormUrlEncoded
    @POST("/token")
    fun getToken() : Observable<TokenResponseModel>
    
    
    // 先调用登录api,在调用获取token api
    ApiService.getInstance().getToken()
                        .subscribeOn(Schedulers.io())
                        .flatMap { it : TokenResponseModel ->
                            saveToken(it)
                            getUser(it.userId)
                        }
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe({ it :UserResponseModel ->
                            // onNext,成功回调
                        }, { it : Throwable ->
                            // onError,失败回调
                        })
    

    将请求链通过Observable序列实现,代码逻辑是不是变得清晰了很多?
    当然,还有很多其他用法,这里就不一一列举了。

    与RxBinding结合使用

    • 什么是RxBinding

    RxBinding是ButterKnife作者JakeWharton写的一个基于RxJava的View注入框架。
    该框架将View的监听事件,例如View.OnClickListenerView.OnTouchListenerRecyclerView.OnScrollListener等监听事件,通过Observable的方式进行回调。

    • 怎么使用

    以点击事件为例:

    RxView.clicks(view)
        .subscribe { it : Unit ->
            // do something
        }
    

    RxJava中还有一个api叫做throttleFirst(),该api会在每次事件触发后的一段时间内屏蔽新的事件,可用于去抖动过滤。比如用在下方代码中,可以防止连击启动多个Activity

    RxView.clicks(view)
        .throttleFirst(300, TimeUnit. MILLISECONDS) // 300毫秒抖动过滤
        .subscribe { it : Unit ->
            NewActivity.start(view.context)
        }
    

    其他异步操作、观察者模式使用场景

    这里就不再举例啦。
    只要是异步操作或者观察者模式的使用场景,理论上都可以使用RxJava,只是需要自己手动适配一下。而上文中的Retrofit和RxBinding,在源码中都有对RxJava进行适配。感兴趣的话可以去Github上将源码下载下来进行研究。

    总结

    这是我第一篇严格意义上的技术方向的博客。从业快三年了,看了不少前辈们的文章,最近在学习RxJava的过程中,就想要自己将学习的东西记录下来,方便以后温故知新,也能造福后来的旅者。
    由于这篇文章除了借鉴了大神的文章外,还加入了许多自己的见地,所以可能有些地方理解的不准确,或者排版混乱,有意见者,还望多多指正、不吝赐教。
    希望这篇文章能真的帮助到各位正走在学习路上的人。

    相关文章

      网友评论

          本文标题:Android RxJava 学习笔记

          本文链接:https://www.haomeiwen.com/subject/tpyulqtx.html