美文网首页RxJava
一、如何使用rxJava

一、如何使用rxJava

作者: 啥也不说了 | 来源:发表于2017-07-07 21:55 被阅读134次

原文地址:https://github.com/ReactiveX/RxJava/wiki/How-To-Use-RxJava
项目中用到的代码地址:http://git.oschina.net/brendanv/learnrxjava

先来个Hello World

为什么第一行代码都喜欢用Hello World?因为这个表示你的程序有了生命。

maven配置

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.1</version>
        </dependency>

Hello World

代码在first module的first包。

public class HelloWorld {
    public static void hello(String... names) {
        /**
         * Flowable是2.x版本新增的
         */
        Flowable.fromArray(names).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
        /**
         * Observable是旧版本的
         */
        Observable.fromArray(names).blockingSubscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }

    public static void main(String[] args) {
        hello("Hello World!");
    }
}

官方文档里面使用的是RxJava1,所以有区别。所以rxjava1可以使用

如何使用RxJava

使用RxJava需要创建一些Observable(用来发射数据),以各种方式转换这些Observable获取需要的数据项,再订阅这些数据项。

创建Observable

要创建一个Observable,您可以通过将一个函数传递给具有Observable行为的create()来手动实现Observable的行为,也可以通过使用为此目的设计的一些Observable操作符将现有数据结构转换为Observable 。

使用现有的数据创建Observable

我们的hello world程序就是使用已有的数据来创建Observable的。所以可以使用fromArray()(* 1.x版本为from*)或者just()方法类转换对象,列表,数组为发射这些对象的Observable对象:

Observable<String> stringObservable = Observable.fromArray("a", "b", "c");
Observable<String> oneJust = Observable.just("one Object");
Observable<String> twoJust = Observable.just("one Object", "two Object");

这些转换后的Observable会同步调用任何订阅者的onNext()方法,然后发送这些Observable的每个数据,最后调用订阅者的onCompleted()方法。

使用create()方法创建Observable

你可以使用create创建自定义的Observable来实现异步I/O,计算操作甚至无限的数据流。

同步的Observable示例

public class SyncObservable {
    public static Observable customObservableBlocking() {
        return Observable.create(aSubcriber -> {
            for (int i = 0; i < 50; i++) {
                if (!aSubcriber.isDisposed()) {
                    aSubcriber.onNext("value:" + i);
                }
            }
            if (!aSubcriber.isDisposed()) {
                aSubcriber.onComplete();
            }
        });
    }

    public static void main(String[] args) {
        customObservableBlocking().subscribe(System.out::println);
    }
}

异步的Observable示例

package second;


import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

/**
 * @author
 * @since 2017/7/6 下午8:03
 * Purpose
 */
public class AsyncObservable {

    public static Observable asyncObservable() {
        return Observable.create(aSubscriber->{
            new Runnable(){

                @Override
                public void run() {
                    for(int i=0;i<5;i++) {
                        aSubscriber.onNext("value:"+i);
                    }
                    System.out.println("runnable"+Thread.currentThread());
                }
            }.run();
            System.out.println("observable:"+Thread.currentThread());
            aSubscriber.onComplete();
        });
    }

    public static Observable asyncNonblockingObservabel() {
        return Observable.create(aSubscribe -> {
            for(int i=0;i<5;i++) {
                aSubscribe.onNext("NonBlockingValue:" + i);
            }
        });
    }
    public static void main(String[] args) {
        asyncObservable().subscribeOn(Schedulers.io()).subscribe(System.out::println);
        asyncNonblockingObservabel().subscribeOn(Schedulers.io()).subscribe(System.out::println);
        //main方法容易在线程还没有切换的时候结束,然后就看不到结果了。这个地方卡了我好久啊
        try {
            Thread.sleep(1000*2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

会发现输出的结果是异步的:

NonBlockingValue:0
NonBlockingValue:1
value:0
value:1
value:2
value:3
value:4
NonBlockingValue:2
runnableThread[RxCachedThreadScheduler-1,5,main]
NonBlockingValue:3
NonBlockingValue:4
observable:Thread[RxCachedThreadScheduler-1,5,main]
Process finished with exit code 0

上面介绍的例子为了简介都没有做错误处理,后面的例子会有错误处理。更多信息可以看Obserbable创建Observable页面。

使用操作符转换Observable

RxJava允许链式聚合操作转换和组合Observable。
下面的代码表示忽略前两个,从剩余的里面取两个:

public static void simpleComposition() {
        asyncNonblockingObservabel().skip(2).take(2)
                .map(stringValue -> {
                    return stringValue + "_xform";
                })
                .subscribe(it -> {
                    System.out.println("onNext=>" + it);
                });
    }

下面的交互图是官网的,他发射的是17个值,我懒得画图直接复制了。原理一样。


image.png

相关文章

  • RxJava并发parallel的使用

    概述 本文不描述RxJava是什么,以及如何使用的,重点讨论如何使用RxJava实现并发。即: 区分线程切换和并发...

  • 对rxjava实现思想的个人思考

    这篇文章不是讲解rxjava如何使用,而是对其设计的思考。使用过rxjava的同学们都注意到rxjava的操作符很...

  • 一、如何使用rxJava

    原文地址:https://github.com/ReactiveX/RxJava/wiki/How-To-Use-...

  • Android框架——RxJava(一)概述与基本使用

    RxJava(一)概述与基本使用 RxJava学习系列: RxJava(一)概述与基本使用 [RxJava(二)创...

  • RxJava

    其它文章 RxJava操作符大全 1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用...

  • 如何使用RxJava

    Hello World: 如下hello world 的例子是用Java,Groovy、等语言实现的。从字符串列表...

  • 如何使用RxJava

    如何使用RxJava Hello World 下面是一段java代码,代码创建了一个可观察者(Observable...

  • RxJava初探

    我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处 RxJava特性: 轻...

  • RxJava + Retrofit 简单使用

    RxJava接入 RxJava 简单用法 Retrofit 简单使用 RxJava + Retrofit RxJa...

  • Android框架总结

    一、网络 1、项目为MVP架构,最好使用RxJava + Retrofit RxJava使用介绍点我查...

网友评论

    本文标题:一、如何使用rxJava

    本文链接:https://www.haomeiwen.com/subject/wcnhhxtx.html