RxJava

作者: 潜心之力 | 来源:发表于2018-01-21 20:24 被阅读0次

一、函数响应式编程

RxJava的异步操作是通过扩展的观察者模式来实现的,它有4个角色:Observable、Observer、Subscriber、Subject,其中Observable和Observer通过subscribe方法实现订阅关系,这样Observable就可以在需要的时候通知Observer。使用时需添加以下依赖:

  • compile 'io.reactivex:rxandroid:1.2.1'
  • compile 'io.reactivex:rxjava:1.1.6'
// Create the observer as a Subscriber. It is parameterized with String (instead of
// being a raw type) so that onNext receives the emitted value already typed.
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onStart() {
        // Called before any event is delivered; a hook for preparation work.
        super.onStart();
    }

    @Override
    public void onNext(String s) {
        // A regular event.
    }

    @Override
    public void onError(Throwable e) {
        // The event sequence terminated with an error.
    }

    @Override
    public void onCompleted() {
        // The event sequence finished normally.
    }
};
// Create the most basic observer: a plain Observer<String> whose three callbacks
// are all left empty in this example.
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String value) {
        // No-op.
    }

    @Override
    public void onError(Throwable error) {
        // No-op.
    }

    @Override
    public void onCompleted() {
        // No-op.
    }
};
// Create the observable (the source being observed).
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Emit to the subscriber handed to this call, not to a variable captured
        // from the enclosing scope: one item, then completion.
        subscriber.onNext("wjx");
        subscriber.onCompleted();
    }
});
// just(...) builds an Observable that emits the given items in order. In RxJava 1.x it
// is a family of fixed-arity overloads (one to ten items), not a varargs/array method.
Observable<String> observable = Observable.just("wjx", "wjx", "wjx");
// from(...) builds an Observable that emits each element of the array in order.
String[] words = {"wjx", "wjx"};
Observable<String> observable = Observable.from(words);
//Subscribe订阅,即观察者订阅,参数是上面第一个创建的观察者
observable..subscribe(subscriber);

二、RxJava的不完整定义回调

Action0 ~ Action9 均继承自 Action,参数都是泛型,区别只在于参数个数:如Action0没有参数,Action9有9个参数。

// Custom actions: one callback object per kind of event.
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String value) {
        // Left empty in this example.
    }
};

Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable error) {
        // Left empty in this example.
    }
};

Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
        // Left empty in this example.
    }
};
// Partial callbacks: subscribe(...) is overloaded; this form is given the onNext,
// onError and onCompleted actions defined above it.
observable.subscribe(onNextAction,onErrorAction,onCompletedAction);
// interval: an Observable that emits a Long sequence at a fixed period.
Observable.interval(3, TimeUnit.SECONDS)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long tick) {
                // Invoked once every 3 seconds.
            }
        });
// range: an Observable that emits a run of consecutive integers.
Observable.range(0, 5)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                // value takes 0, 1, 2, 3, 4 in turn.
            }
        });
// repeat: re-emit the source sequence the given number of times.
Observable.range(0, 3)
        .repeat(2)
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer value) {
                // value runs through 0, 1, 2 twice, so call() is invoked 6 times.
            }
        });
// flatMap + cast: map every emitted item to its own Observable, flatten those into a
// single stream, then cast the flattened items back to String.
final String Host = "http://blog.csdn.net/";
List<String> list = new ArrayList<>();
list.add("wjx1");
list.add("wjx2");
list.add("wjx3");
Observable.from(list)
        .flatMap(new Func1<String, Observable<?>>() {
            @Override
            public Observable<?> call(String name) {
                // name is one element of the list; prefix it with the host.
                return Observable.just(Host + name);
            }
        })
        .cast(String.class)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String url) {
                // url is the Host-prefixed value emitted by the inner Observable.
                Log.e(TAG, "call: " + url);
            }
        });
// flatMapIterable: map each emitted item to an Iterable and emit that Iterable's elements.
// The selector is declared as Func1<Integer, Iterable<Integer>> (not Iterable<?>) so the
// resulting stream is Observable<Integer> and accepts the Action1<Integer> below.
Observable.just(1, 2, 3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> call(Integer integer) {
        // One single-element list per source item, holding item + 1.
        List<Integer> list = new ArrayList<>();
        list.add(integer + 1);
        return list;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        // Receives 2, 3, 4.
    }
});
// buffer: collect the source items into lists of the given size and emit the lists.
Observable.just(1, 2, 3, 4, 5, 6)
        .buffer(3)
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> chunk) {
                for (Integer item : chunk) {
                    // Each chunk holds 3 items: first 1, 2, 3 and then 4, 5, 6.
                }
            }
        });
