RxJava

作者: 潜心之力 | 来源:发表于2018-01-21 20:24 被阅读0次

    一、函数响应式编程

    RxJava的异步操作是通过扩展的观察者模式来实现的,它有4个角色:Observable、Observer、Subscriber、Subject,其中Observable和Observer通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer。使用时依赖:

    • compile 'io.reactivex:rxandroid:1.2.1'
    • compile 'io.reactivex:rxjava:1.1.6'
    // Create the observer. It is typed to String (instead of the raw Subscriber)
    // because the observables built in this section emit String values.
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onStart() {
            // Called before events are sent; place any set-up work here.
            super.onStart();
        }

        @Override
        public void onNext(String s) {
            // A regular event.
        }

        @Override
        public void onError(Throwable e) {
            // The event queue terminated with an error.
        }

        @Override
        public void onCompleted() {
            // The event queue finished.
        }
    };
    
    // Create the most basic observer: it only carries the three event callbacks.
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String value) {
            // Intentionally empty.
        }

        @Override
        public void onError(Throwable error) {
            // Intentionally empty.
        }

        @Override
        public void onCompleted() {
            // Intentionally empty.
        }
    };
    
    // Create the observable. The callback receives the subscriber that is being
    // subscribed, and the events are pushed to that subscriber (the parameter),
    // not to a variable captured from the enclosing scope.
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("wjx");
            subscriber.onCompleted();
        }
    });
    
    // just() creates an Observable from the items passed as arguments; it does
    // not subscribe anything by itself.
    Observable<String> observable = Observable.just("wjx","wjx","wjx");
    
    // from() creates an Observable from the elements of an array.
    String[] words = {"wjx", "wjx"};
    Observable<String> observable = Observable.from(words);
    
    //Subscribe订阅,即观察者订阅,参数是上面第一个创建的观察者
    observable..subscribe(subscriber);
    

    二、RxJava的不完整定义回调

    Action0 - Action9 extends Action,参数都是泛型,不同的是参数个数,如Action0没有参数,Action9有9个参数。

    // Custom actions: one callback object per event kind.
    Action0 onCompletedAction = new Action0() {
        @Override
        public void call() {
            // Completion callback; intentionally empty.
        }
    };

    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
        @Override
        public void call(Throwable error) {
            // Error callback; intentionally empty.
        }
    };

    Action1<String> onNextAction = new Action1<String>() {
        @Override
        public void call(String value) {
            // Item callback; intentionally empty.
        }
    };

    // Partially defined callbacks: subscribe() has several overloads; this one
    // takes the item, error and completion actions.
    observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    
    // interval: an Observable that emits a sequence of integers at a fixed time interval.
    Action1<Long> everyThreeSeconds = new Action1<Long>() {
        @Override
        public void call(Long tick) {
            // call() is invoked every 3 seconds.
        }
    };
    Observable.interval(3, TimeUnit.SECONDS).subscribe(everyThreeSeconds);
    
    // range: an Observable that emits the integers of the given range.
    Action1<Integer> onRangeItem = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            // value = {0,1,2,3,4}
        }
    };
    Observable.range(0, 5).subscribe(onRangeItem);
    
    // repeat: an Observable that emits the given data N times.
    Action1<Integer> onRepeatedItem = new Action1<Integer>() {
        @Override
        public void call(Integer value) {
            // value = { 0,1,2 }
            // Invoked 6 times in total.
        }
    };
    Observable<Integer> repeated = Observable.range(0, 3).repeat(2);
    repeated.subscribe(onRepeatedItem);
    
    // flatMap + cast: turn each emitted item into its own Observable, flatten
    // the results, then cast the flattened items to String.
    final String Host ="http://blog.csdn.net/";
    List<String> list = new ArrayList<>();
    list.add("wjx1");
    list.add("wjx2");
    list.add("wjx3");

    Func1<String, Observable<?>> prependHost = new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String name) {
            // name is one element of list.
            return Observable.just(Host+name);
        }
    };

    Observable.from(list)
            .flatMap(prependHost)
            .cast(String.class)
            .subscribe(new Action1<String>() {
                @Override
                public void call(String url) {
                    // url is the Host-prefixed value produced in prependHost.
                    Log.e(TAG, "call: "+url);
                }
            });
    
    // flatMapIterable: wrap each item into an Iterable whose elements are then emitted.
    // The function is typed Iterable<Integer> (not Iterable<?>) so the flattened
    // stream stays Observable<Integer> and can be consumed by an Action1<Integer>.
    Observable.just(1,2,3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> call(Integer integer) {
                    // integer is one item of the source; the returned list holds integer + 1.
                    List<Integer> list = new ArrayList<>();
                    list.add(integer+1);
                    return list;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    // Receives the elements of each returned list.
                }
            });
    
    // buffer: convert the source Observable into a new Observable that emits lists.
    Action1<List<Integer>> onChunk = new Action1<List<Integer>>() {
        @Override
        public void call(List<Integer> chunk) {
            for (Integer item : chunk) {
                // buffer.size = 3;
                // item = 1,2,3;
            }
        }
    };
    Observable.just(1,2,3,4,5,6).buffer(3).subscribe(onChunk);
    
    // groupBy: the key function below returns the item itself as its group key.
    Func1<String, String> groupKey = new Func1<String, String>() {
        @Override
        public String call(String item) {
            return item;
        }
    };
    Observable<GroupedObservable<String,String>> groupedObservableObservable =
            Observable.just("s1","s2","s3","s4","s5").groupBy(groupKey);
    

    三、过滤操作符

         // filter: keep only the items for which the predicate returns true (here: > 2).
         // Note that nothing subscribes to the resulting Observable in this snippet.
         Func1<Integer, Boolean> greaterThanTwo = new Func1<Integer, Boolean>() {
             @Override
             public Boolean call(Integer value) {
                 return value>2;
             }
         };
         Observable.just(1,2,3,4).filter(greaterThanTwo);
    
         //elementAt
          Observable.just(1,2,3,4)
                    .elementAt(2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
    
                        }
                    });
    
          // distinct: subscribe to the source with distinct() applied.
          Action1<Integer> onDistinct = new Action1<Integer>() {
              @Override
              public void call(Integer value) {
                  // Intentionally empty.
              }
          };
          Observable.just(1,2,3,4).distinct().subscribe(onDistinct);
    
          // skip: subscribe to the source with skip(2) applied.
          Action1<Integer> onAfterSkip = new Action1<Integer>() {
              @Override
              public void call(Integer value) {
                  // Intentionally empty.
              }
          };
          Observable.just(1,2,3,4).skip(2).subscribe(onAfterSkip);
    
            // take: subscribe to the source with take(2) applied.
            Action1<Integer> onTaken = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2,3,4).take(2).subscribe(onTaken);
    
            // ignoreElements: subscribe a full Observer to the source with
            // ignoreElements() applied.
            Observer<Integer> terminalOnly = new Observer<Integer>() {
                @Override
                public void onNext(Integer value) {
                    // Intentionally empty.
                }

                @Override
                public void onError(Throwable error) {
                    // Intentionally empty.
                }

                @Override
                public void onCompleted() {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2,3,4).ignoreElements().subscribe(terminalOnly);
    
            // throttleFirst with a 200 ms window; the source body is left empty here.
            Observable<Integer> throttleFirstSource = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> child) {
                    // Intentionally empty: emits nothing.
                }
            });
            throttleFirstSource
                    .throttleFirst(200, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer value) {
                            // Intentionally empty.
                        }
                    });
    
            // throttleWithTimeout with a 200 ms timeout; the source body is left empty here.
            Observable<Integer> throttleTimeoutSource = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> child) {
                    // Intentionally empty: emits nothing.
                }
            });
            throttleTimeoutSource
                    .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer value) {
                            // Intentionally empty.
                        }
                    });
    

    四、组合操作符

            // startWith: subscribe to just(3,4,5) with startWith(1,2) applied.
            Action1<Integer> onStartWithItem = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            Observable.just(3,4,5).startWith(1,2).subscribe(onStartWithItem);
    
            // merge: combine two sources; the first one is subscribed on the io scheduler.
            Observable<Integer> observable2 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> observable3 = Observable.just(4,5,6);
            Action1<Integer> onMerged = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            Observable.merge(observable2, observable3).subscribe(onMerged);
    
            // concat: combine two sources; the first one is subscribed on the io scheduler.
            Observable<Integer> observable1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> observable2 = Observable.just(4,5,6);
            Action1<Integer> onConcatenated = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            Observable.concat(observable1, observable2).subscribe(onConcatenated);
    
            // zip: pair up the items of two sources and combine each pair with a function.
            // zip() needs that combining function; adding the pair is used here as an
            // example so the result stays an Integer for the Action1<Integer> subscriber.
            Observable<Integer> observable1 = Observable.just(1,2,3)
                    .subscribeOn(Schedulers.io());
            Observable<Integer> observable2 = Observable.just(4,5,6);
            Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer + integer2;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    // Receives the combined value of each pair.
                }
            });
    
            // zip with an explicit combining function; the function returns null here
            // as a placeholder.
            Observable<Integer> observable1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> observable2 = Observable.just(4,5,6);
            Func2<Integer, Integer, Object> zipper = new Func2<Integer, Integer, Object>() {
                @Override
                public Object call(Integer left, Integer right) {
                    return null;
                }
            };
            Observable.zip(observable1, observable2, zipper).subscribe(new Action1<Object>() {
                @Override
                public void call(Object combined) {
                    // Intentionally empty.
                }
            });
    
            // combineLatest with a two-argument combining function; the function
            // returns null here as a placeholder.
            Observable<Integer> observable1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());
            Observable<Integer> observable2 = Observable.just(4,5,6);
            Func2<Integer, Integer, Object> combiner = new Func2<Integer, Integer, Object>() {
                @Override
                public Object call(Integer left, Integer right) {
                    return null;
                }
            };
            Observable.combineLatest(observable1, observable2, combiner).subscribe(new Action1<Object>() {
                @Override
                public void call(Object combined) {
                    // Intentionally empty.
                }
            });
    

    五、辅助操作符

            // delay: apply a 2 second delay to the source; the source body is left empty here.
            Observable<Long> delaySource = Observable.create(new Observable.OnSubscribe<Long>() {
                @Override
                public void call(Subscriber<? super Long> child) {
                    // Intentionally empty: emits nothing.
                }
            });
            delaySource.delay(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
                @Override
                public void call(Long value) {
                    // Intentionally empty.
                }
            });
    
            // doOnNext: register an extra per-item callback in the chain before subscribing.
            Action1<Integer> sideEffect = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            Subscriber<Integer> doOnNextSubscriber = new Subscriber<Integer>() {
                @Override
                public void onNext(Integer value) {
                    // Intentionally empty.
                }

                @Override
                public void onError(Throwable error) {
                    // Intentionally empty.
                }

                @Override
                public void onCompleted() {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2).doOnNext(sideEffect).subscribe(doOnNextSubscriber);
    
            // subscribeOn / observeOn: the source emits 1 and completes; it is subscribed
            // with Schedulers.newThread() and observed with AndroidSchedulers.mainThread().
            Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> child) {
                    child.onNext(1);
                    child.onCompleted();
                }
            });
            Action1<Integer> onMainThread = new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            };
            observable
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(onMainThread);
    
            // timeout: 200 ms timeout with Observable.just(10,11) passed as the
            // alternative Observable; the source body is left empty here.
            Observable<Integer> fallback = Observable.just(10,11);
            Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> child) {
                    // Intentionally empty: emits nothing.
                }
            }).timeout(200, TimeUnit.MILLISECONDS, fallback);
            observable.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            });
    

    六、错误处理操作符

            // catch (onErrorReturn): when the source signals an error, the function below
            // supplies a replacement value (8) instead. The source now actually fails, so
            // the operator is exercised; with onCompleted() only it would never run.
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onError(new IllegalStateException("simulated failure"));
                }
            }).onErrorReturn(new Func1<Throwable, Integer>() {
                @Override
                public Integer call(Throwable throwable) {
                    return 8;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    // Intentionally empty.
                }

                @Override
                public void onError(Throwable e) {
                    // Intentionally empty.
                }

                @Override
                public void onNext(Integer integer) {
                    // Intentionally empty.
                }
            });
    
            // retry: retry(2) is applied to a source that fails. The source now signals
            // an error so the retry is actually triggered; with onCompleted() only it
            // would never run.
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onError(new IllegalStateException("simulated failure"));
                }
            }).retry(2).subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    // Intentionally empty.
                }

                @Override
                public void onError(Throwable e) {
                    // Intentionally empty.
                }

                @Override
                public void onNext(Integer integer) {
                    // Intentionally empty.
                }
            });
    

    七、条件操作符和布尔操作符

            // all: test the source items against the predicate (value < 3) and
            // subscribe to the Boolean result.
            Func1<Integer, Boolean> lessThanThree = new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer value) {
                    return value<3;
                }
            };
            Subscriber<Boolean> onAllResult = new Subscriber<Boolean>() {
                @Override
                public void onNext(Boolean matched) {
                    // Intentionally empty.
                }

                @Override
                public void onError(Throwable error) {
                    // Intentionally empty.
                }

                @Override
                public void onCompleted() {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2,3,4).all(lessThanThree).subscribe(onAllResult);
    
            // contains: subscribe to the Boolean result of contains(1).
            Action1<Boolean> onContainsResult = new Action1<Boolean>() {
                @Override
                public void call(Boolean found) {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2,3).contains(1).subscribe(onContainsResult);
    
            // isEmpty: subscribe to the Boolean result of isEmpty().
            Observable.just(1,2,3).isEmpty()
                    .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean aBoolean) {
                    // Intentionally empty.
                }
            });
    
          // amb: called with two sources, the first of which is delayed by 2 seconds.
          Observable<Integer> delayedSource = Observable.just(1,2,3).delay(2, TimeUnit.SECONDS);
          Observable<Integer> immediateSource = Observable.just(4,5,6);
          Observable.amb(delayedSource, immediateSource).subscribe(new Action1<Integer>() {
              @Override
              public void call(Integer value) {
                  // Intentionally empty.
              }
          });
    
            // defaultIfEmpty: the source completes without emitting anything, and
            // defaultIfEmpty(3) is applied to it.
            Observable<Integer> emptySource = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> child) {
                    child.onCompleted();
                }
            });
            emptySource.defaultIfEmpty(3).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer value) {
                    // Intentionally empty.
                }
            });
    

    八、转换操作符

            // toList: subscribe to the List produced from the source items.
            Action1<List<Integer>> onList = new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> items) {
                    // Intentionally empty.
                }
            };
            Observable.just(1,2,3).toList().subscribe(onList);
    
            // toSortedList: subscribe to the List produced by toSortedList().
            Action1<List<Integer>> onSortedList = new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> items) {
                    // Intentionally empty.
                }
            };
            Observable.just(3,1,2).toSortedList().subscribe(onSortedList);
    
            // toMap: the key function below returns the item itself as its map key.
            Func1<String, String> mapKey = new Func1<String, String>() {
                @Override
                public String call(String item) {
                    return item;
                }
            };
            Observable.just("s1","s2","s3").toMap(mapKey).subscribe(new Action1<Map<String, String>>() {
                @Override
                public void call(Map<String, String> byKey) {
                    // Intentionally empty.
                }
            });
    

    相关文章

      网友评论

          本文标题:RxJava

          本文链接:https://www.haomeiwen.com/subject/sxukoxtx.html