RxJava入门解析(一)

作者: tiloylc | 来源:发表于2018-03-15 17:26 被阅读0次

    前言

    随着RxJava越来越火爆,很多人想入门却不知道从何做起,笔者整理了一天,总结出一些比较实用的东西,望各位看官斧正。

    一、RxJava到底是什么,有什么好处?

    关键词:异步、简洁。
    RxJava可以进行异步调用,使每一步都可以自行处理其所在的线程,典型的流式代码结构,像读一首诗一样顺畅。即使以后逻辑越来越复杂,关于Rx的代码依然很顺畅。为什么这么说,请往下看。

    二、RxJava例子

    先比较一下下方的两个代码:

      new Thread(){
                @Override
                public void run() {
                    super.run();
                    //网络请求~~~~~之后
                    {
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                ts.setText("这是通过newThread");
                            }
                        });
                    }
    
                }
            }.start();
    
      Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            //异步操作网络请求后
                            {
                                subscriber.onNext("这是通过RxJava");
                                subscriber.onCompleted();
                            }
    
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Object>() {
                        @Override
                        public void call(Object o) {
                            ts2.setText(o.toString());
                        }
                    });
    

    同样是开启新线程,从异步线程获取数据,然后再UI线程中写入,代码量相比较,下方的要比上面的多,但是对于易读性来讲,流式的代码结构,从上向下阅读,条理清晰,操作简单,不论以后逻辑修改多么复杂,几乎不会有太大的变化,不像第一部分的代码,如果说复杂度增加的话,就会很麻烦。

    三、RxJava使用方法

    本文是基于Intelli JDEA 进行的开发,使用了gradle,只需要加一行配置引入即可:

     compile 'io.reactivex:rxjava:1.2.1'
    

    概念基础

    RxJava使用的设计模式为观察者模式,主要的步骤就是创建观察者,被观察者,订阅。比较熟悉的同学可以略过这一段。

    借一张图 读书人能叫盗么,这叫借.png

    如上图所示,观察者模式可有四个模块:
    被观察者接口类:主要提供接口声明包括订阅 解除订阅 通知功能,和保存观察者对象。
    观察者接口类:主要提供一个方法供被观察者使用,当被观察者触发了一些条件后,通知观察者。
    被观察者实现类及观察者实现类:实现以上接口。

    举个简单的例子,在家懒得做饭订了外卖,跟送外卖的说(被观察者),等你到楼下了给我打电话(订阅),等外卖到的时候,就会给你打电话,通知你去取外卖(通知),这就是一个观察者模式应用。

    使用流程

    RxJava的开发主要分为三个步骤:apple、pen、Ah~


    皮.jpg

    1、Observer (观察者)

    Observer属于观察者,我们可以看一下源码:

    public interface Observer<T> {
     
        onCompleted();
        onError(Throwable e); 
        void onNext(T t);
    
    }
    

    很简单,就三个方法,代表着完成(onCompleted),错误(onError),接收数据(onNext)。
    另外还有一个实现了Observer的抽象类Subscriber,主要方法和Observer差不多,比较大的区别就是可以使用onStart()方法和unsubscribe()方法。
    onStart():主要在事件未发出的时候调用,仅能在subscribe 所发生的线程被调用,可以做一些数据处理。
    unsubscribe():用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。主要用于当某些对象不再使用的时候,但是并没有收到onCompleted消息的时候调用,防止出现内存泄露。
    主要实现代码:

     Subscriber observer = new Subscriber() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Object o) { 
                }
    
                @Override
                public void onStart() {
                    super.onStart();
                    System.out.println("----onStart----");
                }
            };
    

    2、Observable(被观察者)

    Observable为被观察者,由被观察者来决定是否发送消息,何时发送消息,异步操作一般都在这里进行,实现方式:

     Observable observable = Observable.create(new Observable.OnSubscribe<Object>() {
                @Override
                public void call(Subscriber<? super Object> subscriber) { 
                    subscriber.onNext("111");
                    subscriber.onCompleted();
                }
            });
    

    调用Observable的create方法来创建被观察者,其中call方法的参数即可视为第一步的Observer ,在进行订阅的时候,会直接执行call方法内的代码。
    除了create方法外,还有简单的方法来直接实现,just(T...)方法和from(T[]) 方法

     String[] txt = {"easd","dasd","dasd"};
     Observable observable = Observable.from(txt);
     Observable observable2 = Observable.just("easd","dasd","dasd");
    

    查看源码后,其实实现的方法也是create方法,只是经过了一些简单的封装,这里就简单介绍一下from方法。

       public static <T> Observable<T> from(T[] array) {
            int n = array.length;
            if (n == 0) {
                return empty();
            } else
            if (n == 1) {
                return just(array[0]);
            }
            return create(new OnSubscribeFromArray<T>(array));
        }
    

    又上面代码可知,from方法首先对数组进行了判断,然后return了一个Observable的对象,使用的是create()方法,那么OnSubscribeFromArray又做了什么呢?

    public final class OnSubscribeFromArray<T> implements OnSubscribe<T> 
    

    这是OnSubscribeFromArray实际上也是实现了OnSubscribe接口,那么我们去看一下call()方法:

     @Override
            public void request(long n) {
                if (n < 0) {
                    throw new IllegalArgumentException("n >= 0 required but it was " + n);
                }
                if (n == Long.MAX_VALUE) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        fastPath();
                    }
                } else
                if (n != 0) {
                    if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                        slowPath(n);
                    }
                }
            }
    
            void fastPath() {
                final Subscriber<? super T> child = this.child;
    
                for (T t : array) {
                    if (child.isUnsubscribed()) {
                        return;
                    }
    
                    child.onNext(t);
                }
    
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onCompleted();
            }
    
            void slowPath(long r) {
                final Subscriber<? super T> child = this.child;
                final T[] array = this.array;
                final int n = array.length;
    
                long e = 0L;
                int i = index;
    
                for (;;) {
    
                    while (r != 0L && i != n) {
                        if (child.isUnsubscribed()) {
                            return;
                        }
    
                        child.onNext(array[i]);
    
                        i++;
    
                        if (i == n) {
                            if (!child.isUnsubscribed()) {
                                child.onCompleted();
                            }
                            return;
                        }
    
                        r--;
                        e--;
                    }
    
                    r = get() + e;
    
                    if (r == 0L) {
                        index = i;
                        r = addAndGet(e);
                        if (r == 0L) {
                            return;
                        }
                        e = 0L;
                    }
                }
            }
    

    主要调用了这三个方法,当数组长度等于long的最大值时,调用fastPath(),反之则 slowPath(n),两个方法的主要目的其实也就是将数组进行遍历,然后分发出去。由此可知无论是just()还是from()都可以视为与create()方法等价。

    3、Subscribe (订阅)

    最激动人心的时候到了,之前的准备已经完成了之后,就可以将观察者和被观察者进行订阅。

    observable.subscribe(observer); 
    observable.subscribe(subscriber);
    

    Ah~~~~这样关系就定下来了。subscribe方法代码比较啰嗦,就简化一下发到下面:

     static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 
            if (subscriber == null) {
                throw new IllegalArgumentException("subscriber can not be null");
            }
            if (observable.onSubscribe == null) {
                throw new IllegalStateException("onSubscribe function can not be null."); 
            } 
            //1 !!
            subscriber.onStart();  
            try { 
             //2 !!
                RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                if (subscriber.isUnsubscribed()) {
                    RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
                } else {
                    try {
                        subscriber.onError(RxJavaHooks.onObservableError(e));
                    } catch (Throwable e2) {
                        Exceptions.throwIfFatal(e2);
                        RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                        RxJavaHooks.onObservableError(r);
                    
                        throw r; 
                    }
                }
                return Subscriptions.unsubscribed();
            }
        }
    

    主要就三步:
    一调用subscriber的onStart()方法,
    二调用被观察者observable的OnSubscribe的call方法,
    三将subscriber变成Subscription 返回,可以直接使用unsubscribed()方法。
    另外,subscribe方法支持不完整的回调,可以使用Action1和Action0作为回调:

    Action1<String> onNextAction = new Action1<String>() {
        // onNext()
        @Override
        public void call(String s) {
            Log.d(tag, s);
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
        // onError()
        @Override
        public void call(Throwable throwable) {
            // Error handling
        }
    };
    Action0 onCompletedAction = new Action0() {
        // onCompleted()
        @Override
        public void call() {
            Log.d(tag, "completed");
        }
    };
    observable.subscribe(onNextAction,onErrorAction,onCompletedAction);
    //分别对应onNext,onError,onComplete。
    
    //也可以
    observable.subscribe(onNextAction,onErrorAction);
    observable.subscribe(onNextAction);
    //在源码里都有不同的方法去对应
    

    以上所说都是关于RxJava的使用方法,但是!跟特么所说的异步没毛关系,并没有什么卵用,下面开始正片

    三、RxJava的异步调度

    这个东西专门拿出来当一个大分类来讲,在RxJava里如果没有这个东西,要他有何用。这个神奇的东西叫Scheduler,线程调度器。主要用在两个方法里,observeOn()和subscribeOn(),分别对应观察者操作线程和被观察操作线程。
    总共来讲有这么几个线程:
    Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
    还有个Schedulers.from(Executor executor),你可以自定义线程池来处理。

    是不是很吊!
    再次回到一开始放出来的代码:

     Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            //异步操作网络请求后
                            {
                                subscriber.onNext("这是通过RxJava");
                                subscriber.onCompleted();
                            }
    
                        }
                    })
                    .subscribeOn(Schedulers.io())//Observable操作放到io线程里
                    .observeOn(AndroidSchedulers.mainThread())//Observable操作放在UI线程里
                    .subscribe(new Action1<Object>() {//缺省的onNext调用
                        @Override
                        public void call(Object o) {
                            ts2.setText(o.toString());
                        }
                    });
    

    这样是不是就很容易理解了?

    今天就先到此为止,下一节主要介绍变换和Subject这个神奇的东西

    相关文章

      网友评论

        本文标题:RxJava入门解析(一)

        本文链接:https://www.haomeiwen.com/subject/rqzqqftx.html