RxAndroid

作者: 虫儿飞ZLEI | 来源:发表于2018-12-02 13:32 被阅读0次

    使用

    1.简单使用(类似Rx1)

    • 创建被观察者:
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("onNext");
                    e.onError(Exception);
                    e.onComplete();
                }
    });
    
    
    • 创建观察者:
    Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    //这个地方可以拿到水管的开关d,可以在适当的时候用d.dispose();来切断连接
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    Log.e(TAG, "onNext: "+s);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
    • 将观察者绑定到被观察者:
    observable.subscribe(observer);
    
    • 把代码连起来就是链式操作
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
    

    2.注意:
    2.1. observable的onNext、onComplete、onError发送会在observer的onNext、onComplete、onError接受。
    2.2. 当上游发送了一个onError或者oncomplete后, 上游onError或者oncomplete之后的事件将继续发送, 而下游不再继续接收事件。
    2.3. subscribe有多个重载方法,可以选择观察者之关注onSubscribe、onNext、onComplete、onError哪一个或者那几个方法

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}
    
    使用起来就像这样
    
    observable.subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
    
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
    
                }
            });
    

    关于线程

    (默认上下游也就是观察者和被观察者实在同一个线程的)

    1.RxJava提供的线程:

    Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    Schedulers.newThread() 代表一个常规的新线程
    AndroidSchedulers.mainThread() 代表Android的主线程
    

    2.改变线程的两个方法:

    .subscribeOn(Schedulers.newThread())  
     - 指定上游(被观察者)的线程,多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.                                            
    .observeOn(AndroidSchedulers.mainThread())
     - 指定下游(观察者)的线程,每调用一次observeOn() , 下游的线程就会切换一次.
    

    3.例子:读写数据库:

    public Observable<List<Record>> readAllRecords() {
            return Observable.create(new ObservableOnSubscribe<List<Record>>() {
                @Override
                public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                    Cursor cursor = null;
                    try {
                        cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                        List<Record> result = new ArrayList<>();
                        while (cursor.moveToNext()) {
                            result.add(Db.Record.read(cursor));
                        }
                        emitter.onNext(result);
                        emitter.onComplete();
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                        }
                    }
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
        }
    

    操作符

    其他操作符在本人githubRx下有一个Rxutils文件,那里面是rx1的操作符,2应该也有。

    1. FlatMap,
      这个返回一个 ObservableSource(相当于一个Observable),将一个Observable的结果转换为另一个Observable
      例子(结合retrofit先注册再登陆):
    retrofit的接口操作
    public interface Api {
        @GET
        Observable<LoginResponse> login(@Body LoginRequest request);
    
        @GET
        Observable<RegisterResponse> register(@Body RegisterRequest request);
    }
    
    
    ----------
    
    api.register(new RegisterRequest())            //发起注册请求
                    .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
                    .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求注册结果
                    .doOnNext(new Consumer<RegisterResponse>() {
                        @Override
                        public void accept(RegisterResponse registerResponse) throws Exception {
                            //先根据注册的响应结果去做一些操作
                        }
                    })
                    .observeOn(Schedulers.io())                 //回到IO线程去发起登录请求
                    .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                        @Override
                        public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                            return api.login(new LoginRequest());//这里返回的是一个Observable
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求登录的结果
                    .subscribe(new Consumer<LoginResponse>() {
                        @Override
                        public void accept(LoginResponse loginResponse) throws Exception {
                            Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
                        }
                    });
    

    2.zip操作符
    将两个或多个Observable发来的事件转换为一个Observable,再发送出去。
    应用场景:比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了。
    例子:

    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
        @Override                                                                                      
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
            Log.d(TAG, "emit 1");                                                                      
            emitter.onNext(1);                                                                         
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit 2");                                                                      
            emitter.onNext(2);                                                                         
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit 3");                                                                      
            emitter.onNext(3);                                                                         
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit 4");                                                                      
            emitter.onNext(4);                                                                         
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit complete1");                                                              
            emitter.onComplete();                                                                      
        }                                                                                              
    }).subscribeOn(Schedulers.io());                                                                   
    
    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
        @Override                                                                                      
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
            Log.d(TAG, "emit A");                                                                      
            emitter.onNext("A");                                                                       
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit B");                                                                      
            emitter.onNext("B");                                                                       
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit C");                                                                      
            emitter.onNext("C");                                                                       
            Thread.sleep(1000);                                                                        
    
            Log.d(TAG, "emit complete2");                                                              
            emitter.onComplete();                                                                      
        }                                                                                              
    }).subscribeOn(Schedulers.io());                                                                   
    
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
        @Override                                                                                      
        public String apply(Integer integer, String s) throws Exception {                              
            return integer + s;                                                                        
        }                                                                                              
    }).subscribe(new Observer<String>() {                    
        @Override                                                                                      
        public void onSubscribe(Disposable d) {                                                        
            Log.d(TAG, "onSubscribe");                                                                 
        }                                                                                              
    
        @Override                                                                                      
        public void onNext(String value) {                                                             
            Log.d(TAG, "onNext: " + value);                                                            
        }                                                                                              
    
        @Override                                                                                      
        public void onError(Throwable e) {                                                             
            Log.d(TAG, "onError");                                                                     
        }                                                                                              
    
        @Override                                                                                      
        public void onComplete() {                                                                     
            Log.d(TAG, "onComplete");                                                                  
        }                                                                                              
    });
    

    OOM水缸 Backpressure Flowable等

    1.简介:这个问题的由来是因为上下游发送接收速度不匹配导致。
    如果上下游在同一个线程,那么上游发一个要等下游处理完成以后才会继续发送。
    如果上下游在不同的线程,那么如果上游一直发,下游却处理的非常慢,上游会把多余的事件放到一个水缸里面,当这个水缸满了以后就会OOM了。
    手动解决:
    一是从数量上进行治理, 减少发送进水缸里的事件:
    可以用filter去滤掉一些数据,只让一部分进入水缸,但是这种,数据会丢失。
    二是从速度上进行治理, 减缓事件发送进水缸的速度
    可以让上游发送数据的时候,做一定的延时,给下游充分的处理时间。

    2.Flowable与Backpressure
    注意:使用Flowable时上下游从Observable和Observer变成了Flowable和Subscriber。

    Backpressure策略:(Flowable的默认水缸大小为128)
    BackpressureStrategy.ERROR:这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。
    BackpressureStrategy.BUFFER:使用一个新水缸,这个水缸大小无限制。
    BackpressureStrategy.DROP:当事件用不下时,多余的事件直接丢弃掉了。
    BackpressureStrategy.LATEST:当水缸满了以后还是会存新的事件,但是相对较老的那个事件会被抛弃。(水缸就是一个队列)

    对于Flowable.create可以在参数里面设置Backpressure策略,对于Flowable.interval这些没有这个参数可以
    onBackpressureBuffer()
    onBackpressureDrop()
    onBackpressureLatest()

    Subscription.cancel():切断水管和Disposable.dispose()效果产不多。

    Subscription.request(Long.MAX_VALUE);表明下游想要处理的事件数量,这里穿进去的int值,上游就会从水缸中发出来对应的个数,然后发完以后如果下游要继续处理事件,还可以在onNext里面在request一次,这样上游又从水缸里面取出事件发过来。

    emitter.requested()上游可以这样拿到下游请求的数量。上游每发送一个事件,这个数量就减一,下游每调用一次Subscription.request(n),就在这个数量的基础上加上n。

    Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emit 1");
                    emitter.onNext(1);
                    Log.d(TAG, "emit 2");
                    emitter.onNext(2);
                    Log.d(TAG, "emit 3");
                    emitter.onNext(3);
                    Log.d(TAG, "emit complete");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR); //增加了一个参数
    
            Subscriber<Integer> downstream = new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    s.request(Long.MAX_VALUE);  //注意这句代码
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
    
                }
    
                @Override
                public void onError(Throwable t) {
                     Log.w(TAG, "onError: ", t);
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
    
            upstream.subscribe(downstream);
    

    最后一个例子:异步线程读取文本文件,读一行打印一行。

     public static void main(String[] args) {
            practice1();
            try {
                Thread.sleep(10000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void practice1() {
            Flowable
                    .create(new FlowableOnSubscribe<String>() {
                        @Override
                        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                            try {
                                FileReader reader = new FileReader("test.txt");
                                BufferedReader br = new BufferedReader(reader);
    
                                String str;
    
                                while ((str = br.readLine()) != null && !emitter.isCancelled()) {
                                    while (emitter.requested() == 0) {
                                        if (emitter.isCancelled()) {
                                            break;
                                        }
                                    }
                                    emitter.onNext(str);
                                }
    
                                br.close();
                                reader.close();
    
                                emitter.onComplete();
                            } catch (Exception e) {
                                emitter.onError(e);
                            }
                        }
                    }, BackpressureStrategy.ERROR)
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.newThread())
                    .subscribe(new Subscriber<String>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            mSubscription = s;
                            s.request(1);
                        }
    
                        @Override
                        public void onNext(String string) {
                            System.out.println(string);
                            try {
                                Thread.sleep(2000);
                                mSubscription.request(1);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            System.out.println(t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
        }
    

    相关文章

      网友评论

          本文标题:RxAndroid

          本文链接:https://www.haomeiwen.com/subject/btejcqtx.html