美文网首页
平行流、字符串、实用操作符

平行流、字符串、实用操作符

作者: CyrusChan | 来源:发表于2018-09-29 15:40 被阅读51次

    介绍:

    2.0.5版本引入的ParallelFlowable API 允许并行执行一系列选择的操作符,例如 map,filter,concatMap,flatMap,collect,reduce 等等。 注意:对于Flowable(一个特定的子域语言)这是一个并行的模块而不是新的响应基础类型。

    因此,几个典型的操作符例如take,skip 和许多其他的是不可用的且这儿没有ParallelObservable ,因为 像期望那样内部的并行操作符队列不发生洪泛,背压是至关重要的,我们希望并行是因为单线程处理数据是缓慢的。

    最简单的方式进入并行世界是使用 Flowable.parallel:

    ParallelFlowable<Integer> source = Flowable.range(1, 1000).parallel();
    

    默认情况下,并行级别被设置为可用的cpu数量。

    (Runtime.getRuntime().availableProcessors()) 和 从原序列预取的数量 被设置到Flowable.bufferSize() (128).两者都可以通过parallel()的重载函数来指定。

    ParallelFlowable 遵循Flowable同样的异步参数原则,因此, parallel() 本身不引入源序列的异步消费,但仅准备并行流,异步通过runOn(Scheduler) 操作符被定义。

    ParallelFlowable<Integer> psource = source.runOn(Schedulers.io());
    

    并行等级(ParallelFlowable.parallelism()) 不必匹配Scheduler的并行等级。runOn操作符将使用和并行源定义的一样多的Scheduler.Worker实例。 这允许ParallelFlowable 为CPU密集的任务工作通过Schedulers.computation(),阻塞IO 通过Schedulers.io() 绑定任务且通过TestScheduler单元测试。你也可以指定预取数量在runOn上。

    一旦必要的平行操作已经被应用,你可以通过ParallelFlowable.sequential()操作符返回连续的Flowable

    Flowable<Integer> result = psource.filter(v -> v % 3 == 0).map(v -> v * v).sequential();
    

    注意: sequential 不保证在并行运算符之间流动的值之间的顺序。

    String Observables

    StringObservable 类包含一些函数,这些函数代表一些操作符,特别是处理基于字符串的序列和流的Obseravable.这些包括:

    • byLine( )— 通过把源序列处理为一个流且以换行符为分割来把一个Observable的字符串转换成行的Observable
    • decode( )— 把一个多字节字符转换成一个发射遵守字符边界的字节数组的Observable
    • encode( )— 把一个发射字符串的Observable转换成一个遵守源字符串中多字节字符的字符边界的字节数组的Observable.
    • from( )— 转换一个字符流或者一个Reader 为一个发射字节数组或者字符串的Observable.
    • join( )— 把一个发射字符串序列的Observable转换成一个发射一个单一字符串,该字符串是由那些字符串序列拼接而成。
    • split( )— 把一个Observable的字符串转换成一个Observable,这个Observable把源序列当做一个流,且该流是以特定正则边界分割开
    • stringConcat( )— 把一个发射一系列字符串的Observable转换成一个发射由上述字符串序列拼接成的单一字符串的Observable

    Transforming Observables

    这部分说明你可以转换被Observable发射的item的操作符

    • map( )— 通过对每一个被Observable发射的item应用一个函数来转换他们
    • flatMap( )****, ****concatMap( )****, and ****flatMapIterable( ) —把被Observable发射的多个item转换成多个Observable或多个Iterable,接着压成一个单一的Observable.
    • switchMap( )— 把被Observable发射的多个item转换成多个Observable,并且镜像那些最近被转换的Observable发射的item
    • scan( )— 对每一个被Observable发射的item值应用一个函数,且发射后续的每一个值。
    • groupBy( )— 把一个Observable划分成一系列通过key组织的Observable,这些Observable从原始的Observable发射成群的item。
    • buffer( )— 周期性的从一个Observable聚集item成一个bundle,且发射这些bundle而不是一次发射一个item。
    • window( )— 周期性的从一个Obseravble再细分item成一个Observable window 且发射这些Window而不是一次发射一个item.
    • cast( )— 在再次发射他们之前,把来自于源Observable的所有item转换成特殊的类型。

    Observable Utility Operators

    这部分列出了不同的为Observable工作的实用操作符。

    • materialize( )— 把一个Observable转换一系列通知。
    • dematerialize( )— 把一个物化的Obsrevable回退到非物化的形式
    • timestamp( )— 为每一个被Observable发出的item附上时间戳
    • serialize( )— 强制一个Observable执行序列化调用
    • cache( )— 记住被Observable发射的item序列且为未来的订阅者发射相同的序列。
    • observeOn( )— 指定Subscriber应该在哪个Scheduler观察Observable.
    • subscribeOn( )— 当一个订阅被执行的时候,指定一个Observable应该使用哪个Scheduler。
    • doOnEach( )— 无论何时Observable发射一个item,注册一个action.去执行
    • doOnNext( )— 就在Observable传入onNext事件顺流而下之前,注册一个action去执行
    • doAfterNext( ) —就在Observable传入onNext事件顺流而下之后,注册一个action去执行
    • doOnCompleted( )— 当一个Observable成功完成,注册一个action去执行
    • doOnError( )— 当一个Observable带错误完成,注册一个action去执行
    • doOnTerminate( )— 在一个Observable结束之前,不管成功或是出错,注册一个action去调用。
    • doAfterTerminate( )— 在一个Observable结束之后,不管成功或是出错,注册一个action去调用。
    • doOnSubscribe( )— 当一个Observabler订阅一个Observable,注册一个action去调用。
    • 1.xdoOnUnsubscribe( ) — 当一个Observable取消订阅一个Observable,注册一个action去调用
    • finallyDo( )— 当一个Observable完成,注册一个action去执行。
    • doFinally( )— 当一个Observable结束或者被处理,注册一个action去执行
    • delay( )— 从一个Obseravble按一定数量向将来转移一些发射对象
    • delaySubscription( )— 持有一个Subscriber的订阅请求一段指定的时间在传入它到源Observable之前。
    • timeInterval( )— 在源Observable的两个连续的发射之间暂停一段时间
    • using( )— 创建一个与Observable相同生命周期的可支配的资源
    • single( )— 如果Observable再发射一个item后完成了,那么返回这个item,否则抛出一个异常
    • singleOrDefault( ) —如果Observable再发射一个item后完成了,那么返回这个item,否则返回默认item
    • repeat( )— 创建一个重复发射特殊item或者item序列的 Observable
    • repeatWhen( ) —创建一个重复发射特殊item或者item序列的 Observable,取决于第二个Observable的发射

    Plugins

    插件允许你从几个方面改变Rxjava默认的行为

    通过改变一系列默认的计算,i/o 和新线程调度器

    通过注册RxJava可能遇到的特别错误的处理器

    通过注册能够注意到几个常规RxJava活动的发生的函数

    RxJavaHooks

    新的RxJavaHooks允许你连接到 Observable、Single、Completable类型及被Schedulers返回的Scheduler们 的生命周期 且为不可交付的问题提供一个全方位的解决方案。

    你现在可以在运行期改变这些hook 且不必再通过系统参数准备hook。因此用户可能仍要依赖于旧的hook系统,RxjavaHooks 默认委托给旧的Hook。

    RxJavaHook 有不同种类的hook的setter和getter

    image.png

    读取和改变这些hook是线程安全的

    你也可以通过clear()清除所有的hook 或者 通过reset()重置到默认的行为(委托给旧的RxJavaPlugin系统)

    例子:

    RxJavaHooks.setOnObservableCreate(o -> { 
        System.out.println("Creating " + o.getClass());
        return o; 
    });
    try {
        Observable.range(1, 10)
        .map(v -> v * 2)
        .filter(v -> v % 4 == 0)
        .subscribe(System.out::println);
    } finally {
        RxJavaHooks.reset();
    }
    

    此外,RxJavaHook 提供所谓的装配跟踪特性.这嵌入一个自定义Observable,Single,Completable到他们捕捉目前栈迹的链中 当这些操作符被实例化的时候(装配时间)。无论何时 一个错误信号通过onError被发出,这些附加到装配时间的栈轨迹的中间件最终造成这个异常。这可能帮助定位代码库的问题序列。

    Example:

    RxJavaHooks.enableAssemblyTracking();
    try {
        Observable.empty().single()
        .subscribe(System.out::println, Throwable::printStackTrace);
    } finally {
       RxJavaHooks.resetAssemblyTracking();
    }
    

    这将打印像下面的结果:

    java.lang.NoSuchElementException
    at rx.internal.operators.OnSubscribeSingle(OnSubscribeSingle.java:57)
    ...
    Assembly trace:
    at com.example.TrackingExample(TrackingExample:10)
    

    栈轨迹字符串在支持debug和发现运行链中不同操作符的状态也是可用的。

    栈轨迹通过移除不相关的入口例如线程入口,单元测试和跟踪系统入口本身 来被过滤去减少噪音

    RxJavaSchedulersHook

    Deprecated

    这个插件允许你去重载默认的计算,i/o 和新线程Scheduler .集成 类RxJavaSchedulersHook 和重写如下方法 :

    • Scheduler getComputationScheduler( )
    • Scheduler getIOScheduler( )
    • Scheduler getNewThreadScheduler( )
    • Action0 onSchedule(action)

    接着按如下步骤:

    1. 创建一个新的你已经实现的RxJavaDefaultSchedulers 子类的对象

    2. 通过RxJavaPlugins.getInstance( )获取全局的RxJavaPlugins 实例

    3. 传入默认的scheduler对象到该实例的registerSchedulersHook( )函数

    当你完成了这些,RxJava 开始使用你的函数返回的Scheduler而不是它内嵌的默认的

    RxJavaErrorHandler

    Deprecated

    该插件允许你注册一个将会处理错误的函数,该错误将被传到SafeSubscriber.onError(Throwable)。(SafeSubscriber 用于封装即将到来的Subscriber当subscribe()被调用)。为此,继承类RxJavaErrorHandler``且 重写该函数:

    • void handleError(Throwable e)

    接着按如下步骤:

    1. 创建一个你实现的RxJavaErrorHandler 子类的新的对象

    2. 通过RxJavaPlugins.getInstance( )获取全局的RxJavaPlugins 实例

    3. 传入错误处理器到该实例的registerErrorHandler( )函数

    当你完成了这些,RxJava将开始使用你的错误处理器来处理传给SafeSubscriber.onError(Throwable)的错误。

    例如:

    RxJavaPlugins.getInstance().reset();
    
    RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
        @Override
        public void handleError(Throwable e) {
            e.printStackTrace();
        }
    });
    
    Observable.error(new IOException())
    .subscribe(System.out::println, e -> { });
    

    然而,该调用和操作符链一般情况下将不会在每个阶段被触发。

    Observable.error(new IOException())
    .map(v -> "" + v)
    .unsafeSubscribe(System.out::println, e -> { });
    

    RxJavaObservableExecutionHook

    Deprecated

    该插件允许你注册RxJava将调用某些常规的RxJava活动的函数,例如:日志或者metrics-collection purposes。为此,继承RxJavaObservableExecutionHook 类 且重写这些方法中的任何一个或全部

    image.png

    接着按如下步骤:

    1. 创建一个新的你实现的RxJavaObservableExecutionHook 子类的实例。

    2. 通过RxJavaPlugins.getInstance( )或者全局的RxJavaPlugins 实例

    3. 传入你的执行hook对象到该实例的registerObservableExecutionHook( )函数中。

    当你完成了这些,RxJava将会调用你的函数当遇到特定的被设计需要注意的条件

    相关文章

      网友评论

          本文标题:平行流、字符串、实用操作符

          本文链接:https://www.haomeiwen.com/subject/nnxboftx.html