美文网首页
如何使用RxJava

如何使用RxJava

作者: CyrusChan | 来源:发表于2018-09-28 14:10 被阅读4次

Hello World:

如下hello world 的例子是用Java,Groovy、等语言实现的。从字符串列表中创建一个Observable 并且用一个打印每个被这个Observable发射的String的函数来订阅它。

你可以找到额外的例子在每个 language adaptor 文件夹下。

Java

public static void hello(String... names) {
    Observable.from(names).subscribe(new Action1<String>() {

        @Override
        public void call(String s) {
            System.out.println("Hello " + s + "!");
        }

    });
}
hello("Ben", "George");
Hello Ben!
Hello George!

Groovy

def hello(String[] names) {
    Observable.from(names).subscribe { println "Hello ${it}!" }
}
hello("Ben", "George")
Hello Ben!
Hello George!

如何用RxJava来设计

为了使用你创建Observables(发射数据项),转换这些Observables 通过不同的方式去获取你感兴趣的确切的数据条目,接着观测并且对这些感兴趣的条目序列做出反应(通过实现Observers或者Subscribers ,接着将它们订阅到所产生的转换后的Observables)

创建Observables

为了创建一个Observable, 你可以通过传入一个展示Observable的行为的函数来手动的实现Observable的行为或者你可以将现有的数据结构转换为一个Observable通过使用 some of the Observable operators that are designed for this purpose.(一些Observable操作符)

创建一个Observable从已有的数据结构:

你可以使用just()或者from()函数来将对象,列表或者数组转换为可发射这些对象的Observables:

Observable<String> o = Observable.from("a", "b", "c");

def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.from(list);

Observable<String> o = Observable.just("one object");

这些被转换的Observables 将同步的调用订阅他们的Subscriber的onNext()函数为每个被这些Observable发射的item,接着将会调用subscriber的onCompleted()函数。

创建一个Observable通过create()函数

通过设计你自己的Observable且用create() 函数来实现它,你可以实现异步i/o ,计算操作甚至无限的数据流。

同步Observable例子:

/**
 * This example shows a custom Observable that blocks 
 * when subscribed to (does not spawn an extra thread).
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i ->
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

// To see output:
customObservableBlocking().subscribe { println(it) }

异步的Observable 例子:

如下的例子使用Groovy去创建一个发射75个字符串的Observable

以使示例更加清晰,它被写的很啰嗦,带有静态类型和Fun1匿名内部类的实现:

/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..<75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

// To see output:
customObservableNonBlocking().subscribe { println(it) }

这是用Clojure同样的实现

(defn customObservableNonBlocking []
  "This example shows a custom Observable that does not block 
   when subscribed to as it spawns a separate thread.
   
  returns Observable<String>"
  (Observable/create 
    (fn [subscriber]
      (let [f (future 
                (doseq [x (range 50)] (-> subscriber (.onNext (str "value_" x))))
                ; after sending all values we complete the sequence
                (-> subscriber .onCompleted))
        ))
      ))

如下是一个从WiKipedia获取文章的例子每一篇都调用onNext函数.

(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.
  
   return Observable<String> of HTML"
  (Observable/create 
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))
        ))))

(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"]) 
  (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")))

回到Groovy,同样的功能 但是使用闭包而不是匿名内部类:

/*
 * Fetch a list of Wikipedia articles asynchronously.
 */
def fetchWikipediaArticleAsynchronously(String... wikipediaArticleNames) {
    return Observable.create { subscriber ->
        Thread.start {
            for (articleName in wikipediaArticleNames) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext(new URL("http://en.wikipedia.org/wiki/${articleName}").text)
            }
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
        return subscriber
    }
}

fetchWikipediaArticleAsynchronously("Tiger", "Elephant")
    .subscribe { println "--- Article ---\n${it.substring(0, 125)}" }

结果:

--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Tiger - Wikipedia, the free encyclopedia</title> ...
--- Article ---
 <!DOCTYPE html>
<html lang="en" dir="ltr" class="client-nojs">
<head>
<title>Elephant - Wikipedia, the free encyclopedia</tit ...

注意:所有之上的例子为了简洁忽略了错误处理。参看如下包含错误处理的例子。

更多信息能够在 Observable和 创建 Creating Observables页面找到。

使用操作符转换Observables

RxJava允许你把操作符链在一起去进行转换和组合 Observables.

如下Groovy例子用一个预定义,异步的Observable发射75个item,跳过前10个item,拿取接下来的5个并且在订阅和打印这些item之前转换他们。

/**
 * Asynchronously calls 'customObservableNonBlocking' and defines
 * a chain of operators to apply to the callback sequence.
 */
def simpleComposition() {
    customObservableNonBlocking().skip(10).take(5)
        .map({ stringValue -> return stringValue + "_xform"})
        .subscribe({ println "onNext => " + it})
}

结果如下:

onNext => value_10_xform
onNext => value_11_xform
onNext => value_12_xform
onNext => value_13_xform
onNext => value_14_xform

如下是一个marble图表说明这个转换过程:

image.png

如下是下一个Clojure例子,消费3个异步的Observables,包括从一个到另一个的依赖关系,并且通过使用zip函数组合这三个Observables发射的item并使用map来转换它的结果来发射一个单一的响应。

