美文网首页
RxJava(1) 创建操作

RxJava(1) 创建操作

作者: 其勇勇 | 来源:发表于2019-07-31 17:35 被阅读0次

    文档网址:https://mcxiaoke.gitbooks.io/rxdocs/content/

    /**
     * Demo of the {@code repeat} operator: subscribes to
     * {@code Observable.range(1, 2).repeat(5)} and logs every received value
     * together with the name of the thread that delivered it.
     *
     * <p>Output recorded by the original author (thread name, two spaces, value):
     * <pre>
     * main  1
     * main  2
     * main  1
     * main  2
     * main  1
     * main  2
     * main  1
     * main  2
     * main  1
     * main  2
     * </pre>
     */
    private void repeat(){
        // The sequence 1, 2 replayed five times.
        final Observable<Integer> repeated = Observable.range(1, 2).repeat(5);

        // Only onNext does any work; the other callbacks are intentionally empty.
        final Observer<Integer> valueLogger = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                // The Disposable is not retained.
            }

            @Override
            public void onNext(Integer value) {
                final String threadName = Thread.currentThread().getName();
                Log.e("qwer", threadName + "  " + value);
            }

            @Override
            public void onError(Throwable error) {
                // Errors are ignored in this demo.
            }

            @Override
            public void onComplete() {
                // Nothing to do on completion.
            }
        };

        repeated.subscribe(valueLogger);
    }
    
    
        /**
         * Demo of the {@code range} operator: subscribes to
         * {@code Observable.range(1, 10)} and logs each received value prefixed
         * with the name of the thread that delivered it.
         */
        private void range(){
            final Observable<Integer> numbers = Observable.range(1, 10);

            // Only onNext does any work; the other callbacks are intentionally empty.
            final Observer<Integer> valueLogger = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    // The Disposable is not retained.
                }

                @Override
                public void onNext(Integer value) {
                    final String threadName = Thread.currentThread().getName();
                    Log.e("qwer", threadName + "  " + value);
                }

                @Override
                public void onError(Throwable error) {
                    // Errors are ignored in this demo.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

            numbers.subscribe(valueLogger);
        }
    
        /**
         * Demo of the {@code just} operator with items of mixed types.
         *
         * <p>Two subscriptions are made, in this order:
         * <ol>
         *   <li>{@code just(1, "df")} subscribed with no callbacks at all;</li>
         *   <li>{@code just(1, "df", 99L, new Object())} subscribed with an
         *       observer whose callbacks are all empty.</li>
         * </ol>
         */
        private void just(){
            // Observer typed on Object so it can accept the heterogeneous items.
            final Observer<Object> noOpObserver = new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    // The Disposable is not retained.
                }

                @Override
                public void onNext(Object item) {
                    // Items are ignored.
                }

                @Override
                public void onError(Throwable error) {
                    // Errors are ignored in this demo.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

            // Fire-and-forget subscription; the returned Disposable is discarded.
            Observable.just(1, "df").subscribe();

            final Observable<Object> mixedItems = Observable.just(1, "df", 99L, new Object());
            mixedItems.subscribe(noOpObserver);
        }
    
        /**
         * Demo of the time-based sources {@code timer} and {@code interval}, plus a
         * {@code just + delay} combination. Five subscriptions are started in
         * sequence, all sharing one observer whose callbacks are empty.
         *
         * <p>The Disposable passed to {@code onSubscribe} is not retained, so this
         * method offers no way to cancel the two unbounded {@code interval}
         * subscriptions it starts.
         *
         * <p>NOTE(review): {@code timer}/{@code interval} may run on a default
         * scheduler of their own, in which case {@code subscribeOn} would not decide
         * the emission thread - confirm against the RxJava documentation.
         */
        private void timer_interval(){
            final long oneSecondMs = 1000;
            final long noInitialDelay = 0;

            // Shared observer: every callback is intentionally a no-op.
            final Observer<Long> noOpObserver = new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    // The Disposable is not retained.
                }

                @Override
                public void onNext(Long tick) {
                    // Ticks are ignored.
                }

                @Override
                public void onError(Throwable error) {
                    // Errors are ignored in this demo.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };

            // Run a single task after a 1 s delay, then finish.
            // (The original author also noted a variant using subscribeOn(Schedulers.io()).)
            Observable.timer(oneSecondMs, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(noOpObserver);

            // Run a task every 1 s, with a 1 s gap before the first run; repeats indefinitely.
            Observable.interval(oneSecondMs, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .subscribe(noOpObserver);

            // Run a task every 1 s, first run immediately; repeats indefinitely.
            Observable.interval(noInitialDelay, oneSecondMs, TimeUnit.MILLISECONDS)
                    .subscribeOn(Schedulers.io())
                    .subscribe(noOpObserver);

            // Run a task every 1 s, first run immediately; only five runs.
            Observable.interval(noInitialDelay, oneSecondMs, TimeUnit.MILLISECONDS)
                    .take(5)
                    .subscribe(noOpObserver);

            // Run a first task, wait 1 s, run the next task, then finish.
            final Consumer<Long> logFirstTask = new Consumer<Long>() {
                @Override
                public void accept(Long ignoredValue) throws Exception {
                    Log.d("qwer", "执行第一个任务");
                }
            };
            Observable.just(0L)
                    .doOnNext(logFirstTask)
                    .delay(oneSecondMs, TimeUnit.MILLISECONDS)
                    .subscribe(noOpObserver);
        }
    
        /**
         * Demo of {@code Observable.fromIterable} over the list "1".."4", subscribed
         * four different ways:
         * <ol>
         *   <li>with an empty {@code doOnNext} hook and no subscriber callbacks;</li>
         *   <li>with a logging {@code Consumer}, disposing the returned Disposable
         *       as soon as {@code subscribe} returns;</li>
         *   <li>with a full {@code Observer} that logs every callback;</li>
         *   <li>with a logging {@code doOnNext} hook ahead of a logging observer.</li>
         * </ol>
         */
        private void from(){
            // Builds the list ["1", "2", "3", "4"].
            final List<String> items = new ArrayList<>();
            for (int n = 1; n <= 4; n++) {
                items.add(String.valueOf(n));
            }

            // 1) Side-effect hook that does nothing, subscribed without callbacks.
            final Consumer<String> ignoreItem = new Consumer<String>() {
                @Override
                public void accept(String item) throws Exception {
                    // Intentionally empty.
                }
            };
            Observable.fromIterable(items).doOnNext(ignoreItem).subscribe();

            // 2) Consumer-only subscription, disposed right after subscribe returns.
            final Disposable subscription = Observable.fromIterable(items)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String item) throws Exception {
                            Log.e("qwer","accept : " + item);
                        }
                    });
            subscription.dispose();

            // 3) Full observer that logs each lifecycle callback.
            final Observer<String> lifecycleLogger = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    Log.e("qwer","onSubscribe");
                }

                @Override
                public void onNext(String item) {
                    Log.e("qwer","onNext" + item);
                }

                @Override
                public void onError(Throwable error) {
                    Log.e("qwer","onError:" + error.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.e("qwer","onComplete");
                }
            };
            Observable.fromIterable(items).subscribe(lifecycleLogger);

            // 4) doOnNext hook followed by an observer. Output recorded by the
            //    original author:
            //      Observable 1
            //      onNext 1
            //      Observable 2
            //      onNext 2
            //      Observable 3
            //      onNext 3
            //      Observable 4
            //      onNext 4
            final Consumer<String> logUpstream = new Consumer<String>() {
                @Override
                public void accept(String item) throws Exception {
                    Log.e("qwer"," Observable " + item);
                }
            };
            final Observer<String> logDownstream = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable disposable) {
                    // The Disposable is not retained.
                }

                @Override
                public void onNext(String item) {
                    Log.e("qwer"," onNext " + item);
                }

                @Override
                public void onError(Throwable error) {
                    // Errors are ignored in this demo.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            };
            Observable.fromIterable(items).doOnNext(logUpstream).subscribe(logDownstream);
        }
    
        /**
         * Demo of {@code Observable.create} combined with {@code onErrorReturn}.
         *
         * <p>The source emits 0..9 through the emitter and then signals an error
         * when the counter reaches 10. {@code onErrorReturn} maps that error to the
         * value 100, and the subscribed observer logs every callback it receives.
         *
         * <p>Fix over the original: the emitter callback now returns immediately
         * after {@code onError}. The original kept looping and called
         * {@code onNext(11)}, {@code onNext(12)} and {@code onComplete()} after the
         * terminal error signal, which violates the Observable protocol (at most
         * one terminal event, and nothing after it).
         */
        private void create(){
            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) {

                    for(int i = 0;i < 13;i++){
                        if(i == 10){
                            emitter.onError(new Throwable(""));
                            // onError is terminal: stop emitting instead of
                            // continuing with further onNext/onComplete calls.
                            return;
                        }
                        emitter.onNext(i);
                    }
                    // Reached only when the loop finishes without signalling an error.
                    emitter.onComplete();
                }
            }).onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    // Replace any upstream error with the fallback value 100.
                    return 100;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("qwer","onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("qwer","onNext" + integer);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e("qwer","onError:" + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.e("qwer","onComplete");
                }
            });
        }
    

    相关文章

      网友评论

          本文标题:RxJava(1) 创建操作

          本文链接:https://www.haomeiwen.com/subject/bzgtdctx.html