美文网首页Android-RxJavaAndroid知识RxJava
RxJava从放弃到入门(一):基础篇

RxJava从放弃到入门(一):基础篇

作者: xiasuhuei321 | 来源:发表于2016-09-09 23:25 被阅读5363次

    写在前面

    RxJava我一直是很想用的,扔物线老师的文章我也看了一点,但是说实话,其中很多东西交错在一起,对于我来说有点难以理解。而且看很多文章总是看了前面忘后面,还有一些结合lambda讲的,说实话,我是懵逼的。在这里把我自己对于RxJava的一些理解,看到的一些好文记录下来。

    RxJava是啥

    Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。

    好,对于C#不怎么了解的人一般不会知道LINQ是啥东东吧……这个介绍我们先选择略过。看看github上RxJava是怎样描述自己的。

    ** a library for composing asynchronous and event-based programs by using observable sequences. ** 一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。

    这句话对于还未用过、看过RxJava的人来说是比较难理解的,好在关于RxJava的资料非常多,我们可以站在巨人的肩膀上来总结。首先扔物线对于RxJava给出的关键词就是** 异步 **,归根到底它就是一个实现异步操作的库。而回过头来,再看一遍这个定义,我们可以看出另外两个关键词:可观测的序列、基于事件,你可能会说这不废话吗,这句话一共才几个词,都快给我说完了。没错,因为这句话概括的非常精准,让人难以再精简了。

    为啥要用RxJava

    Android中实现异步的工具还是有的,那么问题来了,对于我们Android开发者来说,为什么要用RxJava而不是本来的工具?

    Talk is cheap,下面选取部分我以前写的代码,用AsyncTask实现的加载数据的类:

     class DownloadTask extends AsyncTask<String, Integer, ArrayList<ImageBean>> {
    
            private ObjectOutputStream oos;
    
            @Override
            protected ArrayList<ImageBean> doInBackground(final String... params) {
                try {
                    String imageUrl = params[0];
                    HttpUtils.getJsonString(imageUrl, new HttpUtils.HttpCallbackListener() {
                        @Override
                        public void onFinish(String response) {
                            if (JsonUtils.readJsonImageBean(response) != null) {
                                imageList = JsonUtils.readJsonImageBean(response);
    //                            memoryCache.addArrayListToMemory("imageList", imageList);
    
                                if (count == 0) {
                                    //序列化imageList
                                    if (getActivity() != null) {
                                        File imageCache = FileUtils.getDisCacheDir(getActivity(), "ImageBean");
                                        try {
                                            oos = new ObjectOutputStream(new FileOutputStream(imageCache));
                                            oos.writeObject(imageList);
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                        } finally {
                                            if (oos != null) {
                                                try {
                                                    oos.close();
                                                } catch (IOException e) {
                                                    e.printStackTrace();
                                                }
                                            }
                                        }
                                    }
                                    count++;
                                }
                            }
                        }
    
                        @Override
                        public void onError(Exception e) {
                            e.printStackTrace();
                        }
                    });
                    return imageList;
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
    

    上面那段代码表示先从文件中读取list,然后再从网络获取数据(初学时的代码,没有考虑好一些逻辑关系)。当我打算重构代码,看到这一段的时候,我的内心是崩溃的。虽然逻辑并不复杂,但是这些迷之缩进实在是看的蛋疼。那么如果我用RxJava重写一下上面的逻辑,会是怎样的呢?

            Observable.just(0, 1)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.newThread())
                    .map(new Func1<Integer, String>() {
                        @Override
                        public String call(Integer integer) {
                            if (integer == TYPE_NETWORK) {
                                return getUrl(pageIndex, type);
                            }
                            return "cache";
                        }
                    }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }
    
                @Override
                public void onNext(String s) {
                    if (s.equals("cache")) {
                       //加载缓存
                        
                    } else {
                       //从网络获取数据
                       onCompleted();
                        };
                    }
                }
            });
    

    以上代码实现的非常不科学,非常的不RxJava,但是在这里仅仅是作为一个示例,让你感受一下RxJava的特性:简洁。你可能会说这哪里简洁了啊?代码量不跟以前差不多吗,是的,甚至有的时候代码量还会增加一点,但是这样的代码能让你感觉到清晰的逻辑。一切逻辑都在链子里了,而且如果你使用lambda会得到更加简洁的代码。。

    这就是我们要用RxJava的原因之一了:** 简洁 **

    这里的简洁不是指代码量的少,而是指代码逻辑的简洁。而且这种优势随着逻辑的复杂而更加明显。在这里我并不会将lambda和RxJava结合在一起,一是因为自己的确不熟,二也是因为自己初接触RxJava,对于我这种入门级选手还是要先排除一些干扰项

    RxJava核心&基础

    在开始撸RxJava的代码之前,我们首先要弄清楚RxJava中的三个基本也是核心的概念:观察者(Observer)、订阅(Subscribe())和被观察者(Observable)。熟悉设计模式的你可能会立刻想到,这不就是观察者模式吗。是的,就是观察者模式。

    订阅.png

    观察者模式定义了对象间一种一对多的依赖关系,每当一个对象状态发生改变,所有依赖于它的对象都会得到通知并被自动更新。在Android中比较经典的例子有Button的点击,只有当Button被点击的时候,观察者OnClickListener在Button的点击状态发生改变时将点击事件传送给注册的OnClickListener。而对于RxJava来说也是如此,接下来我将换一种我喜欢的描述来讲解我所理解的RxJava。RxJava中Observable是发射数据的源,无论他是“热”启动还是“冷”启动,总之,他最终都是用来发射数据的。Observer则是数据接收者,而Observer和Observable则通过subscribe()(订阅)结合在一起,从而达到Observer接收Observable发射的数据的目的。

    在讲解完了RxJava的核心之后,还需要注意一些细节:
    在RxJava的文档中指出,无论哪种语言,你的观察者(Observer)需要实现以下方法的子集:

    • onNext(T item)
      Observable调用这个方法发射数据,方法的参数就是Observable发射的数据,这个方法调用次数取决于实现。

    • onError(Exception e)
      Observable遇到错误时会调用这个方法,这个调用会终止Observable,onError和以下将要介绍的onComplete是互斥的,即同一个事件序列中二者只能有一个被调用。

    • onComplete()
      正常终止

    好了,烦人的概念时间终于过去了,让我们开始愉悦的Hello World时间!

    Hello World!

    打码之前记得加上依赖:

    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'

    我这的依赖好像还是看扔物线文的时候添加的……应该比较老了……

    话不多说直接上码:

            //创建被观察者
            Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    //回调
                    subscriber.onNext("Hello");
                    subscriber.onNext("World");
                    subscriber.onNext("!");
                }
            }).subscribe(new Subscriber<String>() {//被订阅
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError");
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG, s);
                }
            });
    
    hello world.png

    刚接触到这一坨代码你可能会说卧槽这什么东西,大兄弟先别忙着走,我那么写只是为了把RxJava链式调用的特点展现在你面前,接下来让我们从Observeable和Observer的创建开始。

    • 创建Observable
      创建Observable的方式非常的多,先介绍一下非常基本的** Create **
            Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Hello");
                    subscriber.onNext("World");
                    subscriber.onNext("!");                
                }
            })
    

    通过create()方法可以创建一个Observable对象我们是知道了,那么create()方法中的参数是什么呢?

        /**
         * Invoked when Observable.subscribe is called.
         */
        public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
            // cover for generics insanity
        }
    

    这个参数是个接口,那么很明显了——这个参数是用来干回调这事的,发射数据的时候将会用这个接口的实现类通过这个参数发射数据。而call方法则来源于其父接口Action1。call这个方法给出了一个subscriber参数,让我们看一下这个subscriber究竟是谁。

    是谁?.png

    可能你会说这不废话吗……闭着眼我都能知道这是炮姐……继续我们的话题,在这个类实现的接口里我们发现了一个看起来非常熟悉的东西** Observer **是了,这个subscriber就是一个订阅者,一个订阅者加强版。他相对于Observer主要有两点区别,一个onStart()会在订阅开始,事件发送前执行,所以你可以在这个方法里做一些准备工作。另一点是实现的另外一个接口提供的unsubscribe(),取消订阅。据扔物线的文章说,不取消订阅可能会有内存泄露的风险,关于这一点很容易理解,异步可能会由于生命周期长短问题引发内存泄漏,在这里就不多加赘述了。

    • 创建Observer
      看完了Observable的创建,我们再来看一下Observer的创建
            Observer<String> observer = new Observer<String>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG,s);
                }
            };
    

    按照老样子,点进Observer发现这个也是个接口,那么我们在使用多态创建这个方法的时候必须要实现他的三个方法。

    • 订阅
    observable.subscribe(observer);
    

    上面的代码看起来像是observable订阅了observer,但事实上这种设计是出于流式api的考虑,什么是流式api?看看我的hello world实例代码是怎么写的,那就是流式api设计的好处。整个代码看起来像是一个链子,优雅而简洁。

    好了,最基本的介绍完了,你现在可以去尝试一下你的Hello World了。不过在尝试之前,我需要纠正我上述Hello World示例代码的一个错误:RxJava文档中对于Observable的描述有这么一段话,一个形式正确的Observable必须尝试调用一次onCompleted或者调用一次onError方法。很明显,我的demo是一个使用方法错误的例子。此处对Observable和Observer的api介绍非常的少,因为我觉得一次性把文档上的方法全给你搬上来并不明智,一是用不上那么多,二是容易混淆。

    线程控制——Scheduler###

    终于要到重点了,线程控制绝对是RxJava的重点之一。在不指定线程的情况下,RxJava遵循的是线程不变的原则,在哪个线程调用subscribe(),就在哪个线程生产、消费事件。这对于大部分开发人员来说都是难以接受的事,因为如果是耗时操作可能会阻塞当前线程,这是开发者不想看到的,好在我们是可以切换线程的。下面同样是摘自的描述:

    • Schedulers.computation( ):用于计算任务,如事件循环或回调处理,不要用于IO操作,默认线程数等于处理器的数量。

    • Schedulers.from(executor):使用指定的Executor作为调度器

    • Schedulers.immediate( ):在当前线程立即开始执行任务

    • Schedulers.io( ):用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。

    • Schedulers.newThread( ):为每个任务创建一个新线程

    • Schedulers.trampoline( ):当其它排队的任务完成后,在当前线程排队开始执行

    这里只是初步的了解一下,毕竟本文定位是一篇基础级的文,以下给出一个简单的加载图片的例子。

    简单的例子

    首先明确我们要干的事:通过一个url加载一张图,恩为了演示RxJava和线程控制,我用HttpUrlConnection来做一个实例。

            Observable.create(new Observable.OnSubscribe<Bitmap>() {
                @Override
                public void call(Subscriber<? super Bitmap> subscriber) {
                    try {
                        URL url = new URL("http://img4.imgtn.bdimg.com/it/u=815679381,647288773&fm=21&gp=0.jpg");
                        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                        connection.setRequestMethod("GET");
                        connection.setConnectTimeout(8000);
                        connection.setReadTimeout(8000);
    
                        InputStream in = connection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(in);
                        subscriber.onNext(bitmap);
                    } catch (MalformedURLException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
    
                }
            })
                    .observeOn(AndroidSchedulers.mainThread())  //指定subscriber的回调发生在UI线程
                    .subscribeOn(Schedulers.newThread())        //指定subscribe()发生在新线程
                    .subscribe(new Subscriber<Bitmap>() {
                        @Override
                        public void onCompleted() {
                            plan.setVisibility(View.GONE);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }
    
                        @Override
                        public void onNext(Bitmap bitmap) {
                            if (bitmap != null) {
                                image.setImageBitmap(bitmap);
                                onCompleted();
                            }
                        }
                    });
        }
    
    炮姐.gif

    上面的代码写的很清楚了,我是通过observeOn(AndroidSchedulers.mainThread())指定订阅者的回调发生在主线程,因为这里给ImageView设置图片需要在主线程进行,通过subscribeOn(Schedulers.newThread())指定subscribe()发生在新线程。

    最后请忽略最后几秒的蜜汁小圆点,因为我不摸屏幕AndroidStudio的录制就会停留在加载出图片后的那一段时间,录制出来的效果非常差。我加载的这张图是非常小的,我通过限制wifi网速为5k/s来达到“加载”这个目的。

    一些反思

    本文说的并不深入,只是一篇基础,看完了这篇可能你只能写两个小demo。但是就如我上文所说的,我认为学一个东西,基础是十分重要的,只要你梳理清楚基础和关键,学习起来无疑是事半功倍的。

    我在文章最开头写的demo我为什么要说这很不“RxJava”?因为我只是传递了两个Integer类型的数,之后通过map操作符将这两个转换为String,在订阅的回调里处理这两个字符,并执行相应的逻辑。这给我的感觉就和以前写代码的感觉差不多,没有一种链式调用的爽快感,反而有一种强行用RxJava的感觉。那么RxJava的应用场景和操作符究竟有什么玄机?我会继续探索,继续分享。请期待~

    推荐资料

    给Android开发者的RxJava详解:http://gank.io/post/560e15be2dca930e00da1083#toc_14

    ReactiveX/RxJava文档中文版:https://mcxiaoke.gitbooks.io/rxdocs/content/

    大头鬼深入浅出RxJava系列:http://blog.csdn.net/lzyzsd/article/details/44094895

    iamxiarui探索RxJava系列,也是我比较推荐的入门文,我这同学总结和配图都是一流的:http://www.jianshu.com/p/856297523728

    相关文章

      网友评论

      • qmr777:rxjava2都出了,然而现在1都搞不明白😂
      • 孟威:学习一下
      • 呆T_T呆:讲的不错,很适合我这种初学者🤓🤓
        xiasuhuei321:@呆T_T呆 :sunglasses: 这一篇的目的就是讲清楚几个基本的东西,还会有下篇哟~
      • a8a6342bd3ab:感觉和抛物线的文章很像~
        b3f565d2994d:@zc623 那个作者叫 扔物线 扔物线 扔物线, :sweat:

      本文标题:RxJava从放弃到入门(一):基础篇

      本文链接:https://www.haomeiwen.com/subject/zawjettx.html