(defn getVideoForUser [userId videoId]
  "Get video metadata for a given userId
   - video metadata
   - video bookmark position
   - user data
  return Observable<Map>"
    (let [user-observable (-> (getUser userId)
              (.map (fn [user] {:user-name (:name user) :language (:preferred-language user)})))
          bookmark-observable (-> (getVideoBookmark userId videoId)
              (.map (fn [bookmark] {:viewed-position (:position bookmark)})))
          ; getVideoMetadata requires :language from user-observable so nest inside map function
          video-metadata-observable (-> user-observable 
              (.mapMany
                ; fetch metadata after a response from user-observable is received
                (fn [user-map] 
                  (getVideoMetadata videoId (:language user-map)))))]
          ; now combine 3 observables using zip
          (-> (Observable/zip bookmark-observable video-metadata-observable user-observable 
                (fn [bookmark-map metadata-map user-map]
                  {:bookmark-map bookmark-map 
                  :metadata-map metadata-map
                  :user-map user-map}))
            ; and transform into a single response object
            (.map (fn [data]
                  {:video-id videoId
                   :video-metadata (:metadata-map data)
                   :user-id userId
                   :language (:language (:user-map data))
                   :bookmark (:viewed-position (:bookmark-map data))
                  })))))

响应如下:

{:video-id 78965, 
 :video-metadata {:video-id 78965, :title House of Cards: Episode 1, 
                  :director David Fincher, :duration 3365}, 
 :user-id 12345, :language es-us, :bookmark 0}

如下是一个marble图表来说明如何代码如何生成应答:

image.png

如下Groovy例子,来自于 Ben Christensen’s QCon presentation on the evolution of the Netflix API,它使用merge操作符来组合两个Observables,接着使用reduce操作符来从结果序列中构建一个单一的item,接着使用map转换该item在发射它之前。

public Observable getVideoSummary(APIVideo video) {
   def seed = [id:video.id, title:video.getTitle()];
   def bookmarkObservable = getBookmark(video);
   def artworkObservable = getArtworkImageUrl(video);
   return( Observable.merge(bookmarkObservable, artworkObservable)
      .reduce(seed, { aggregate, current -> aggregate << current })
      .map({ [(video.id.toString() : it] }))
}

这儿是一个marble图表说明代码如何使用reduce操作符来从多个Obserables组合的单一结构中获取结果。

image.png

错误处理:

如下是一个上面的改进后的关于Wikipedia的例子包括错误处理:

/*
 * Fetch a list of Wikipedia articles asynchronously, with error handling.
 */
def fetchWikipediaArticleAsynchronouslyWithErrorHandling(String... wikipediaArticleNames) {
    return Observable.create({ subscriber ->
        Thread.start {
            try {
                for (articleName in wikipediaArticleNames) {
                    if (true == subscriber.isUnsubscribed()) {
                        return;
                    }
                    subscriber.onNext(new URL("http://en.wikipedia.org/wiki/"+articleName).getText());
                }
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } catch(Throwable t) {
                if (false == subscriber.isUnsubscribed()) {
                    subscriber.onError(t);
                }
            }
            return (subscriber);
        }
    });
}

注意它如何调用onError(Throwable t),如果发生错误并注意如下代码传入subscribe()(第二个函数来处理错误)

fetchWikipediaArticleAsynchronouslyWithErrorHandling("Tiger", "NonExistentTitle", "Elephant")
    .subscribe(
        { println "--- Article ---\n" + it.substring(0, 125) }, 
        { println "--- Error ---\n" + it.getMessage() })

更多信息参看 Error-Handling-Operators ,RxJava中的特殊错误处理,包括像 onErrorResumeNext()onErrorReturn() 的函数,这些函数允许Observables继续回退在遇到错误的情况下。

下面是一个示例,你可以使用这样的方法传递关于你遇到的任何异常的自定义信息。想象你有一个Observable或者级联的Observables-myObservable。并且你想拦截任何正常通过Subscriber 的onError函数的异常,用自定义的Throwable来替换这些异常。你可以通过修改myObservable用onErrorResumeNext()函数,并传入这个函数一个带有你自定义的Throwable(一个被称为error()的实用的函数将会为你生成这样一个Observable)

myModifiedObservable = myObservable.onErrorResumeNext({ t ->
   Throwable myThrowable = myCustomizedThrowableCreator(t);
   return (Observable.error(myThrowable));
});

相关文章

  • RxJava并发parallel的使用

    概述 本文不描述RxJava是什么,以及如何使用的,重点讨论如何使用RxJava实现并发。即: 区分线程切换和并发...

  • 对rxjava实现思想的个人思考

    这篇文章不是讲解rxjava如何使用,而是对其设计的思考。使用过rxjava的同学们都注意到rxjava的操作符很...

  • 如何使用RxJava

    Hello World: 如下hello world 的例子是用Java,Groovy、等语言实现的。从字符串列表...

  • 如何使用RxJava

    如何使用RxJava Hello World 下面是一段java代码,代码创建了一个可观察者(Observable...

  • Android框架——RxJava(一)概述与基本使用

    RxJava(一)概述与基本使用 RxJava学习系列: RxJava(一)概述与基本使用 [RxJava(二)创...

  • RxJava

    其它文章 RxJava操作符大全 1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用...

  • RxJava初探

    我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处 RxJava特性: 轻...

  • RxJava + Retrofit 简单使用

    RxJava接入 RxJava 简单用法 Retrofit 简单使用 RxJava + Retrofit RxJa...

  • 一、如何使用rxJava

    原文地址:https://github.com/ReactiveX/RxJava/wiki/How-To-Use-...

  • RxJava2 使用 及 源码阅读

    RxJava2 使用 及 源码阅读 RxJava是什么?根据RxJava在GitHub上给出的描述:RxJava ...

网友评论

      本文标题:如何使用RxJava

      本文链接:https://www.haomeiwen.com/subject/rdbuoftx.html