美文网首页
如何使用RxJava

如何使用RxJava

作者: young4444 | 来源:发表于2018-01-16 16:47 被阅读0次

    如何使用RxJava

    Hello World

    下面是一段java代码,代码创建了一个可观察者(Observable)用于发送一个字符串数组。然后去订阅(subscribe)它,并把结果打印出来。有关可观察者和订阅的概念请看观察者模式来了解。

    //example
    public static void hello(String... names) {
        Observable.from(names).subscribe(new Action1<String>() {
    
            @Override
            public void call(String s) {
                System.out.println("Hello " + s + "!");
            }
    
        });
    }
    //input
    hello("Ben", "George");
    //output
    Hello Ben!
    Hello George!
    

    如何使用RxJava

    要使用使用Rxjava,首先要创建可观察者(就是发送数据的那个Observable),可以通过不同的Observable操作符(just()、from()、create()等),获取不同返回值的Observable,来获取你想要的精确的数据(比如Array、Iterable之类的),然后创建一个观察者(Observer)来订阅Observable,观察它的变化并对此做出反应。

    创建Observable

    可以通过内置的方法方便的创建Observables,比如create(),或者其他将现有数据转化成
    Observables的方法

    基于已存在的数据结构常见Observable

    可以使用just()from()来把对象、对象数组转化成Observable。

    Observable<String> o = Observable.fromArray("a", "b", "c");
    
    Observable<Integer> o2 = Observable.fromArray(5,6,7,8);
    
    Observable<String> o3 = Observable.just("one object");
    

    转换出来的Observable将会同步调用所有订阅这个Observable的Subscriber的onNext(),获取到Observable发送的所有数据,最后同步调用所有Subscriber的onCompleted()

    通过create方法创建Observable

    你可以通过create()来实现异步I/O(输入/输出)、计算操作或者生成无限大的数据流等目的。

    Ps:
    通常我们在系统级别说线程的blocked,是说线程操作io,被暂停了,这种线程由linux内核来唤醒(io设备报告数据来了,内核把block的线程放进可运行的进程队列,依次得到处理器时间)

    同步的Observable例子

    /**
     * This example shows a custom Observable that blocks 
     * when subscribed to (does not spawn an extra thread).
     */
    def customObservableBlocking() {
        return Observable.create { aSubscriber ->
            50.times { i -> //=>for(i=0;i<50;i++)
                if (!aSubscriber.unsubscribed) {
                    aSubscriber.onNext("value_${i}")
                }
            }
            // after sending all values we complete the sequence
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onCompleted()
            }
        }
    }
    
    // To see output:
    customObservableBlocking().subscribe { println(it) }
    //
    //如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,第二次的customObservableBlocking的Observable的创建会处于block状态。
    

    异步的Observable例子

    以下代码使用Groovy来创建一个Observable对象来发送75个字符串。它的编写非常简单,使用了静态类型和函数匿名内部类的实现,以使示例更加清晰。

    /**
     * This example shows a custom Observable that does not block
     * when subscribed to as it spawns a separate thread.
     */
    def customObservableNonBlocking() {
        return Observable.create({ subscriber ->
            Thread.start {
                for (i in 0..<75) {
                    if (subscriber.unsubscribed) {
                        return
                    }
                    subscriber.onNext("value_${i}")
                }
                // after sending all values we complete the sequence
                if (!subscriber.unsubscribed) {
                    subscriber.onCompleted()
                }
            }
        } as Observable.OnSubscribe)
    }
    
    // To see output:
    customObservableNonBlocking().subscribe { println(it) }
    //如果执行两次customObservableBlocking(),同一线程里面只能同时存在一个Observable的实例,所以当运行两次customObservableBlocking,两次的Observable在不同的线程,就不会发生block。
    

    下面是在Clojure下,使用Funture来完成的相同功能的代码。

    (defn customObservableNonBlocking []
      "This example shows a custom Observable that does not block 
       when subscribed to as it spawns a separate thread.
       
      returns Observable<String>"
      (Observable/create 
        (fn [subscriber]
          (let [f (future 
                    (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                    ; after sending all values we complete the sequence
                    (-> subscriber .onCompleted))
            ))
          ))
    
    ; To see output
    (.subscribe (customObservableNonBlocking) #(println %))
    

    下面是一个从维基百科获取文章的例子,并在获取到每一篇文章后调用onNext。

    (defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
      "Fetch a list of Wikipedia articles asynchronously.
      
       return Observable<String> of HTML"
      (Observable/create 
        (fn [subscriber]
          (let [f (future
                    (doseq [articleName wikipediaArticleNames]
                      (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                    ; after sending response to onnext we complete the sequence
                    (-> subscriber .onCompleted))
            ))))
    
    //run
    (-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
    (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))
    

    回到Groovy,相同的代码长这样:

    /*
     * Fetch a list of Wikipedia articles asynchronously.
     */
    def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
        return Observable.create { subscriber ->
            Thread.start {
                for (articleName in wikipediaArticleNames) {
                    if (subscriber.unsubscribed) {
                        return
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
                }
                if (!subscriber.unsubscribed) {
                    subscriber.onCompleted()
                }
            }
            return subscriber
        }
    }
    
    fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
        .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }
    
    //Result
    --- Article ---
     <!DOCTYPE html>
    <html lang="en" dir="ltr" class="client-nojs">
    <head>
    <title>Tiger - Wikipedia, the free encyclopedia</title> ...
    --- Article ---
     <!DOCTYPE html>
    <html lang="en" dir="ltr" class="client-nojs">
    <head>
    <title>Elephant - Wikipedia, the free encyclopedia</tit ...
    

    注意,上面所有的例子都忽略了错误处理,为了简洁。更完整的信息可以在Observable创建Observable页面找到。

    通过操作符创建Observable

    RxJava允许你通过操作符转换和生成Observable。

    以下Groovy示例使用了之前定义的异步Observable发送了75个数据,但是调用skip(10)跳过前十个数据,然后调用take(5)只发送前5个数据,并且用map...在打印前把它们转化为特定字符串。

    Ps:map方法的作用就是在onNext()之前截取数据,并对数据进行一定的处理,比如说下面示例map()获取到的数据第一个数据为"value_10",调用map后就变成"value_10_xform"。

    /**
     * Asynchronously calls 'customObservableNonBlocking' and defines
     * a chain of operators to apply to the callback sequence.
     */
    def simpleComposition() {
        customObservableNonBlocking().skip(10).take(5)
            .map({ stringValue -> return stringValue + "_xform"})
            .subscribe({ println "onNext => " + it})
    }
    
    //result
    onNext => value_10_xform
    onNext => value_11_xform
    onNext => value_12_xform
    onNext => value_13_xform
    onNext => value_14_xform
    
    
    //上面的示例翻译成java就是:
    public class Hello {
        public static void main(String... args) {
             customObservableNonBlocking().skip(10).take(5).map(new Function<String, Object>() {
                @Override
                public Object apply(String s) throws Exception {
                    return s + "_xform";
                }
            }).subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    System.out.println(o);
                }
            });
        }
    
        private static Observable<String> customObservableNonBlocking() {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                    new Thread() {
                        @Override
                        public void run() {
                            super.run();
                            for (int i = 0; i < 75; i++) {
                                emitter.onNext("value_"+i);
                            }
                            emitter.onComplete();
                        }
                    }.start();
                }
            });
        }
    }
    
    

    以下是ReactiveX特有的大理石图表(marble diagram),清晰地表现了以上流程:


    Composition.1.png

    以下Clojure示例消耗了三个异步Observable,包括从一个到另一个的依赖性,通过zip操作符来将三个Observables的每一个项目组合起来,发出一个单一的返回对象,然后通过map转换获取到的数据的类型。

    (defn getVideoForUser [userId videoId]
      "Get video metadata for a given userId
       - video metadata
       - video bookmark position
       - user data
      return Observable<Map>"
        (let [user-observable (-> (getUser userId)
                  (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
              bookmark-observable (-> (getVideoBookmark userId videoId)
                  (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
              ; getVideoMetadata requires :language from user-observable so nest inside map function
              video-metadata-observable (-> user-observable 
                  (.mapMany
                    ; fetch metadata after a response from user-observable is received
                    (fn [user-map] 
                      (getVideoMetadata videoId (:language user-map)))))]
              ; now combine 3 observables using zip
              (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                    (fn [bookmark-map metadata-map user-map]
                      {:bookmark-map bookmark-map 
                      :metadata-map metadata-map
                      :user-map user-map}))
                ; and transform into a single response object
                (.map (fn [data]
                      {:video-id videoId
                       :video-metadata (:metadata-map data)
                       :user-id userId
                       :language (:language (:user-map data))
                       :bookmark (:viewed-position (:bookmark-map data))
                      })))))
    
    //response 
    {:video-id 78965, 
     :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                      :director David Fincher, :duration 3365}, 
     :user-id 12345, :language es-us, :bookmark 0}
    

    图例如下:


    Composition.2.png

    如果用java代码借助上文中的customObservableNonBlocking()来演示Rxjava的zip操作符,就是如下显示:

    private static void testForJava() {
            Observable.zip(customObservableNonBlocking(), customObservableNonBlocking(), new BiFunction<String, String, Object>() {
    
                @Override
                public Object apply(String s, String s2) throws Exception {
    
                    return "s:" + s + ";s2:" + s2;
                }
            }).subscribe(new Consumer<Object>() {
    
                @Override
                public void accept(Object o) throws Exception {
                    System.out.println(o);
                }
            });
        }
    
    //input
    testForJava();
    //output
    s:value_0;s2:value_0
    s:value_1;s2:value_1
    s:value_2;s2:value_2
    s:value_3;s2:value_3
    ...
    //结论
    zip可以用于将多个ObServable的返回结果结合成一个。比如
    

    以下Groovy示例取自本。克里森的关于Netflix API进化的演讲。。它通过merge操作符把两个Observable相结合,然后通过reduce操作符在生成的序列中构造单个项目,然后在数据发送前使用map改变数据的类型。

    public Observable getVideoSummary(APIVideo video) {
       def seed = [id:video.id, title:video.getTitle()];
       def bookmarkObservable = getBookmark(video);
       def artworkObservable = getArtworkImageUrl(video);
       return( Observable.merge(bookmarkObservable, artworkObservable)
          .reduce(seed, { aggregate, current -> aggregate << current })
          .map({ [(video.id.toString() : it] }))
    }
    

    图例如下:


    Composition.3.png

    以下示例是用Java演示的有关RxJava的merge操作符和reduce操作符用法:

    private static void test4(){
            String[] args1=new String[]{"张的欣1","张的欣2","张的欣3","张的欣4"};
            String[] args2=new String[]{"春晓1","春晓2","春晓3","春晓4"};
            //相同的数组可以进行合并
            Observable<String> mergeObservable=Observable.merge(Observable.fromArray(args1),Observable.fromArray(args2));
            
            //merge后输出:
            //张的欣1
            //张的欣2
            //张的欣3
            //张的欣4
            //春晓1
            //春晓2
            //春晓3
            //春晓4
    
            Maybe<String> maybe=mergeObservable.reduce(new BiFunction<String, String, String>() {
                @Override
                public String apply(String s, String s2) throws Exception {
                    return s.length()>s2.length()?s:s2;
                }
            });
            maybe.subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
    
            //reduce后输出:
            //张的欣4
        }
    
    //结论:
    1.merge的作用相当于Observable之间的Append。
    2.reduce相当于筛选。
    

    错误处理

    以下上文中抓取维基百科上文章的一个例子,现在加上错误处理。

    /*
     * Fetch a list of Wikipedia articles asynchronously, with error handling.
     */
    def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
        return Observable.create({ subscriber ->
            Thread.start {
                try {
                    for (articleName in wikipediaArticleNames) {
                        if (true == subscriber.isUnsubscribed()) {
                            return;
                        }
                        subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                    }
                    if (false == subscriber.isUnsubscribed()) {
                        subscriber.onCompleted();
                    }
                } catch(Throwable t) {
                    if (false == subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                }
                return (subscriber);
            }
        });
    }
    

    如果出现了错误,以下代码将通过订阅第二个方法来调用onError()。

    fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
        .subscribe(
            { println "--- Article ---\n" + it.substring(0, 125) }, 
            { println "--- Error ---\n" + it.getMessage() })
    

    更多有关RxJava的错误处理操作符的信息请参见Error-Handling-Operators页面,其中包括像onErrorResumeNext()onErrorReturn()这样可以让Observable在遭遇错误后
    回调的方法。

    以下示例说明了如何使用这样的方法来抛出异常。 假设你有一个或者一组的Observable — myObservable。然后你想用自己自定义的Throwable类替代默认Throwable拦截所有会回调Subscriber的onError()的异常。你可以通过重写myObservable的onErrorResumeNext() 来调用OnError()。然后把自定义错误作为参数调用一个名为error()的方法。

    myModifiedObservable = myObservable.onErrorResumeNext({ t ->
       Throwable myThrowable = myCustomizedThrowableCreator(t);
       return (Observable.error(myThrowable));
    });
    

    参考原文:
    1.How-To-Use-RxJava
    2.Why should we use RxJava on Android

    相关文章

      网友评论

          本文标题:如何使用RxJava

          本文链接:https://www.haomeiwen.com/subject/djteoxtx.html