RxJava与RxKotlin项目实践

作者: YamLee | 来源:发表于2018-01-26 13:39 被阅读62次

    What is Rx?

    基本概念

    RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

    按照官方介绍RxJava是一个在JVM平台上以Observable序列来处理异步和时间的代码库

    Netflix发起

    Why Rx?

    现在Android社区都在广泛的推广和使用RxJava,但是为什么用它以及使用它能带来什么样的好处,我们通过一个具体的案例来研究

    用户登录系统需求

    假设现在我们有一个微信用户第三方登录功能的需求,这个需求呢很简单,就是实现一个登录界面,用户点击登录跳转到微信进行用户认证,拿到微信的openId然后再接入到自己后台服务器进行登录。这里我们设计一个AccountManager类,这个类里面有2个方法,首先是requestWxOpenId()方法用于跳转微信获得微信的openId,另外一个就是通过openId登录认证的login()

    interface AccountManager {
        fun requestWxOpenId(): String
    
        fun login(wxOpenId: String): User
    }
    

    传统代码实现

    因为AccountManager这些方法都是服务器访问的接口,是个异步任务,如果直接调用堵塞主线程肯定是不行的,所以我们会封装一个callback出来,回调方法用来处理当接口请求结束后的操作,因为接口请求有可能成功也有可能失败,所以回调接口中包含了两个方法onSuccessonFaillure

    interface OpenIdCallback {
        fun onSuccess(openId: String)
        fun onFailure()
    }
    
    interface AccountCallback {
        fun onSuccess(resultl: String)
    
        fun onFailure()
    }
    
    interface AccountManager {
        fun requestWxOpenId(callback: OpenIdCallback): String
    
        fun login(wxOpenId: String, callback: AccountCallback)
    }
    

    AccountManager的实现很简单首先调用requestWxOpenId方法获取到openId后再调用login方法获取用户信息,调用代码如下所示

     private fun clickLogin() {
            val accountManager = AccountManagerIml()
            accountManager.requestWxOpenId(object : OpenIdCallback {
                override fun onSuccess(openId: String) {
                    accountManager.login(openId, object : AccountCallback {
                        override fun onSuccess(user: User) {
                            if (!isActivityDestroyed) {
                                runOnUiThread {
                                    //update UI
                                }
                            }
                        }
    
                        override fun onFailure() {
                            //handle error
                        }
    
                    })
                }
    
                override fun onFailure() {
                    //handle error
                }
            })
        }
    

    可以看到整套流程下来,有几个痛点,一个是每一个异步请求我们都需要生命一个callback借口,另外一个是callback嵌套使得代码不清晰,如果再嵌套多几层会使得代码很难看,这就是有名的回调地狱(callback hell),再就是像在Android应用开发中需要在主线程操作UI和相关组件生命周期的判断,这就需要在各个回调中加上各种判断。为了解决这些问题Rx应运而生

    使用RX的实现

    为了解决上述提到哪些问题,我们看看Rx怎么解决的,首先为了解决不同的callback问题,Rx定义了统一的回调接口Observer

    public interface Observer<T> {
        void onCompleted();
    
        void onError(Throwable e);
    
        void onNext(T t);
    }
    

    通过泛型包装来统一回调接口,Observer回调中封装了三个方法onErro用来处理错误情况,onNext用来处理正常情况下返回结果的回调,onComplete表示整个流程结束的回调。

    通过统一泛型回调接口解决了声明多个回调的问题,那么如何解决回调嵌套的问题呢,因为回调嵌套产生的原因是由于数据的链式处理,如果把数据处理都能直接反映链式处理上,最终结果返回给最终的observer就行了,这个Rx使用Observable和对应的操作符来处理链式数据处理

    Observable表示一个数据源,在AccountManager的方法中,我们对应的接口就需要修改

    interface AccountManager {
        fun requestWxOpenId(): Observable<String>
    
        fun login(wxOpenId: String):Observable<User>
    }
    

    我们调用代码就只需要一个subscriber就行了

        private fun clickLogin(){
            val accountManager = AccountManagerIml2()
            accountManager.requestWxOpenId()
                    .flatMap { accountManager.login(it) }
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(object: Subscriber<User>() {
                        override fun onNext(t: User?) {
                            //update ui
                        }
    
                        override fun onCompleted() {
                        }
    
                        override fun onError(e: Throwable?) {
                        }
                    })
        }
    

    看到上面的代码是不是变得清晰了,如果使用RxKotlin,会更加清晰

        private fun clickLogin() {
            val accountManager = AccountManagerIml2()
            accountManager.requestWxOpenId()
                    .flatMap { accountManager.login(it) }
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeBy(
                            onError = {},
                            onNext = {
                                //update ui 
                            })
        }
    

    可以看到,使用RxJava来封装这些异步操作后,整个代码跟同步操作的代码都非常接近,结构很清晰和明了

    What Rx inside is?

    要想更好的使用RxJava,就有必要更好更深入的了解RxJava的api,这一部分将深入了解RxJava和RxKotlin的api和部分实现原理

    RxJava1 Api探究

    事件源

    Observable

    Observable是最为常用的数据源,Observable对象提供了常用的创建方法,create,just,from等方法

    Observable.just("hello")
    
    Observable.from(arrayOf(1, 2, 3, 4, 5))
    
    Observable.create(Observable.OnSubscribe<String> { t ->
            try {
                t.onNext("hello")
                t.onStart()
            } catch (e: Exception) {
                t.onError(e)
            }
        })
    
    

    Completable

    Completable数据源只会触发onComplete和onError回调,通过from()方法可以创建Completable数据源

    Completable.fromAction { println("action done") }
    

    Single

    Single数据源只会发射一条数据,Single类也有create,just,from等方法用来床架Single数据源对象,subscribe方法只有onSuccess和onError,比方说如下代码

        Single.just(1)
                .subscribeBy(
                        onSuccess ={ println(it) },
                        onError = {}
                )
    

    操作符

    操作符可以说是是RxJava中的核心了,通过不同的操作符来实现数据的拼接,转换等等操作从而达到最终的数据结果,RxJava提过了很多的操作符,这里就不一一讲解,主要列出一些常用的

    map

    用于数据转换,常见的应用场景比方说id应设成名称

    Observable.just(1, 2, 3, 4)
               .map {
                   when (it) {
                       1 -> "Jane"
                       2 -> "Mike"
                       3 -> "Mary"
                       4 -> "tom"
                       else -> "noName"
                   }
               }
               .subscribeBy(onNext = { println(it) })
    

    输出内容为

    Jane
    Mike
    Mary
    tom
    

    flatMap

    flatMap操作类似于Map,但是Map转换是T -> D,但是flatMap是T -> Observable<D>

    fun getInfoById(id: Int): Observable<Int> {
        //模拟耗时请求
        return Observable.just(id)
    }
    
    Observable.just(1, 2, 3, 4)
                .flatMap { getInfoById(it) }
                .subscribeBy(onNext = { println(it)})
               
    

    这里getInfoById我们假设是一个网络请求或者数据库操作之类的耗时操作,它返回Observable<Int>对象,最后subscriber中onNext中接受到的还是Int数据

    输出结果

    1
    2
    3
    4
    

    filter

    filter用于过滤数据

    Observable.just(1,2,3,4)
                .filter {  it > 2 }
                .subscribeBy(onNext = { println(it)})
    

    输出结果

    3
    4
    

    ....

    RxJava中操作符可以说是非常非常多,想了解更多可以查看官方文档Rx操作符

    线程调度

    线程调度在RxJava可以说是非常精简,简单到一两行代码来实现线程切换

        Observable.just(1,2,3,4)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(onNext = { println(it)})
    

    上面代码中通过切换android ui线程和io线程是的回调方法都运行在ui线程,Observable数据源操作在io线程中运行

    Transformer

    我们在使用RxJava的过程中,通常都会用到线程切换,耗时操作在io线程,subscriber回调在主线程

        Observable.just(1,2,3,4)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(onNext = { println(it)})
    

    对于多个地方都总是要重复调用subscirbeOn和observeOn这两个方法,我们会想到去封装一下,比方说增加一个方法

    fun schedule(observable: Observable<Any>): Observable<Any> {
        return observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
    }
    

    这样我们就可以这样调用

    schedule(observable)
                .subscribeBy(onNext = { println(it)})
    

    这样做虽然加了一层封装,但是打破了整个Observable的链式调用,这就显得不太好看

    好在RxJava有Transformer,通过与Observable的compose()操作符使用来组合Observable操作

    Transformer就是一个Func1<Observable<T>, Observable<R>>,可以通过它将一种类型的Observable转换成另一种类型的Observable

    fun <T> applySchedulers(): Observable.Transformer<T, T> {
        return Observable.Transformer { observable ->
            observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }
    

    我们创建一个applySchedulers方法返回一个Transformer对象,这时候我们就可以直接链式切换线程了

    observable.compose(applySchedulers())
                .subscribeBy(onNext = { println(it)})
    

    Subject

    Subject是一个很特殊的接口,它即实现了Observable接口又实现了Observer接口,也就是说它既是一个数据发射方又是一个数据接收方

    AsyncSubject

    AsyncSubject注册到的Observer只会在onComplete回调之后接收到最后一个onNext发射出来的数据

      // observer不会接收到数据,因为subject没有调用onComplete
      AsyncSubject<Object> subject = AsyncSubject.create();
      subject.subscribe(observer);
      subject.onNext("one");
      subject.onNext("two");
      subject.onNext("three");
    
      // observer只会接收到最后一个数据,也就是“three”
      AsyncSubject<Object> subject = AsyncSubject.create();
      subject.subscribe(observer);
      subject.onNext("one");
      subject.onNext("two");
      subject.onNext("three");
      subject.onCompleted();
    

    BehaviorSubject

    注册到BehaviorSubject的observer只会接收到subscribe之前发射的最后一个item和之后发射的所有item

      // observer会接收到所有4个事件(包括"default").
      BehaviorSubject<Object> subject = BehaviorSubject.create("default");
      subject.subscribe(observer);
      subject.onNext("one");
      subject.onNext("two");
      subject.onNext("three");
    
      // observer will receive the "one", "two" and "three" events, but not "default" and "zero"
      BehaviorSubject<Object> subject = BehaviorSubject.create("default");
      subject.onNext("zero");
      subject.onNext("one");
      subject.subscribe(observer);
      subject.onNext("two");
      subject.onNext("three");
    
      // observer will receive only onCompleted
      BehaviorSubject<Object> subject = BehaviorSubject.create("default");
      subject.onNext("zero");
      subject.onNext("one");
      subject.onCompleted();
      subject.subscribe(observer);
    
      // observer will receive only onError
      BehaviorSubject<Object> subject = BehaviorSubject.create("default");
      subject.onNext("zero");
      subject.onNext("one");
      subject.onError(new RuntimeException("error"));
      subject.subscribe(observer);
    

    PublishSubject

    observer会接收到subscribe后所有发射出来的item

      PublishSubject<Object> subject = PublishSubject.create();
      // observer1 will receive all onNext and onCompleted events
      subject.subscribe(observer1);
      subject.onNext("one");
      subject.onNext("two");
      // observer2 will only receive "three" and onCompleted
      subject.subscribe(observer2);
      subject.onNext("three");
      subject.onCompleted();
    

    TestSubject

    TestSubject是专门为单元测试开发而来的

    @Test
    public void cancelShouldUnsubscribe() {
        TestSubject<String> observable = TestSubject.create(new TestScheduler());
        assertFalse(observable.hasObservers());
    
        T future = toFuture(observable);
        assertTrue(observable.hasObservers());
    
        future.cancel(true);
    
        assertFalse(observable.hasObservers());
    }
    

    RxJava2

    Reactive Streams

    Reactive Streams是一套在JVM上建立的响应式标准协议,旨在标准化以无阻塞式back pressre的形式来处理异步事件流协议,类似于网络的标准http协议一样,只是Reactive Streams是在JVM和JavaScript平台上的响应式协议

    规范概览

    • Publisher
    public interface Publisher<T> {
           public void subscribe(Subscriber<? super T> s);
    }
    

    Publisher接口用来定义发射事件对象,通过subscribe方法接收想要接收事件的subscriber

    • Subscriber
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
    • Subscription
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }
    
    • Processor
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

    Java9 Flow

    Reactive Streams定义了规范,JDK也实现了这个规范,在最新的Java9中引入了一个java.util.concurrent.Flow模块,这就意味着如果你使用Java9,可以直接引用Flow模块来应用响应式api,而不必须引入第三方的RxJava包

    api变动

    数据源
    • Observable和Flowable两种数据源,Observable是没有back pressure支持的,Flowable是需要设置back pressure选项的,也就是说对于事件发射方是否会产生back pressure问题需要使用者来决定,使用Flowable就必须要指定back pressure模式,不指定会直接报错
    • Single数据源的subscriber在RxJava2中改为SingleObserver,增加了onSubscribe方法
    interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable error);
    }
    
    • Completable的Subscriber在RxJava2中改为CompletableObserver
    interface CompletableObserver<T> {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable error);
    }
    
    • Maybe: Maybe是在RxJava2中新加的一种数据类型,他是Single和Completable的结合体Maybe可能会发射0或者1个事件或者error
    Subject和Processor

    在RxJava1中的Subject,在RxJava2中新增了Processor,Processor与Subject类似,Processor支持back pressure而Subject不支持back pressure

    Subscriber和Subscription

    Subscriber增加了一个onSubscribe(Subscription)的回调方法,因为原有的Observable在subscribe一个Subscriber的时候会返回Subscription,但是在RxJava2中subscribe方法返回的是void,所以对应在Subscriber中增加了Subscription的回调方法

    其他
    • RxJava2中不再接收直接发射null,例如Observable.just(null)如果发射的对象是null,在RxJava2中会直接报错或者将error传到onError中
    • Schedules将immediate改为trampoline
    • 增加了test的操作符

    RxJava2还有许多变更,这里未能一一讲述,更详细文档请参考官方What's-different-in-2.0

    RxKotlin

    RxKotlin是基于RxJava轻量级框架,在基于Kotlin的项目中,本身可以直接使用RxJava,但是RxKotlin通过增加各类扩展函数和规范使得RxJava在Kotlin项目中更易使用和标准化

    import io.reactivex.rxkotlin.subscribeBy
    import io.reactivex.rxkotlin.toObservable
    
    fun main(args: Array<String>) {
    
        val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    
        list.toObservable() // extension function for Iterables
                .filter { it.length >= 5 }
                .subscribeBy(  // named arguments for lambda Subscribers
                        onNext = { println(it) },
                        onError =  { it.printStackTrace() },
                        onComplete = { println("Done!") }
                )
    
    }
    

    completable扩展

    fun Action0.toCompletable(): Completable = Completable.fromAction(this)
    fun <T> Callable<T>.toCompletable(): Completable = Completable.fromCallable { this.call() }
    fun <T> Future<T>.toCompletable(): Completable = Completable.fromFuture(this)
    fun (() -> Any).toCompletable(): Completable = Completable.fromCallable(this)
    

    completable扩展主要是为各种对象增加toCompletable()的转换方法

    observables扩展

    observabled扩展方法主要是对各个对象增加toObservable对象,包括各个Array对象,iterator对象,Sequence对象等

    operators扩展

    fun <T> Observable<Observable<T>>.mergeAll() = flatMap { it }
    
    fun <T> Observable<Observable<T>>.concatAll() = concatMap { it }
    
    fun <T> Observable<Observable<T>>.switchLatest() = switchMap { it }
    
    fun <T> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)
    

    操作符主要增加了对Observable<Observable<T>>操作的扩展方法

    single扩展

    fun <T> T.toSingle(): Single<T> = Single.just(this)
    fun <T> Future<T>.toSingle(): Single<T> = Single.from(this)
    fun <T> Callable<T>.toSingle(): Single<T> = Single.fromCallable { this.call() }
    

    single扩展增加了Future,Callable和T的toSingle方法

    subscribers扩展

    subscribers增加了subscribeBy及其相应的重构方法,使用方法可参考如下代码

    list.toObservable() // extension function for Iterables
                .filter { it.length >= 5 }
                .subscribeBy(  // named arguments for lambda Subscribers
                        onNext = { println(it) },
                        onError =  { it.printStackTrace() },
                        onComplete = { println("Done!") }
                )
    

    subscription扩展

    subscription增加了两个扩展方法

    operator fun CompositeSubscription.plusAssign(subscription: Subscription) = add(subscription)
    
    fun Subscription.addTo(compositeSubscription: CompositeSubscription) : Subscription {
        compositeSubscription.add(this)
        return this
    }
    

    一个是kotlin符号重载,可以直接使用+=将subscription添加到CompositeSubscription中

    subscription += observable.subscribe{}
    

    另外一个是在Subscription中增加了addTo方法

    list.toObservable()
                .subscribeBy(  // named arguments for lambda Subscribers
                        onNext = { println(it) },
                        onError =  { it.printStackTrace() },
                        onComplete = { println("Done!") }
                )
                .addTo(compositeSubscription)
    

    How to apply Rx?

    Android Project Architecture

    [图片上传失败...(image-6365a7-1516945103127)]

    上图是Google IO 2017上新提出的Android Architecture,旨在帮助开发者如何更快更稳定的基于此架构开发应用

    从上面的架构图中,我们可以看到数据源的处理统一是通过Repository来封装,不管是数据库的数据还是服务器接口的数据,通过通过Repository封装再以LiveData的形式提供,注意这里的LiveData其实可以理解为RxJava的Observable。所以对于Android项目结构中数据源的封装,建议都采用Repository的形式,如果采用Android Architecture那可以采用LiveData,如果采用RxJava就以Observable来组合Repository

    Rx应用

    Retrofit

    Retrofit是一个基于RestFul Api的Java框架,我们可以直接使用Observable返回

    public interface GitHubService {
      @GET("users/{user}/repos")
      Observable<List<Repo>> listRepos(@Path("user") String user);
    }
    

    定义接口请求后,我们就可以通过Retrofit获取service实例做请求

    Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.github.com/")
        .build();
    
    GitHubService service = retrofit.create(GitHubService.class);
    
    service.listRepost("test")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeBy(onNext={})
    

    Retrofit具体介绍可查看其项目https://github.com/square/retrofit 这里就不一一介绍了

    RxBinding

    RxBinding是一个将Android UI控件事件转换为Rx事件的Java库

    Button button = ...;
    RxView.clickEvents(button) 
        .subscribe(new Action1<ViewClickEvent>() {
            @Override
            public void call(ViewClickEvent event) {
                // Click handling
            }
        });
    

    上面的代码中就是通过RxBinding将按钮的点击事件转换为Observable发射,这么做就可以通过一些操作符来显示快速点击或屏蔽指定的点击动作等等

    RxLifecycle

    RxLifecycle是一个管理Android Activity或Fragment组件周期的应用库

    myObservable
        .compose(RxLifecycle.bind(lifecycle))
        .subscribe();
    

    通过RxLifecycle的绑定到指定生命周期来处理比方说在Activity
    destroy的时候unsubscribe,在Activity onCreate的时候进行绑定

    RxBus

    RxBus是一个通过RxJava来实现时间总线的开源库,使用方式与大多是EventBus库类似,只是内部是通过RxJava来处理事件,这里就不做篇幅讲解了,有兴趣的同学可以参考https://github.com/AndroidKnife/RxBus

    注意事项

    • 在使用RxJava的时候,有时候会遇到小伙伴如下代码
    fun getUser(uId: String, subscriber: Subscriber<User>)
    

    在调用是传入Subscriber,这种方式还是传统的callback形式,这种方式完全背弃了Rx的初衷,这样修正会更贴切

    fun getUser(uId: String):Observable<User>
    
    • 在使用RxJava的时候忽视back pressure问题,可能会产生莫名的问题,需要注意实际事件发射场景是否会产生back pressure然后采用正确的操作符或者Flowable源来处理

    总结

    至此,整个RxJava的学习过程结束了,本篇文章站在RxJava应用的角度去分析,没有太过深入源码解析,这也不是本篇文章的目的,后续可能会再出一些源码解析类的文章,再就是随着Kotlin coroutine(协程)出现,对于异步任务的影响有哪些也是可以探索的地方。

    相关文章

      网友评论

        本文标题:RxJava与RxKotlin项目实践

        本文链接:https://www.haomeiwen.com/subject/mpoiaxtx.html