// groupBy: split the source into GroupedObservables keyed by the selector's result.
// The key returned here is the item itself.
 Observable<GroupedObservable<String, String>> groupedObservableObservable =
         Observable.just("s1", "s2", "s3", "s4", "s5")
                 .groupBy(new Func1<String, String>() {
                     @Override
                     public String call(String item) {
                         return item;
                     }
                 });

三、过滤操作符

     // filter: keep only the items for which the predicate returns true (here: item > 2).
     // Only the filtered Observable is built; nothing subscribes to it in this snippet.
     Observable.just(1, 2, 3, 4)
             .filter(new Func1<Integer, Boolean>() {
                 @Override
                 public Boolean call(Integer item) {
                     return item > 2;
                 }
             });
     // elementAt: emit only the item at the given zero-based index (index 2 here).
     Observable.just(1, 2, 3, 4)
             .elementAt(2)
             .subscribe(new Action1<Integer>() {
                 @Override
                 public void call(Integer item) {
                     // Left empty in this example.
                 }
             });
      // distinct: drop items that have already been emitted.
      Observable.just(1, 2, 3, 4)
              .distinct()
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer item) {
                      // Left empty in this example.
                  }
              });
      // skip: ignore the first 2 items and emit the rest.
      Observable.just(1, 2, 3, 4)
              .skip(2)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer item) {
                      // Left empty in this example.
                  }
              });
        // take: emit only the first 2 items.
        Observable.just(1, 2, 3, 4)
                .take(2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // ignoreElements: suppress every item and pass on only the terminal notification.
        Observable.just(1, 2, 3, 4)
                .ignoreElements()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onError(Throwable error) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onCompleted() {
                        // Left empty in this example.
                    }
                });
        // throttleFirst: applied with a 200 ms window.
        // The source's OnSubscribe body is empty, so it emits nothing in this snippet.
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> downstream) {
                        // Left empty in this example.
                    }
                })
                .throttleFirst(200, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // throttleWithTimeout: applied with a 200 ms timeout.
        // The source's OnSubscribe body is empty, so it emits nothing in this snippet.
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> downstream) {
                        // Left empty in this example.
                    }
                })
                .throttleWithTimeout(200, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });

四、组合操作符

        // startWith: emit the given items (1, 2) before the source's own items (3, 4, 5).
        Observable.just(3, 4, 5)
                .startWith(1, 2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // merge: combine the emissions of both sources into a single Observable.
        // observable2 is subscribed on the io scheduler, so the two sources may interleave.
        Observable<Integer> observable2 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
        Observable<Integer> observable3 = Observable.just(4, 5, 6);
        Observable.merge(observable2, observable3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // concat: emit all items of observable1 first, then all items of observable2.
        Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
        Observable<Integer> observable2 = Observable.just(4, 5, 6);
        Observable.concat(observable1, observable2)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // zip: pair the i-th items of both sources and combine each pair with the supplied
        // function. zip has no overload without a combining function, so one is given here;
        // it adds the two values so the result stays an Integer.
        Observable<Integer> observable1 = Observable.just(1,2,3)
                .subscribeOn(Schedulers.io());
        Observable<Integer> observable2 = Observable.just(4,5,6);
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer first, Integer second) {
                return first + second;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                // Receives 5, 7, 9.
            }
        });
        // zip with an explicit Func2: each pair of items is handed to the function.
        // The function is a placeholder that returns null.
        Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
        Observable<Integer> observable2 = Observable.just(4, 5, 6);
        Observable
                .zip(observable1, observable2, new Func2<Integer, Integer, Object>() {
                    @Override
                    public Object call(Integer first, Integer second) {
                        return null;
                    }
                })
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object combined) {
                        // Left empty in this example.
                    }
                });
        // combineLatest: combine the two sources through the supplied Func2.
        // The function is a placeholder that returns null.
        Observable<Integer> observable1 = Observable.just(1, 2, 3).subscribeOn(Schedulers.io());
        Observable<Integer> observable2 = Observable.just(4, 5, 6);
        Observable
                .combineLatest(observable1, observable2, new Func2<Integer, Integer, Object>() {
                    @Override
                    public Object call(Integer first, Integer second) {
                        return null;
                    }
                })
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object combined) {
                        // Left empty in this example.
                    }
                });

