美文网首页squbsScala In Action
squbs-20. 流的生命周期

squbs-20. 流的生命周期

作者: 吕亦行 | 来源:发表于2017-02-03 09:35 被阅读26次

原文地址:Streams Lifecycle
Akka Streams/Reactive流需要和服务的Runtime Lifecycle 集成。为了这个,一个自动化的或半自动话的集成通过 PerpetualStream实现。为了直接对流源的细粒度控制, LifecycleManaged提供一个包装,可以控制任何源组件的可能发生的停止或关闭,以便流可以优雅的启动/关闭。

永久流(PerpetualStream)

PerpetualStream是一个特性(triat),允许声明一个可以启动的流,当服务优雅的启动和关闭,不会在服务关闭的时候丢失消息。

使用 PerpetualStream的流将符合以下要求,将允许PerpetualStream 中的钩子通过最小自定义重写数无缝的工作:

  1. killSwitch.flow作为在source之后的流的第一个阶段。killSwitch 是一个标准的Akka SharedKillSwitch,通过PerpetualStream特性(trait)提供。
  2. stream实现 FutureProduct通过它们最后的元素。 ProductTuple, List和其他的超类。Sink物化Future是很自然的。如果更多的物化值需要,它通常来自某种形式的 ProductSink成为流最后的元素,也通常物化Product的最后一个元素。
  3. Future呈现流的完结(物化值或最后的物化值)。换句话说,流结束时future完成。

如果满足以上所有要求,没有其他自定义重写用于PerpetualStream函数。下面的代码符合PerpetualStream

class WellBehavedStream extends PerpetualStream[Future[Done]] {

  def generator = Iterator.iterate(0) { p => 
    if (p == Int.MaxValue) 0 else p + 1 
  }

  val source = Source.fromIterator(generator _)

  val ignoreSink = Sink.ignore
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
    implicit builder =>
      sink =>
        import GraphDSL.Implicits._
        source ~> killSwitch.flow[Int] ~> sink
        ClosedShape
  })
}

这就是。这个流行为良好,因为它物化 sink物化值,即Future[Done]

关闭重写

有时不可能定义良好的流。举个例子,Sink可能不会物化 Future或你需要在关闭时做更多的清理。因为这个原因,可以通过重写shutdown如下:

  override def shutdown(): Future[Done] = {
    // Do all your cleanup
    // For safety, call super
    super.shutdown()
    // The Future from super.shutdown may not mean anything.
    // Feel free to create your own future that identifies the
    // stream being done. Return your Future instead.
  }

shutdown需要使用如下:

  1. 初始化流的关闭
  2. 做其他清理
  3. 当流结束处理,返回future

注意:建议任何情况下调用 super.shutdown。调用是无害的或有其他副作用。

备用关闭机制

相比于使用killSwitchsource 可以提供一个更好方式来正确的关闭。在这种情况下,仅使用source的关闭机制和重写 shutdown来发起source的关闭。killSwitch 依然未使用。

Kill Switch 重写

如果killSwitch需要跨多流共享,你可以重写 killSwitch来反射共享实例

  override lazy val killSwitch = mySharedKillSwitch

接收消息并将其转发到流

一些流从actor消息中获取输入。虽然可能一些流配置可以物化source中的ActorRef,然而很难调用这个actor。因为PerpetualStream自身是个actor,他可以拥有一个公开的地址/路径并且转发消息至流source。这样做,我们需要重写receive 如下:

  override def receive = {
    case msg: MyStreamMessage =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }

处理流错误

PerpetualStream默认从错误中恢复不会被流stage捕获。这个消息引起忽略异常。如果需要一个不同的行为的话,重写 decider

  override def decider: Supervision.Decider = { t => 
    log.error("Uncaught error {} from stream", t)
    t.printStackTrace()
    Restart
  }

Restart将重启有错误的stage,而不会重启stream。查看Supervision Strategies获得可能的策略。

结合一下

下面的例子尽可能重写上面讨论的内容:

class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {

  val actorSource = Source.actorPublisher[MyStreamMsg](Props[MyPublisher])
  val ignoreSink = Sink.ignore[MyStreamMsg]
  
