A Simple Introduction to RxJava

Author: NullPoint3Exce | Published: 2019-01-22 22:09

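    RxJava is a third-party library widely used on Android. It is built around chained calls over a stream of events, which keeps the logic concise and the API simple to use.
    What it offers: creating streams, transforming them, thread scheduling, and more. By using and combining operators, you can cover most requirements in asynchronous scenarios.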

    Dependencies:

    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.0.7'
     
    

    Example:

         {
    
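            // Imports needed (RxJava 2): io.reactivex.Observable, io.reactivex.ObservableEmitter,
            // io.reactivex.ObservableOnSubscribe, io.reactivex.Observer, io.reactivex.disposables.Disposable
            // Create the upstream Observable: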
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
            // Create the downstream Observer
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            };
            // Connect the two: nothing is emitted until subscribe() is called
            observable.subscribe(observer);
    
    
        }
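
To make the "connect" step above more concrete, here is a minimal plain-Java sketch of the same idea with no RxJava dependency. It is not RxJava's implementation: `MiniObservable`, `MiniObserver`, and `demo` are hypothetical names invented for this illustration. It only shows that `create` stores the emitting logic, and that the logic runs once `subscribe` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the Observable/Observer pair (not RxJava itself).
class MiniObservableSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    static final class MiniObservable<T> {
        private final Consumer<MiniObserver<T>> source;

        private MiniObservable(Consumer<MiniObserver<T>> source) {
            this.source = source;
        }

        // Only remembers the emitting logic; nothing runs yet.
        static <T> MiniObservable<T> create(Consumer<MiniObserver<T>> source) {
            return new MiniObservable<>(source);
        }

        // Running the stored logic is what "connects" upstream and downstream.
        void subscribe(MiniObserver<T> observer) {
            source.accept(observer);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> upstream = MiniObservable.create(emitter -> {
            log.add("source started");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        });
        log.add("before subscribe");
        upstream.subscribe(new MiniObserver<Integer>() {
            @Override
            public void onNext(Integer value) {
                log.add("next " + value);
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [before subscribe, source started, next 1, next 2, next 3, complete]
        System.out.println(demo());
    }
}
```

The real library adds error handling, disposal, and threading on top of this, so treat the sketch as a mental model only.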
    

    Operators:

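    • concatMap: maps each item to an inner Observable and emits the results strictly in the order of the source items

    • map: converts each item from one form to another, e.g. String ---> int

    • flatMap: similar to concatMap, but the inner Observables are merged, so results are not guaranteed to follow the order of the source items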
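
The ordering difference between the three operators can be sketched without RxJava. The following is a plain-Java analogy built on `CompletableFuture`, not the library's actual implementation; `mapStyle`, `mergeStyle`, `concatStyle`, and `demo` are hypothetical helper names. Each inner task stands in for an inner Observable, and the tasks are completed by hand in the order B, C, A to simulate work that finishes out of order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy for operator ordering (assumption: this models, not uses, RxJava).
class OperatorOrderSketch {

    // map-like: each input item becomes exactly one output item (String -> int).
    static List<Integer> mapStyle(List<String> source) {
        List<Integer> out = new ArrayList<>();
        for (String s : source) {
            out.add(Integer.parseInt(s));
        }
        return out;
    }

    // flatMap-like: a result is recorded whenever its inner task finishes.
    static List<String> mergeStyle(List<CompletableFuture<String>> inner) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> task : inner) {
            task.thenAccept(out::add);
        }
        return out; // filled later, in completion order
    }

    // concatMap-like: results are read strictly in source order.
    static List<String> concatStyle(List<CompletableFuture<String>> inner) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> task : inner) {
            out.add(task.join()); // wait for this task before moving to the next
        }
        return out;
    }

    static List<List<String>> demo() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<CompletableFuture<String>> inner = Arrays.asList(a, b, c);

        List<String> merged = mergeStyle(inner);
        // The inner tasks finish out of order: b, then c, then a.
        b.complete("B");
        c.complete("C");
        a.complete("A");
        List<String> concatenated = concatStyle(inner);
        return Arrays.asList(merged, concatenated);
    }

    public static void main(String[] args) {
        System.out.println(mapStyle(Arrays.asList("1", "20"))); // prints [1, 20]
        System.out.println(demo()); // prints [[B, C, A], [A, B, C]]
    }
}
```

The merged list follows completion order while the concatenated list follows source order, which is the trade-off to keep in mind when choosing between flatMap and concatMap.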

    
    {
            // The Observable (upstream)
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    
                    String result = getResponseByOk();
                    Log.d(TAG, "subscribe: result=" + result);
                    emitter.onNext(result);
                    emitter.onComplete();
                }
            }).subscribeOn(Schedulers.io())  // the upstream subscribe() runs on the IO thread pool
                    .observeOn(AndroidSchedulers.mainThread()) // the Observer's callbacks run on the main thread
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "subscribe");
                        }
    
                        @Override
                        public void onNext(String value) {
                            Log.d(TAG, "" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "error");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "complete");
                        }
                    });
    
        }
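
The split between the thread that produces a value and the thread that consumes it can be illustrated in plain Java as well. This is only an analogy using `CompletableFuture` and two single-thread executors, not how RxJava schedulers are implemented; the class name `SchedulingSketch` and the thread names `io-like` and `main-like` are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy for subscribeOn/observeOn (assumption: illustrative only).
class SchedulingSketch {

    // Returns {name of the producing thread, name of the consuming thread}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // Produce the value on the background executor (the subscribeOn role).
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Hand the value over to another executor (the observeOn role).
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            main)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        // prints produced on io-like, consumed on main-like
        System.out.println("produced on " + threads[0] + ", consumed on " + threads[1]);
    }
}
```

On Android the consuming side would be the UI thread, which is why the blocking network call in the example above is kept on the IO scheduler.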
    

    Using flatMap (concatMap is used the same way):

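     // Ported to the RxJava 2 API to match the dependencies above; the original snippet
     // used RxJava 1 types (Func1, Action1, Subscriber).
     // Imports needed: io.reactivex.Observable, io.reactivex.ObservableSource,
     // io.reactivex.ObservableEmitter, io.reactivex.ObservableOnSubscribe,
     // io.reactivex.functions.Function, io.reactivex.functions.Consumer,
     // io.reactivex.disposables.Disposable, io.reactivex.schedulers.Schedulers,
     // io.reactivex.android.schedulers.AndroidSchedulers
     // subscribe() returns a Disposable, so that is what this method returns.
     private Disposable processUrlIpByOneFlatMap() {
           return Observable.just(
                   "http://www.baidu.com/",
                   "http://www.google.com/",
                   "https://www.bing.com/")
                   .flatMap(new Function<String, ObservableSource<String>>() {
                       @Override
                       public ObservableSource<String> apply(String s) {
                           // Each URL is mapped to an inner Observable that emits its IP
                           return createIpObservable(s);
                       }
                   })
                   .subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
                   .subscribe(new Consumer<String>() {
                       @Override
                       public void accept(String s) {
                           printLog(tvLogs, "Consume Data <- ", s);
                       }
                   }, new Consumer<Throwable>() {
                       @Override
                       public void accept(Throwable throwable) {
                           printErrorLog(tvLogs, "throwable call()", throwable.getMessage());
                       }
                   });
       }
    
     // Resolve the IP address for the host of the given URL
       private Observable<String> createIpObservable(final String url) {
           return Observable.create(new ObservableOnSubscribe<String>() {
               @Override
               public void subscribe(ObservableEmitter<String> emitter) {
                   try {
                       String ip = getIPByUrl(url);
                       emitter.onNext(ip);
                       printLog(tvLogs, "Emit Data -> ", url + " : " + ip);
                   } catch (MalformedURLException | UnknownHostException e) {
                       e.printStackTrace();
                       // RxJava 2 does not allow null items, so a failed lookup emits nothing
                       // here (the original called onNext(null)). Call emitter.onError(e)
                       // instead if a failure should terminate the whole chain.
                   }
                   emitter.onComplete();
               }
           });
       }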
    
    

    Reference: https://blog.csdn.net/johnny901114/article/details/51532776

    Article link: https://www.haomeiwen.com/subject/woxdjqtx.html