美文网首页
每天吃一点Androidの响应式编程之RxJava\RxAndr

每天吃一点Androidの响应式编程之RxJava\RxAndr

作者: 耑意儿 | 来源:发表于2018-08-28 16:11 被阅读344次
    目录:
    1、前言
    2、创建
    3、订阅
    4、线程控制:调度器Scheduler
    5、网络请求失败,重试
    
    Android小仙
    感谢以下文章提供的指导:
    给Android开发者的RxJava详解
    RxJava/RxAndroid 使用实例实践

    一、前言

    1、RxJava到底是做什么的?

    异步:
    也就是说,RxJava是一个实现异步操作的库

    2、同样是做异步,RxJava跟AsyncTask / Handler / XXX / ... 有什么区别?

    简洁:
    随着程序逻辑变得越来越复杂,依然能够保持简洁。
    逻辑简洁
    在AndroidStudio中可以实现Lambda化预览

    3、目的:“后台处理,前台回调”的异步机制

    二、创建

    2.1、创建观察者
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    
    2.2、创建订阅者
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    
    
    2.3、关于订阅者与观察者

    订阅者内部实现了观察者的基本方法,从某种程度上来讲,订阅者同时扮演了观察者这个角色。

    2.4、订阅者与观察者的区别
    • onStart(): 处理事件还未触发前的准备工作
    • unSubscribe(): 取消订阅
    2.5、创建被观察者

    被观察者持有订阅者对象,当事件执行到相应阶段时,调用订阅者的方法以达到提醒的作用。

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    
    • create 是最基本的创造事件序列的方法
    • just(T...) 快捷创建事件序列的方法
    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    
    • from(T[])/from(Iterable<? extends T>):将传入的数组或者Iterable拆分成具体的对象之后,依次执行发送

    三、订阅

    给 被观察者Observable 配置 观察者Observer/Subscriber

    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
    

    其实真的很别扭,按照逻辑来说应该是观察者订阅被观察者,但是从代码上来看却变成了反过来的
    !!!

    四、线程控制:调度器Scheduler

    RxJava遵循的是线程不变的原则,在哪个线程调用subscribe就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
    如果需要切换线程,那就需要用到调度器了。

    • Scheduler.immediate():在当前线程下执行
    • Scheduler.newThread():启动新线程,并在新线程下执行操作
    • Scheduler.io():I/O操作【读写文件、读写数据库、网络信息交互等】
      newThread()效率更高,且不要在io()中做计算工作
    • Scheduler.computation():计算操作。CPU密集型计算
    • AndroidSchedulers.mainThread():在主线程执行操作。
    4.1、示例

    栗子一、

    Observable.just(1, 2, 3, 4)
         // 指定 subscribe() 发生在 IO 线程
        .subscribeOn(Schedulers.io()) 
        // 指定 Subscriber 的回调发生在主线程
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });
    

    栗子二、

    int drawableRes = ...;
    ImageView imageView = ...;
    Observable.create(new OnSubscribe<Drawable>() {
        @Override
        public void call(Subscriber<? super Drawable> subscriber) {
            Drawable drawable = getTheme().getDrawable(drawableRes));
            subscriber.onNext(drawable);
            subscriber.onCompleted();
        }
    })
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Observer<Drawable>() {
        @Override
        public void onNext(Drawable drawable) {
            imageView.setImageDrawable(drawable);
        }
    
        @Override
        public void onCompleted() {
        }
    
        @Override
        public void onError(Throwable e) {
            Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
        }
    });
    

    五、重试

    1、重试类
    public class RetryWithDelay implements Function<Observable<Throwable>, ObservableSource<?>> {
    
        public final String TAG = this.getClass().getSimpleName();
        private final int maxRetries;
        private final int retryDelaySecond;
        private int retryCount;
    
        public RetryWithDelay(int maxRetries, int retryDelaySecond) {
            this.maxRetries = maxRetries;
            this.retryDelaySecond = retryDelaySecond;
        }
    
        @Override
        public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable
                    .flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                            if (++retryCount <= maxRetries) {
                                // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                Log.d(TAG, "Observable get error, it will try after " + retryDelaySecond
                                        + " second, retry count " + retryCount);
                                return Observable.timer(retryDelaySecond,
                                        TimeUnit.SECONDS);
                            }
                            // Max retries hit. Just pass the error along.
                            return Observable.error(throwable);
                        }
                    });
        }
    }
    
    2、网络请求
    HttpApi.api(getActivity()).getAdImage()
                    // 表示getAdImage执行在I/O线程
                    .subscribeOn(Schedulers.io())
                    // 重试,三次,延迟1秒
                    .retryWhen(new RetryWithDelay(2,1))
                    // 在开始网络请求前的预操作
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            getActivity().showToast("要准备开始网络请求咯!!!");
                        }
                    })
                    // 表示上面的doOnSubscribe执行在主线程
                    .subscribeOn(AndroidSchedulers.mainThread())
                    // 表示下面的观察者执行在主线程
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<List<String>>() {
                        @Override
                        public void accept(List<String> images) throws Exception {
                            adImageUrls.clear();
                            adImageUrls.addAll(images);
                            getView().setListData(images);
                        }
                        // 异常处理
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            getActivity().showToast(getActivity().getString(R.string.net_loading_failed));
                        }
                    });
    
    
    

    相关文章

      网友评论

          本文标题:每天吃一点Androidの响应式编程之RxJava\RxAndr

          本文链接:https://www.haomeiwen.com/subject/hekqwftx.html