  override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(actorSource, ignoreSink)((_, _)) {
    implicit builder =>
      (source, sink) =>
        import GraphDSL.Implicits._
        source ~> sink
        ClosedShape
  })
  
  // Just forward the message to the stream source
  override def receive = {
    case msg: MyStreamMsg =>
      val (sourceActorRef, _) = matValue
      sourceActorRef forward msg
  }
  
  override def shutdown() = {
    val (sourceActorRef, _) = matValue
    sourceActorRef ! cancelStream
    // Sink materialization conforms
    // so super.shutdown() will give the right future
    super.shutdown()
  }
}

制作一个Lifecycle-Sensitive source

如果你期望拥有一个source,完全链接squbs的生命周期的动作,你可以将source包裹 LifecycleManaged:

Scala

val inSource = <your-original-source>
val aggregatedSource = LifecycleManaged().source(inSource)

Java

final Source inSource = <your-original-source>
final Source aggregatedSource = new LifecycleManaged(system).source(inSource);

这个结果source将集成source实例化成一个(T, ActorRef)TinSource 的实例化类型, ActorRef 是 trigger actor的实例化类型(从Unicomplex接收事件,squbs的容器)

这个集成source直到生命周期状态变成Active才开始从源source中发出,并且在生命周期状态成为 Stopping之后停止发出元素和关闭流。

个性化集成Triggered Source

如果你想要你的flow启动/停用个性化的事件,你可以整合一个个性化的trigger source,元素true将会启用,元素false将会停用。

注意 Trigger有一个参数eagerComplete默认为false在scala中,而在JAVA中需要传递。如果eagerComplete设置为false,trigger source 的结束或终止将脱离这个触发。如果设置为true,这个终止会完成这个流。

Scala

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = <your-custom-trigger-source>.collect {
  case 0 => DISABLE
  case 1 => ENABLE
}

val aggregatedSource = new Trigger().source(inSource, trigger)

Java

import static org.squbs.stream.TriggerEvent.DISABLE;
import static org.squbs.stream.TriggerEvent.ENABLE;

final Source<?, ?> inSource = <your-original-source>;
final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
    .match(Integer.class, p -> p == 1, p -> ENABLE)
    .match(Integer.class, p -> p == 0, p -> DISABLE)
    .build());

final Source aggregatedSource = new Trigger(false).source(inSource, trigger);

个性化的生命周期事件触发

如果你想要在ActiveStopping之外响应更多生命周期,举个例子,你想要Failed来同时关闭flow,你可以修改生命周期事件映射。

import org.squbs.stream.TriggerEvent._

val inSource = <your-original-source>
val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
  .collect {
    case Active => ENABLE
    case Stopping | Failed => DISABLE
  }

val aggregatedSource = new Trigger().source(inSource, trigger)

相关文章

  • squbs-20. 流的生命周期

    原文地址:Streams LifecycleAkka Streams/Reactive流需要和服务的Runtime...

  • 漫谈 Activity

    生命周期 大部分的 Android 开发都了解 activity 的生命周期,甚至倒背如流。 根据不同的状态可以将...

  • 那些生命的足迹

    持续更新...... 1.关于生命周期的一些想法 生命周期描述的"组件"由创建->...->销毁的过程,掌握这些流...

  • 《软件工程》复习提纲

    整理自教材、PPT以及坊间流传的材料。 软件工程概念 软件生命周期模型 软件过程 软件生命周期的工作流 需求 软件...

  • 投资一句话

    买股票就是买公司 买公司就是买公司未来的现金流(的折现) 这里的现金流指的是净现金流 未来指的是公司的整个生命周期...

  • Android的Activity控件详解

    一.Activity的概念与Activity的生命周期图: 18364230.jpg 二.Activity的创建流...

  • 深入react技术栈(2)

    第一章(Raact数据流、React生命周期、React与DOM) React数据流 在React中,数据是自项向...

  • Android源码分析——Activity启动与生命周期

    通过分析源码,了解AstartActivity()方法背后的逻辑,以及生命周期是如何执行的 Activity启动流...

  • Vue 生命周期

    此文章面向Vue2.0 什么是生命周期 生命周期是一种拟人化的说法,每个Vue的实例或组件从创建到废除要经历多个流...

  • Vue生命周期-挂载流

    生命周期 又名生命周期钩子,生命周期回调函数,生命周期函数特定时间做特定的事,要把特定的事写特定的函数里函数中的t...

网友评论

    本文标题:squbs-20. 流的生命周期

    本文链接:https://www.haomeiwen.com/subject/ethrittx.html