A Simple Introduction to RxJava

Author: NullPoint3Exce | Published: 2019-01-22 22:09

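RxJava is a third-party library widely used on Android. It models asynchronous work as a stream of events consumed through chained calls, which keeps the logic concise and the usage simple.
What it provides: operators for creating streams, transforming them, and scheduling work across threads. Combining these operators covers most asynchronous use cases.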

Dependencies:

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
 

Example:

     {

        // Create the upstream: an Observable that emits 1, 2, 3 and then completes
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        // Create the downstream: an Observer that logs every event it receives
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        };
        // Connect the two; nothing is emitted until subscribe() is called
        observable.subscribe(observer);


    }
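
To make the event order in the example above concrete, here is a plain-Java toy model of the create/subscribe handshake. It is only a sketch and not RxJava's implementation: `MiniObservable`, `MiniObserver`, and `MiniEmitter` are hypothetical names, there is no Disposable and no threading, and the only rule it models is that nothing is delivered after a terminal event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Downstream callbacks, a reduced stand-in for RxJava's Observer. */
interface MiniObserver<T> {
    void onSubscribe();
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

/** Handed to the upstream; forwards events until a terminal event has been sent. */
final class MiniEmitter<T> {
    private final MiniObserver<T> downstream;
    private boolean terminated;

    MiniEmitter(MiniObserver<T> downstream) { this.downstream = downstream; }

    void onNext(T value) {
        if (!terminated) downstream.onNext(value);
    }

    void onComplete() {
        if (!terminated) { terminated = true; downstream.onComplete(); }
    }

    void onError(Throwable e) {
        if (!terminated) { terminated = true; downstream.onError(e); }
    }
}

/** Upstream: stores the emission logic and runs it once per subscribe() call. */
final class MiniObservable<T> {
    private final Consumer<MiniEmitter<T>> source;

    private MiniObservable(Consumer<MiniEmitter<T>> source) { this.source = source; }

    static <T> MiniObservable<T> create(Consumer<MiniEmitter<T>> source) {
        return new MiniObservable<>(source);
    }

    void subscribe(MiniObserver<T> observer) {
        observer.onSubscribe();                 // the observer hears about the connection first
        MiniEmitter<T> emitter = new MiniEmitter<>(observer);
        try {
            source.accept(emitter);             // the emission logic only runs now
        } catch (RuntimeException e) {
            emitter.onError(e);
        }
    }
}

final class MiniRxDemo {
    /** Runs the same 1, 2, 3, complete sequence as the example and records what arrives. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> observable = MiniObservable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
            emitter.onNext(4);                  // ignored: the stream has already completed
        });
        observable.subscribe(new MiniObserver<Integer>() {
            @Override public void onSubscribe() { log.add("subscribe"); }
            @Override public void onNext(Integer value) { log.add("" + value); }
            @Override public void onError(Throwable e) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [subscribe, 1, 2, 3, complete]
    }
}
```

The recorded order matches what the Observer in the example logs: subscribe, then 1, 2, 3, then complete.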

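Operators:

  • concatMap: maps each item to an inner Observable and emits the results strictly in source order

  • map: converts each item from one form to another, e.g. String -> Integer

  • flatMap: like concatMap, but the inner Observables are merged, so results are not guaranteed to follow source order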
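
The ordering difference between concatMap and flatMap can be modelled without RxJava. The sketch below is a plain-Java analogy, not the operators themselves: `OrderingSketch`, `slowUpper`, and the delays are made up for illustration. One method finishes the work for each item before starting the next (concatMap-like); the other starts all work at once and collects results as they finish (flatMap-like).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OrderingSketch {
    /** Hypothetical slow per-item work: waits delayMs, then returns the upper-cased item. */
    static String slowUpper(String item, long delayMs) throws InterruptedException {
        Thread.sleep(delayMs);
        return item.toUpperCase();
    }

    /** concatMap-like: finish the work for one item before starting the next. */
    static List<String> concatLike(Map<String, Long> items) throws Exception {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : items.entrySet()) {
            out.add(slowUpper(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** flatMap-like: start all work concurrently and take results in completion order. */
    static List<String> flatLike(Map<String, Long> items) throws Exception {
        // Assumes a non-empty map; a fixed pool needs at least one thread.
        ExecutorService pool = Executors.newFixedThreadPool(items.size());
        try {
            CompletionService<String> done = new ExecutorCompletionService<>(pool);
            for (Map.Entry<String, Long> e : items.entrySet()) {
                done.submit(() -> slowUpper(e.getKey(), e.getValue()));
            }
            List<String> out = new ArrayList<>();
            for (int i = 0; i < items.size(); i++) {
                out.add(done.take().get()); // take() blocks until the next task finishes
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    /** Source order is a, b, c; "a" is the slowest item. */
    static Map<String, Long> sample() {
        Map<String, Long> items = new LinkedHashMap<>();
        items.put("a", 450L);
        items.put("b", 50L);
        items.put("c", 250L);
        return items;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("concatMap-like: " + concatLike(sample()));
        System.out.println("flatMap-like:   " + flatLike(sample()));
    }
}
```

The concatMap-like run always yields A, B, C. With the delays chosen here the flatMap-like run normally yields B, C, A, because the fastest work finishes first. The price of keeping source order is that the sequential run takes roughly the sum of the delays.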


{
        // The Observable (upstream)
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                String result = getResponseByOk();
                Log.d(TAG, "subscribe: result=" + result);
                emitter.onNext(result);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())  // the subscribe() body above runs on an io scheduler thread
                .observeOn(AndroidSchedulers.mainThread()) // onNext/onError/onComplete are delivered on the main thread
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "subscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "error");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "complete");
                    }
                });

    }
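
The thread hand-off that subscribeOn and observeOn perform can be pictured with plain executors. This is an analogy under stated assumptions, not how RxJava schedulers are implemented: the `io-worker` pool stands in for Schedulers.io(), the single `main-like` thread stands in for the Android main thread, and `fetch` is a hypothetical placeholder for `getResponseByOk()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {
    /** Hypothetical stand-in for the blocking network call in the example. */
    static String fetch() {
        return "response";
    }

    /** Produces on one executor, consumes on another, and reports which thread did what. */
    static List<String> run() {
        ExecutorService io = Executors.newFixedThreadPool(2, r -> new Thread(r, "io-worker"));
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    // like subscribeOn(Schedulers.io()): the producing code runs on the io pool
                    .supplyAsync(() -> {
                        log.add("produce on " + Thread.currentThread().getName());
                        return fetch();
                    }, io)
                    // like observeOn(mainThread()): the result is handed to another thread
                    .thenAcceptAsync(value ->
                            log.add("consume " + value + " on " + Thread.currentThread().getName()),
                            mainLike)
                    .join(); // block only so that the demo can return the finished log
            return new ArrayList<>(log);
        } finally {
            io.shutdown();
            mainLike.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints "produce on io-worker" followed by "consume response on main-like", which mirrors the split in the example: the network call stays off the main thread and only the result is delivered to it.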

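Using flatMap (concatMap is used the same way). Note that this example uses the RxJava 1.x API (Func1, Action1, Subscriber), not the RxJava 2 API used in the examples above: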

 // subscribe(...) returns a Subscription in RxJava 1.x, so that is what this method returns
 private Subscription processUrlIpByOneFlatMap() {
       return Observable.just(
               "http://www.baidu.com/",
               "http://www.google.com/",
               "https://www.bing.com/")
               .flatMap(new Func1<String, Observable<String>>() {
                   @Override
                   public Observable<String> call(String s) {
                       return createIpObservable(s);
                   }
               })
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Action1<String>() {
                   @Override
                   public void call(String s) {
                       printLog(tvLogs, "Consume Data <- ", s);
                   }
               }, new Action1<Throwable>() {
                   @Override
                   public void call(Throwable throwable) {
                       printErrorLog(tvLogs, "throwable call()", throwable.getMessage());
                   }
               });
   }

 // Resolve the IP address of the URL's host
   private Observable<String> createIpObservable(final String url) {
       return Observable.create(new Observable.OnSubscribe<String>() {
           @Override
           public void call(Subscriber<? super String> subscriber) {
               try {
                   String ip = getIPByUrl(url);
                   subscriber.onNext(ip);
                   printLog(tvLogs, "Emit Data -> ",url+" : " +ip);
               } catch (MalformedURLException e) {
                   e.printStackTrace();
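                   // Emitting null instead of calling onError keeps the flatMap stream
                   // running for the other URLs; RxJava 2 would reject a null item.
                   //subscriber.onError(e);
                   subscriber.onNext(null);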
               } catch (UnknownHostException e) {
                   e.printStackTrace();
                   //subscriber.onError(e);
                   subscriber.onNext(null);
               }
               subscriber.onCompleted();
           }
       });
   }
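
The helper `getIPByUrl` is called above but not shown. One possible shape, assuming it extracts the host from the URL and resolves it with the JDK, is sketched below; the class name `IpLookup` is hypothetical and the original helper may differ. It throws the same two checked exceptions that the catch blocks above handle.

```java
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;

final class IpLookup {
    /**
     * Assumed behaviour: parse the URL, take its host, and resolve the host to an IP string.
     * Resolution can block on DNS, so it belongs on a background thread,
     * which is what subscribeOn(Schedulers.io()) arranges in the example.
     */
    static String getIPByUrl(String url) throws MalformedURLException, UnknownHostException {
        // Throws MalformedURLException if the string cannot be parsed as a URL.
        String host = new URL(url).getHost();
        // Throws UnknownHostException if the host cannot be resolved.
        InetAddress address = InetAddress.getByName(host);
        return address.getHostAddress();
    }

    public static void main(String[] args) throws Exception {
        // A literal IP address is returned as-is, without a DNS query.
        System.out.println(getIPByUrl("http://127.0.0.1:8080/index.html")); // 127.0.0.1
    }
}
```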

Reference: https://blog.csdn.net/johnny901114/article/details/51532776