五、辅助操作符

        // delay: applied with a 2 second delay.
        // The source's OnSubscribe body is empty, so it emits nothing in this snippet.
        Observable
                .create(new Observable.OnSubscribe<Long>() {
                    @Override
                    public void call(Subscriber<? super Long> downstream) {
                        // Left empty in this example.
                    }
                })
                .delay(2, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long item) {
                        // Left empty in this example.
                    }
                });
        // doOnNext: register an Action1 on the stream ahead of the subscriber.
        Observable.just(1, 2)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onNext(Integer item) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onError(Throwable error) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onCompleted() {
                        // Left empty in this example.
                    }
                });
        // subscribeOn / observeOn: subscribeOn is given a new-thread scheduler and
        // observeOn the Android main-thread scheduler.
        Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> downstream) {
                downstream.onNext(1);
                downstream.onCompleted();
            }
        });
        observable
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });
        // timeout: configured with a 200 ms limit and a fallback Observable.just(10, 11).
        // The source's OnSubscribe body is empty, so it emits nothing in this snippet.
        Observable<Integer> observable = Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> downstream) {
                        // Left empty in this example.
                    }
                })
                .timeout(200, TimeUnit.MILLISECONDS, Observable.just(10, 11));
        observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer item) {
                // Left empty in this example.
            }
        });

六、错误处理操作符

        // catch (onErrorReturn): when the source signals an error, emit a fallback value
        // instead. The source raises an error here so the fallback path is actually
        // exercised; a source that only completes would never trigger it.
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new RuntimeException("simulated failure"));
            }
        }).onErrorReturn(new Func1<Throwable, Integer>() {
            @Override
            public Integer call(Throwable throwable) {
                // Fallback value emitted in place of the error.
                return 8;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                // Left empty in this example.
            }

            @Override
            public void onError(Throwable e) {
                // Left empty in this example.
            }

            @Override
            public void onNext(Integer integer) {
                // Receives the fallback value 8.
            }
        });
        // retry: on error, resubscribe to the source up to the given number of times (2)
        // before passing the error on. The source raises an error here so the retries
        // actually happen; a source that only completes would never be retried.
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new RuntimeException("simulated failure"));
            }
        }).retry(2).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                // Left empty in this example.
            }

            @Override
            public void onError(Throwable e) {
                // Reached once the retries are used up.
            }

            @Override
            public void onNext(Integer integer) {
                // Left empty in this example.
            }
        });

七、条件操作符和布尔操作符

        // all: emit one Boolean telling whether every item satisfies the predicate
        // (here: item < 3).
        Observable.just(1, 2, 3, 4)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer item) {
                        return item < 3;
                    }
                })
                .subscribe(new Subscriber<Boolean>() {
                    @Override
                    public void onNext(Boolean matchesAll) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onError(Throwable error) {
                        // Left empty in this example.
                    }

                    @Override
                    public void onCompleted() {
                        // Left empty in this example.
                    }
                });
        // contains: emit one Boolean telling whether the source emits the given item (1).
        Observable.just(1, 2, 3)
                .contains(1)
                .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean found) {
                        // Left empty in this example.
                    }
                });
        // isEmpty: emit one Boolean telling whether the source emits no items at all.
        // The method name was missing from the call chain and is restored here.
        Observable.just(1,2,3).isEmpty()
                .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                // Left empty in this example.
            }
        });
      // amb: given two sources, the first of which is delayed by 2 seconds.
      Observable
              .amb(Observable.just(1, 2, 3).delay(2, TimeUnit.SECONDS),
                      Observable.just(4, 5, 6))
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer item) {
                      // Left empty in this example.
                  }
              });
        // defaultIfEmpty: applied with the default value 3.
        // The source below completes without emitting any item.
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> downstream) {
                        downstream.onCompleted();
                    }
                })
                .defaultIfEmpty(3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        // Left empty in this example.
                    }
                });

八、转换操作符

        // toList: the subscriber receives the source's items as a List<Integer>.
        Observable.just(1, 2, 3)
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> items) {
                        // Left empty in this example.
                    }
                });
        // toSortedList: like toList, but the resulting List<Integer> is sorted.
        Observable.just(3, 1, 2)
                .toSortedList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> sortedItems) {
                        // Left empty in this example.
                    }
                });
        // toMap: the subscriber receives a Map<String, String> built with the given
        // Func1. The function returns the item itself.
        Observable.just("s1", "s2", "s3")
                .toMap(new Func1<String, String>() {
                    @Override
                    public String call(String item) {
                        return item;
                    }
                })
                .subscribe(new Action1<Map<String, String>>() {
                    @Override
                    public void call(Map<String, String> itemsByKey) {
                        // Left empty in this example.
                    }
                });

相关文章

网友评论

      本文标题:RxJava

      本文链接:https://www.haomeiwen.com/subject/sxukoxtx.html