美文网首页squbsScala In Action
squbs-20. 流的生命周期

squbs-20. 流的生命周期

作者: 吕亦行 | 来源:发表于2017-02-03 09:35 被阅读26次

    原文地址:Streams Lifecycle
    Akka Streams/Reactive流需要和服务的Runtime Lifecycle 集成。为了这个,一个自动化的或半自动话的集成通过 PerpetualStream实现。为了直接对流源的细粒度控制, LifecycleManaged提供一个包装,可以控制任何源组件的可能发生的停止或关闭,以便流可以优雅的启动/关闭。

    永久流(PerpetualStream)

    PerpetualStream是一个特性(triat),允许声明一个可以启动的流,当服务优雅的启动和关闭,不会在服务关闭的时候丢失消息。

    使用 PerpetualStream的流将符合以下要求,将允许PerpetualStream 中的钩子通过最小自定义重写数无缝的工作:

    1. killSwitch.flow作为在source之后的流的第一个阶段。killSwitch 是一个标准的Akka SharedKillSwitch,通过PerpetualStream特性(trait)提供。
    2. stream实现 FutureProduct通过它们最后的元素。 ProductTuple, List和其他的超类。Sink物化Future是很自然的。如果更多的物化值需要,它通常来自某种形式的 ProductSink成为流最后的元素,也通常物化Product的最后一个元素。
    3. Future呈现流的完结(物化值或最后的物化值)。换句话说,流结束时future完成。

    如果满足以上所有要求,没有其他自定义重写用于PerpetualStream函数。下面的代码符合PerpetualStream

    class WellBehavedStream extends PerpetualStream[Future[Done]] {
    
      def generator = Iterator.iterate(0) { p => 
        if (p == Int.MaxValue) 0 else p + 1 
      }
    
      val source = Source.fromIterator(generator _)
    
      val ignoreSink = Sink.ignore
      
      override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(ignoreSink) {
        implicit builder =>
          sink =>
            import GraphDSL.Implicits._
            source ~> killSwitch.flow[Int] ~> sink
            ClosedShape
      })
    }
    

    这就是。这个流行为良好,因为它物化 sink物化值,即Future[Done]

    关闭重写

    有时不可能定义良好的流。举个例子,Sink可能不会物化 Future或你需要在关闭时做更多的清理。因为这个原因,可以通过重写shutdown如下:

      override def shutdown(): Future[Done] = {
        // Do all your cleanup
        // For safety, call super
        super.shutdown()
        // The Future from super.shutdown may not mean anything.
        // Feel free to create your own future that identifies the
        // stream being done. Return your Future instead.
      }
    

    shutdown需要使用如下:

    1. 初始化流的关闭
    2. 做其他清理
    3. 当流结束处理,返回future

    注意:建议任何情况下调用 super.shutdown。调用是无害的或有其他副作用。

    备用关闭机制

    相比于使用killSwitchsource 可以提供一个更好方式来正确的关闭。在这种情况下,仅使用source的关闭机制和重写 shutdown来发起source的关闭。killSwitch 依然未使用。

    Kill Switch 重写

    如果killSwitch需要跨多流共享,你可以重写 killSwitch来反射共享实例

      override lazy val killSwitch = mySharedKillSwitch
    

    接收消息并将其转发到流

    一些流从actor消息中获取输入。虽然可能一些流配置可以物化source中的ActorRef,然而很难调用这个actor。因为PerpetualStream自身是个actor,他可以拥有一个公开的地址/路径并且转发消息至流source。这样做,我们需要重写receive 如下:

      override def receive = {
        case msg: MyStreamMessage =>
          val (sourceActorRef, _) = matValue
          sourceActorRef forward msg
      }
    

    处理流错误

    PerpetualStream默认从错误中恢复不会被流stage捕获。这个消息引起忽略异常。如果需要一个不同的行为的话,重写 decider

      override def decider: Supervision.Decider = { t => 
        log.error("Uncaught error {} from stream", t)
        t.printStackTrace()
        Restart
      }
    

    Restart将重启有错误的stage,而不会重启stream。查看Supervision Strategies获得可能的策略。

    结合一下

    下面的例子尽可能重写上面讨论的内容:

    class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {
    
      val actorSource = Source.actorPublisher[MyStreamMsg](Props[MyPublisher])
      val ignoreSink = Sink.ignore[MyStreamMsg]
      
      override def streamGraph = RunnableGraph.fromGraph(GraphDSL.create(actorSource, ignoreSink)((_, _)) {
        implicit builder =>
          (source, sink) =>
            import GraphDSL.Implicits._
            source ~> sink
            ClosedShape
      })
      
      // Just forward the message to the stream source
      override def receive = {
        case msg: MyStreamMsg =>
          val (sourceActorRef, _) = matValue
          sourceActorRef forward msg
      }
      
      override def shutdown() = {
        val (sourceActorRef, _) = matValue
        sourceActorRef ! cancelStream
        // Sink materialization conforms
        // so super.shutdown() will give the right future
        super.shutdown()
      }
    }
    

    制作一个Lifecycle-Sensitive source

    如果你期望拥有一个source,完全链接squbs的生命周期的动作,你可以将source包裹 LifecycleManaged:

    Scala

    val inSource = <your-original-source>
    val aggregatedSource = LifecycleManaged().source(inSource)
    

    Java

    final Source inSource = <your-original-source>
    final Source aggregatedSource = new LifecycleManaged(system).source(inSource);
    

    这个结果source将集成source实例化成一个(T, ActorRef)TinSource 的实例化类型, ActorRef 是 trigger actor的实例化类型(从Unicomplex接收事件,squbs的容器)

    这个集成source直到生命周期状态变成Active才开始从源source中发出,并且在生命周期状态成为 Stopping之后停止发出元素和关闭流。

    个性化集成Triggered Source

    如果你想要你的flow启动/停用个性化的事件,你可以整合一个个性化的trigger source,元素true将会启用,元素false将会停用。

    注意 Trigger有一个参数eagerComplete默认为false在scala中,而在JAVA中需要传递。如果eagerComplete设置为false,trigger source 的结束或终止将脱离这个触发。如果设置为true,这个终止会完成这个流。

    Scala

    import org.squbs.stream.TriggerEvent._
    
    val inSource = <your-original-source>
    val trigger = <your-custom-trigger-source>.collect {
      case 0 => DISABLE
      case 1 => ENABLE
    }
    
    val aggregatedSource = new Trigger().source(inSource, trigger)
    

    Java

    import static org.squbs.stream.TriggerEvent.DISABLE;
    import static org.squbs.stream.TriggerEvent.ENABLE;
    
    final Source<?, ?> inSource = <your-original-source>;
    final Source<?, ?> trigger = <your-custom-trigger-source>.collect(new PFBuilder<Integer, TriggerEvent>()
        .match(Integer.class, p -> p == 1, p -> ENABLE)
        .match(Integer.class, p -> p == 0, p -> DISABLE)
        .build());
    
    final Source aggregatedSource = new Trigger(false).source(inSource, trigger);
    

    个性化的生命周期事件触发

    如果你想要在ActiveStopping之外响应更多生命周期,举个例子,你想要Failed来同时关闭flow,你可以修改生命周期事件映射。

    import org.squbs.stream.TriggerEvent._
    
    val inSource = <your-original-source>
    val trigger = Source.actorPublisher[LifecycleState](Props.create(classOf[UnicomplexActorPublisher]))
      .collect {
        case Active => ENABLE
        case Stopping | Failed => DISABLE
      }
    
    val aggregatedSource = new Trigger().source(inSource, trigger)
    

    相关文章

      网友评论

        本文标题:squbs-20. 流的生命周期

        本文链接:https://www.haomeiwen.com/subject/ethrittx.html