Rx学习笔记和总结

作者: 菜鸟_一枚 | 来源:发表于2016-07-23 14:21 被阅读689次

    RxAndroid学习笔记和总结

    前言

    rx系列貌似在前一阶段火起来的,也是自己接触的少,一直没有去学习,今天就趁着周六,脑补一下。

    什么是Rx

    • Rx是响应式编程的意思,本质上就是观察者设计模式,是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式
    • 在Android编程的时候,经常使用后台线程,那么就可以使用这种方式,能够使得逻辑比较清晰明了(有的人说会增加好多的代码,但是我觉得代码的链式结构让代码看起来更加简洁明了)

    Rx模式以及有点

    优势一

    • 创建:Rx可以方便的创建事件流和数据流
    • 组合:Rx使用查询式的操作符合组合和变换数据流
    • 监听:Rx可以订阅任何可观察的数据量并执行操作

    优势二(简化代码)

    • 函数式风格:对可观察数据流使用无副作用的输入流输出函数,避免了程序里面的错综复杂的状态
    • 简化代码:Rx的操作符通常可以将复杂的难题简化成很少的几行代码(配合lambda表达式还能简化)
    • 异步错误处理:Rx提供了何时的错误处理机制
    • 轻松使用并发:Rx的Observables和Schedulers让开发着可以很方便的切换UI线程和子线程,摆脱底层的线程同步和各种并发问题

    响应式编程

    Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和符合变得非常高效。
    你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好,使用Observable,在数据准备好的时候,生产者将数据推送给消费者,数据可以同步或者异步的到达,方式更加灵活。

    RxJava观察者模式

    • 需求:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。
    • RxJava四个基本概念
      • Observable(被观察者)
      • Observer(观察者)
      • subscribe(订阅)
      • 事件
    • Observable和Observer通过subscribe()方法实现订阅的关系,从而Observable可以在需要的时候发出事件来通知Observer。

    关于理论的知识,网上的介绍的太多了,大家可以去看下,在文章的结尾,我也会附几篇好的文章。

    手动实现观察者模式

    首先我们需要有观察者和被观察者。

    被观察者接口(里面简单的定义添加观察者,移除观察者,通知观察者三个方法)
    public interface Watched {
        //添加观察者
        public void addWatcher(Watcher watcher);
        //移除观察者
        public void removeWatcher(Watcher watcher);
        //通知观察者
        public void notifyWathers(String str);
    }
    
    观察者接口(定义更新的方法)
    public interface Watcher {
        //数据变化进行更新
        public void update(String str);
    }
    
    
    被观察者实现类
    
    public class ConcreteWathed implements Watched {
        //观察者
        List<Watcher> mList = new ArrayList<>();
    
        @Override
        public void addWatcher(Watcher watcher) {
            mList.add(watcher);
        }
    
        @Override
        public void removeWatcher(Watcher watcher) {
            mList.remove(watcher);
        }
    
        @Override
        public void notifyWathers(String str) {
            for (Watcher w : mList) {
                w.update(str);
            }
        }
    }
    
    观察者实现类
    public class ConcreteWather implements Watcher {
        @Override
        public void update(String str) {
            System.out.println(str);
        }
    }
    
    测试类
     public static void main(String[] args){
            Watched watched = new ConcreteWathed();
            Watcher watcher1 = new ConcreteWather();
            Watcher watcher2 = new ConcreteWather();
            Watcher watcher3 = new ConcreteWather();
    
            watched.addWatcher(watcher1);
            watched.addWatcher(watcher2);
            watched.addWatcher(watcher3);
    
            watched.notifyWathers("I go");
        }
    
    输出结果
    I go
    I go
    I go
    
    

    当然了,这只是简单的实现,只要晓得原理就行,除了自己实现,官方也给我们提供了观察者与被观察者接口。只要我们去实现接口就可以了。

    利用系统提供的类和接口实现观察者模式

    被观察者

    public class XTObservable extends Observable {
    
        private int data = 0;
    
        public int getData(){
            return data;
        }
    
        public void setData(int i){
            if (this.data != i){
                this.data = i;
                setChanged();//发生改变
                notifyObservers();//通知观察者
            }
        }
    }
    
    

    观察者

    
    public class XTobserver implements Observer {
    
        public XTobserver(XTObservable observable) {
            observable.addObserver(this);
        }
    
        @Override
        public void update(Observable observable, Object o) {
            System.out.println("data is changed" + ((XTObservable) observable).getData());
        }
    }
    

    测试类

    public class Test {
    
        public static void main(String[] args) {
            XTObservable mObservable = new XTObservable();
            XTobserver mXTobserver = new XTobserver(mObservable);
            mObservable.setData(1);
            mObservable.setData(2);
            mObservable.setData(3);
        }
    }
    
    

    输出结果

    data is changed1
    data is changed2
    data is changed3
    

    上面已经手动实现观察者模式和通过系统提供类实现,当然这都不是重点,重点是Rx响应式编程

    RxAndroid使用

    一:使用前配置

    在项目工程的build.gradle文件添加这样的一句话(如果使用lambda)

     classpath 'me.tatarka:gradle-retrolambda:2.5.0'(这一句在gradle版本下面紧接着)
    

    在该module工程的build.gradle文件中添加

    apply plugin: 'me.tatarka.retrolambda'(使用lambda)在文件的第二行
    

    在buildTypes节点的下(不是节点内)添加下面一句

     compileOptions {
            sourceCompatibility JavaVersion.VERSION_1_8
            targetCompatibility JavaVersion.VERSION_1_8
        }
    

    然后在依赖中添加下面几句(没有提示一定添加的可以根据自己选择性添加)

    //rx一定添加
     compile 'io.reactivex:rxjava:1.1.0'
        compile 'io.reactivex:rxandroid:1.1.0'
        compile 'com.google.code.gson:gson:2.4'
        compile 'com.jakewharton:butterknife:7.0.1'
        compile 'com.squareup.picasso:picasso:2.5.2'
        //添加
        compile 'com.squareup.okhttp3:okhttp:3.+'
    

    至此,使用环境已经配置好了,接下来我们来简单的使用一下。

    利用create创建来使用Rx

    /**
         * 使用create方式
         */
        public static void createObserable() {
            //定义被观察者
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    if (!subscriber.isUnsubscribed()) { //观察者和被观察者还有订阅消息
                        subscriber.onNext("hello"); //返回的数据
                        subscriber.onNext("hi");
                        subscriber.onNext(getUserName());  //因为是传入的是字符串泛型
                        subscriber.onCompleted(); //完成
                    }
                }
            });
    
            //定义观察者
            Subscriber showSub = new Subscriber() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");   //用于对话框消失
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());   //错误处理
                }
    
                @Override
                public void onNext(Object o) {
                    Log.i(TAG, o.toString());
                }
            };
    
            observable.subscribe(showSub); //两者产生订阅
        }
    
        /**
         * 可以用来写成我们的下载返回数据
         *
         * @return
         */
        public static String getUserName() {
            return "jsonName";
        }
    

    在主activity中调用,我们来看下控制台输出的结果:


    也是一个测试,打印

      /**
         * 打印的功能  链式结构,更加易于代码的可毒性
         */
        public static void createPrint() {
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    if (!subscriber.isUnsubscribed()) {
                        for (int i = 0; i < 10; i++) {
                            subscriber.onNext(i);
                        }
                        subscriber.onCompleted();
                    }
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG, "result--->:" + integer);
                }
            });
        }
    
    

    看下控制台结果


    from函数

     /**
         * 使用在被观察者,返回的对象一般都是数据类型
         * 它接收一个集合作为输入,然后每次输出一个元素给subscriber
         */
        public static void from() {
            Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8};
            Observable onservable = Observable.from(items);
            onservable.subscribe(new Action1() {
                @Override
                public void call(Object o) {
                    Log.i(TAG, o.toString());
                }
            });
        }
    

    控制台结果


    interval函数

    /**
         * 指定某一时刻进行数据发送
         * interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位
         */
        public static void interval() {
            Integer[] items = {1, 2, 3, 4};
            Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);
            observable.subscribe(new Action1() {
                @Override
                public void call(Object o) {
                    Log.i(TAG, o.toString());
                }
            });
        }
    

    just函数

      /**
         * 假如我们只有3个独立的AppInfo对象并且我们想把他们转化为Observable并填充到RecyclerView的item中:
         * 这里我们有两个数组,然后通过转化为Observable组成一个item
         */
        public static void just() {
            Integer[] items1 = {1, 2, 3, 4};
            Integer[] items2 = {2, 4, 6, 8};
    
            Observable observable = Observable.just(items1, items2);
            observable.subscribe(new Subscriber<Integer[]>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());
                }
    
                @Override
                public void onNext(Integer[] integers) {
                    for (int i = 0; i < integers.length; i++) {
                        Log.i(TAG, "result--->" + i);
                    }
                }
            });
        }
    

    输出结果:


    range函数

      /**
         * 指定输出数据的范围
         */
        public static void range() {
            Observable observable = Observable.range(1, 4);
            observable.subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());
                }
    
                @Override
                public void onNext(Integer o) {
                    Log.i(TAG, "next---->" + o);
                }
            });
        }
    

    输出结果:


    filter函数

     /**
         * 使用过滤功能  发送消息的时候,先过滤在发送
         */
        public static void filter() {
            Observable observable = Observable.just(1, 2, 3, 4, 5, 6);
            observable.filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer o) {
                    return o < 5;
                }
            }).observeOn(Schedulers.io()).subscribe(new Subscriber() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());
                }
    
                @Override
                public void onNext(Object o) {
                    Log.i(TAG, o.toString());
                }
            });
        }
    
    

    输出结果:



    好了,几个常用到的函数已经介绍完了,接下来就用几个例子来说验证一下吧。

    使用Rx+OkHttp下载图片

    Rx下载的封装
     /**
         * 声明一个被观察者对象,作为结果返回
         */
        public Observable<byte[]> downLoadImage(String path) {
            return Observable.create(new Observable.OnSubscribe<byte[]>() {
                @Override
                public void call(Subscriber<? super byte[]> subscriber) {
                    if (!subscriber.isUnsubscribed()) {  //存在订阅关系
                        //访问网络操作
                        //请求体
                        Request request = new Request.Builder().url(path).get().build();
                        //异步回调
                        mOkHttpClient.newCall(request).enqueue(new Callback() {
                            @Override
                            public void onFailure(Call call, IOException e) {
                                subscriber.onError(e);
                            }
    
                            @Override
                            public void onResponse(Call call, Response response) throws IOException {
                                if (response.isSuccessful()) {
                                    byte[] bytes = response.body().bytes();
                                    if (bytes != null) {
                                        subscriber.onNext(bytes);  //返回结果
                                    }
                                }
                                subscriber.onCompleted();  //访问完成
                            }
                        });
    
                    }
                }
            });
        }
    
    在使用的时候调用
       //使用HTTP协议获取数据
                mUtils.downLoadImageOne(url)
                        .subscribeOn(Schedulers.io())  //在子线程请求
                        .observeOn(AndroidSchedulers.mainThread()) //结果返回到主线程这一步很厉害啊,不用我们去用handler或者async切换线程了
                        // 主要我们去调用一下代码,就已经帮我们切换好了线程,是不是感觉有点很厉害啊
                        .subscribe(new Subscriber<byte[]>() {
                    @Override
                    public void onCompleted() {
                        Log.i(TAG,"onCompleted");//对话框消失
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG,e.getMessage());
                    }
    
                    @Override
                    public void onNext(byte[] bytes) {
                        Bitmap bitmap = BitmapFactory.decodeByteArray(bytes,0,bytes.length);
                        mImageView.setImageBitmap(bitmap);
                    }
                });
    

    Rx+okhttp实现登录

      /**
         * 
         * @param url  登录地址
         * @param params  请求参数
         * @return   后台返回的数据
         */
        public Observable<String> login(String url, Map<String, String> params) {
    
            return Observable.create((Observable.OnSubscribe<String>) subscriber -> {
                if (!subscriber.isUnsubscribed()) {
                    //创建formbody
                    FormBody.Builder builder = new FormBody.Builder();
                    if (params != null && !params.isEmpty()) {
                        //循环获取body中的数据
                        for (Map.Entry<String, String> entry : params.entrySet()) {
                            builder.add(entry.getKey(), entry.getValue());
                        }
                    }
                    //请求体
                    RequestBody requestBody = builder.build();
                    Request request = new Request.Builder().url(url).post(requestBody).build();
                    mOkHttpClient.newCall(request).enqueue(new Callback() {
                        @Override
                        public void onFailure(Call call, IOException e) {
                            subscriber.onError(e);
                        }
    
                        @Override
                        public void onResponse(Call call, Response response) throws IOException {
                            if (response.isSuccessful()) {
                                //交给观察者处理数据
                                subscriber.onNext(response.body().string());
                            }
                            //完成的回调
                            subscriber.onCompleted();
                        }
                    });
                }
            });
        }
    
    登录调用
      Map<String, String> params = new HashMap<String, String>();
                params.put("username", userName.getText().toString().trim());
                params.put("password", passWord.getText().toString().trim());
                mUtils.login(url, params).subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Log.i(TAG, "onCompleted");
                    }
    
                    @Override
                    public void onError(Throwable e) {
                        Log.i(TAG, e.getMessage());
                    }
    
                    @Override
                    public void onNext(String s) {
                        if (JsonUtils.parse(s)) {
                            Intent intent = new Intent(LoginActivity.this, ContentActivity.class);
                            startActivity(intent);
                        }
                    }
                });
    

    如果有想需要代码的,可以看这里,所有代码已经传至github。https://github.com/wuyinlei/RxAndroidDemo
    好了,就先介绍到这里吧,这里在给大家推荐几篇比较好的博文还有。

    推荐博文

    推荐git

    结语

    Rx使用还是挺方便的,不过需要一定的学习成本,谨慎使用(嘿嘿)

    相关文章

      网友评论

        本文标题:Rx学习笔记和总结

        本文链接:https://www.haomeiwen.com/subject/xdkwjttx.html