RxJava

作者: allencaicai | 来源:发表于2017-06-12 20:45 被阅读0次

    http://www.jianshu.com/p/fe08ce770c15

    什么是Rx

    Rx是响应式编程的意思,本质上就是观察者设计模式,是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式

    在Android编程的时候,经常使用后台线程,那么就可以使用这种方式,能够使得逻辑比较清晰明了(有的人说会增加好多的代码,但是我觉得代码的链式结构让代码看起来更加简洁明了)

    Rx模式以及有点

    优势一

    创建:Rx可以方便的创建事件流和数据流

    组合:Rx使用查询式的操作符合组合和变换数据流

    监听:Rx可以订阅任何可观察的数据量并执行操作

    优势二(简化代码)

    函数式风格:对可观察数据流使用无副作用的输入流输出函数,避免了程序里面的错综复杂的状态

    简化代码:Rx的操作符通常可以将复杂的难题简化成很少的几行代码(配合lambda表达式还能简化)

    异步错误处理:Rx提供了何时的错误处理机制

    轻松使用并发:Rx的Observables和Schedulers让开发着可以很方便的切换UI线程和子线程,摆脱底层的线程同步和各种并发问题

    响应式编程

    Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和符合变得非常高效。

    你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好,使用Observable,在数据准备好的时候,生产者将数据推送给消费者,数据可以同步或者异步的到达,方式更加灵活。

    RxJava观察者模式

    需求:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。

    RxJava四个基本概念

    Observable(被观察者)

    Observer(观察者)

    subscribe(订阅)

    事件

    Observable和Observer通过subscribe()方法实现订阅的关系,从而Observable可以在需要的时候发出事件来通知Observer。

    手动实现观察者模式

    首先我们需要有观察者和被观察者。

    被观察者接口(里面简单的定义添加观察者,移除观察者,通知观察者三个方法)

    publicinterfaceWatched{//添加观察者publicvoidaddWatcher(Watcher watcher);//移除观察者publicvoidremoveWatcher(Watcher watcher);//通知观察者publicvoidnotifyWathers(String str);}

    观察者接口(定义更新的方法)

    publicinterfaceWatcher{//数据变化进行更新publicvoidupdate(Stringstr);}

    被观察者实现类

    publicclassConcreteWathedimplementsWatched{//观察者List mList =newArrayList<>();@OverridepublicvoidaddWatcher(Watcher watcher){        mList.add(watcher);    }@OverridepublicvoidremoveWatcher(Watcher watcher){        mList.remove(watcher);    }@OverridepublicvoidnotifyWathers(String str){for(Watcher w : mList) {            w.update(str);        }    }}

    观察者实现类

    publicclassConcreteWatherimplementsWatcher{    @Overridepublicvoidupdate(Stringstr) {        System.out.println(str);    }}

    测试类

    publicstaticvoid main(String[] args){        Watched watched =newConcreteWathed();        Watcher watcher1 =newConcreteWather();        Watcher watcher2 =newConcreteWather();        Watcher watcher3 =newConcreteWather();        watched.addWatcher(watcher1);        watched.addWatcher(watcher2);        watched.addWatcher(watcher3);        watched.notifyWathers("I go");    }

    输出结果

    IgoIgoIgo

    当然了,这只是简单的实现,只要晓得原理就行,除了自己实现,官方也给我们提供了观察者与被观察者接口。只要我们去实现接口就可以了。

    利用系统提供的类和接口实现观察者模式

    被观察者

    publicclassXTObservableextendsObservable{privateintdata =0;publicintgetData(){returndata;    }publicvoidsetData(inti){if(this.data != i){this.data = i;            setChanged();//发生改变notifyObservers();//通知观察者}    }}

    观察者

    publicclassXTobserverimplementsObserver{publicXTobserver(XTObservable observable){        observable.addObserver(this);    }@Overridepublicvoidupdate(Observable observable, Object o){        System.out.println("data is changed"+ ((XTObservable) observable).getData());    }}

    测试类

    publicclassTest{publicstaticvoid main(String[] args) {        XTObservable mObservable =newXTObservable();        XTobserver mXTobserver =newXTobserver(mObservable);        mObservable.setData(1);        mObservable.setData(2);        mObservable.setData(3);    }}

    输出结果

    datais changed1datais changed2datais changed3

    上面已经手动实现观察者模式和通过系统提供类实现,当然这都不是重点,重点是Rx响应式编程

    RxAndroid使用

    一:使用前配置

    在项目工程的build.gradle文件添加这样的一句话(如果使用lambda)

    classpath'me.tatarka:gradle-retrolambda:2.5.0'(这一句在gradle版本下面紧接着)

    在该module工程的build.gradle文件中添加

    applyplugin:'me.tatarka.retrolambda'(使用lambda)在文件的第二行

    在buildTypes节点的下(不是节点内)添加下面一句

    compileOptions {        sourceCompatibility JavaVersion.VERSION_1_8targetCompatibility JavaVersion.VERSION_1_8}

    然后在依赖中添加下面几句(没有提示一定添加的可以根据自己选择性添加)

    //rx一定添加compile'io.reactivex:rxjava:1.1.0'compile'io.reactivex:rxandroid:1.1.0'compile'com.google.code.gson:gson:2.4'compile'com.jakewharton:butterknife:7.0.1'compile'com.squareup.picasso:picasso:2.5.2'//添加compile'com.squareup.okhttp3:okhttp:3.+'

    至此,使用环境已经配置好了,接下来我们来简单的使用一下。

    利用create创建来使用Rx

    /**

    * 使用create方式

    */publicstaticvoidcreateObserable(){//定义被观察者Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//观察者和被观察者还有订阅消息subscriber.onNext("hello");//返回的数据subscriber.onNext("hi");                    subscriber.onNext(getUserName());//因为是传入的是字符串泛型subscriber.onCompleted();//完成}            }        });//定义观察者Subscriber showSub =newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");//用于对话框消失}@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());//错误处理}@OverridepublicvoidonNext(Object o){                Log.i(TAG, o.toString());            }        };        observable.subscribe(showSub);//两者产生订阅}/**    * 可以用来写成我们的下载返回数据    *    *@return*/publicstaticStringgetUserName(){return"jsonName";    }

    在主activity中调用,我们来看下控制台输出的结果:

    也是一个测试,打印

    /**

    * 打印的功能  链式结构,更加易于代码的可毒性

    */publicstaticvoidcreatePrint(){        Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {for(inti =0; i <10; i++) {                        subscriber.onNext(i);                    }                    subscriber.onCompleted();                }            }        }).subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");            }@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());            }@OverridepublicvoidonNext(Integer integer){                Log.i(TAG,"result--->:"+ integer);            }        });    }

    看下控制台结果

    from函数

    /**

    * 使用在被观察者,返回的对象一般都是数据类型

    * 它接收一个集合作为输入,然后每次输出一个元素给subscriber

    */publicstaticvoidfrom(){        Integer[] items = {1,2,3,4,5,6,7,8};        Observable onservable = Observable.from(items);        onservable.subscribe(newAction1() {            @Overridepublicvoidcall(Object o){                Log.i(TAG, o.toString());            }        });    }

    控制台结果

    interval函数

    /**

    * 指定某一时刻进行数据发送

    * interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位

    */publicstaticvoidinterval(){        Integer[] items = {1,2,3,4};        Observable observable = Observable.interval(1,1, TimeUnit.SECONDS);        observable.subscribe(newAction1() {@Overridepublicvoidcall(Object o){                Log.i(TAG, o.toString());            }        });    }

    just函数

    /**

    * 假如我们只有3个独立的AppInfo对象并且我们想把他们转化为Observable并填充到RecyclerView的item中:

    * 这里我们有两个数组,然后通过转化为Observable组成一个item

    */publicstaticvoidjust(){        Integer[] items1 = {1,2,3,4};        Integer[] items2 = {2,4,6,8};        Observable observable = Observable.just(items1, items2);        observable.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                Log.i(TAG,"onCompleted");            }@OverridepublicvoidonError(Throwable e){                Log.i(TAG, e.getMessage());            }@OverridepublicvoidonNext(Integer[] integers){for(inti =0; i < integers.length; i++) {                    Log.i(TAG,"result--->"+ i);                }            }        });    }

    输出结果:

    range函数

    /**

    * 指定输出数据的范围

    */publicstaticvoidrange() {        Observable observable = Observable.range(1,4);        observable.subscribe(newSubscriber() {            @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");            }            @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());            }            @OverridepublicvoidonNext(Integero) {Log.i(TAG,"next---->"+ o);            }        });    }

    输出结果:

    filter函数

    /**

    * 使用过滤功能  发送消息的时候,先过滤在发送

    */publicstaticvoidfilter() {        Observable observable = Observable.just(1,2,3,4,5,6);        observable.filter(newFunc1() {            @OverridepublicBooleancall(Integero) {returno <5;            }        }).observeOn(Schedulers.io()).subscribe(newSubscriber() {            @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");            }            @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());            }            @OverridepublicvoidonNext(Object o) {Log.i(TAG, o.toString());            }        });    }

    输出结果:

    好了,几个常用到的函数已经介绍完了,接下来就用几个例子来说验证一下吧。

    使用Rx+OkHttp下载图片

    Rx下载的封装

    /**

    * 声明一个被观察者对象,作为结果返回

    */publicObservable downLoadImage(String path) {returnObservable.create(newObservable.OnSubscribe(){@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//存在订阅关系//访问网络操作//请求体Request request =newRequest.Builder().url(path).get().build();//异步回调mOkHttpClient.newCall(request).enqueue(newCallback() {@OverridepublicvoidonFailure(Call call, IOException e){                            subscriber.onError(e);                        }@OverridepublicvoidonResponse(Call call, Response response)throwsIOException{if(response.isSuccessful()) {byte[] bytes = response.body().bytes();if(bytes !=null) {                                    subscriber.onNext(bytes);//返回结果}                            }                            subscriber.onCompleted();//访问完成}                    });                }            }        });    }

    在使用的时候调用

    //使用HTTP协议获取数据mUtils.downLoadImageOne(url)                    .subscribeOn(Schedulers.io())//在子线程请求.observeOn(AndroidSchedulers.mainThread())//结果返回到主线程这一步很厉害啊,不用我们去用handler或者async切换线程了// 主要我们去调用一下代码,就已经帮我们切换好了线程,是不是感觉有点很厉害啊.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){                    Log.i(TAG,"onCompleted");//对话框消失}@OverridepublicvoidonError(Throwable e){                    Log.i(TAG,e.getMessage());                }@OverridepublicvoidonNext(byte[] bytes){                    Bitmap bitmap = BitmapFactory.decodeByteArray(bytes,0,bytes.length);                    mImageView.setImageBitmap(bitmap);                }            });

    Rx+okhttp实现登录

    /**

    *

    * @param url  登录地址

    * @param params  请求参数

    * @return  后台返回的数据

    */publicObservable login(Stringurl,Mapparams) {returnObservable.create((Observable.OnSubscribe) subscriber -> {if(!subscriber.isUnsubscribed()) {//创建formbodyFormBody.Builder builder =newFormBody.Builder();if(params!=null&& !params.isEmpty()) {//循环获取body中的数据for (Map.Entry entry :params.entrySet()) {                        builder.add(entry.getKey(), entry.getValue());                    }                }//请求体RequestBody requestBody = builder.build();                Request request =newRequest.Builder().url(url).post(requestBody).build();                mOkHttpClient.newCall(request).enqueue(newCallback() {                    @OverridepublicvoidonFailure(Call call, IOException e) {                        subscriber.onError(e);                    }                    @OverridepublicvoidonResponse(Call call, Response response) throws IOException {if(response.isSuccessful()) {//交给观察者处理数据subscriber.onNext(response.body().string());                        }//完成的回调subscriber.onCompleted();                    }                });            }        });    }

    登录调用

    Mapparams=newHashMap();params.put("username", userName.getText().toString().trim());params.put("password", passWord.getText().toString().trim());            mUtils.login(url,params).subscribeOn(Schedulers.io())                    .observeOn(AndroidSchedulers.mainThread()).subscribe(newSubscriber() {                @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted");                }                @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage());                }                @OverridepublicvoidonNext(Strings) {if(JsonUtils.parse(s)) {                        Intent intent =newIntent(LoginActivity.this, ContentActivity.class);                        startActivity(intent);                    }                }            });

    如果有想需要代码的,可以看这里,所有代码已经传至github。https://github.com/wuyinlei/RxAndroidDemo

    相关文章

      网友评论

          本文标题:RxJava

          本文链接:https://www.haomeiwen.com/subject/vshrqxtx.html