美文网首页
Akka之Source相关API总结

Akka之Source相关API总结

作者: 乐言笔记 | 来源:发表于2017-11-15 16:14 被阅读179次

(1)apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
由Iterable创建Source。
例如:Source(Seq(1,2,3))
这类似于从迭代器开始, 但与此流的发布者直接连接的每个订阅者都将看到一个单独的元素流 (总是从开始处开始), 而不管它们何时订阅。

(2)fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed]
从一个产生迭代器的函数开始一个新的Source。生成的元素流将继续,直到迭代器运行为空或在评估next()方法时失败。根据来自下游转换步骤的需求,将元素从迭代器中拉出。
例如:

val iterator = Iterator.iterate(false)(!_)
//创建一个无限迭代器,重复地将给定的函数应用于先前的结果。
//第一个参数是初始值,第二个参数是将重复应用的函数。
Source.fromIterator(() ⇒ iterator)
        .grouped(10)
        .runWith(Sink.head)
        .futureValue

结果是
immutable.Seq(false, true, false, true, false, true, false, true, false, true)

(3)cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed]
从给定的元素(由产生迭代器的函数得到)开始一个循环的Source。元素的生产流将无限地重复由函数参数提供的元素序列。
例如:

Source.cycle(() ⇒ List(1, 2, 3).iterator)
         .grouped(9)
         .runWith(Sink.head)
         .futureValue

结果是
immutable.Seq(1, 2, 3, 1, 2, 3, 1, 2, 3)

(4)fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M]
由source形状(即只有一个出口)的图创建Source。
例如:

val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
 
  // prepare graph elements
  val zip = b.add(Zip[Int, Int]())
  def ints = Source.fromIterator(() => Iterator.from(1))
 
  // connect the graph
  ints.filter(_ % 2 != 0) ~> zip.in0
  ints.filter(_ % 2 == 0) ~> zip.in1
 
  // expose port
  SourceShape(zip.out)
})

(5)fromFuture[T](future: Future[T]): Source[T, NotUsed]
从给定的Future创建Source。当Future以成功值完成时(可能在物化Flow之前或者之后发生),流由一个元素组成。当Future以失败完成时,流将终止并带有一个failure。
例如:
Source.fromFuture(Future.successful("Hello Streams!"))

(6)fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed]
类似于Scala的Future创建Source,此处是由Java的CompletionStage创建Source。

(7)fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]]
由给定的future source形状的图创建Source。一旦给定的Future成功完成,则元素从异步source流出。如果Future失败,则流失败。

(8)fromSourceCompletionStage[T, M](completion: CompletionStage[_ <: Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]]
类似于fromFutureSource

(9)tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable]
元素定期以指定的间隔发出。
"滴答" 元素将被传递到请求任何元素的下游用户。
如果使用者在生成滴答元素时没有请求任何元素, 它以后将不会接收该滴答元素。它将在请求更多元素时立即接收新的滴答元素。
例如:
Source.tick(initialDelay = 2.second, interval = 1.second, "message!")

(10)single[T](element: T): Source[T, NotUsed]
由一个元素创建Source。
例如:Source.single("only one element")

(11)repeat[T](element: T): Source[T, NotUsed]
创建一个连续发送给定元素的Source。
例如:

Source.repeat(42)
        .grouped(3)
        .runWith(Sink.head) 
        .futureValue

结果是:
immutable.Seq(42,42,42)

(12)unfold[S, E](s: S)(f: S ⇒ Option[(S, E)]): Source[E, NotUsed]
创建一个Source,它会将S类型的值展开成一对下一个状态S,'E`类型的输出元素。
例如,10M以下的所有斐波纳契数字:

Source.unfold(0 → 1) {
    case (a, _) if a > 10000000 ⇒ None
    case (a, b) ⇒ Some((b → (a + b)) → a)
 }

(13)unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, NotUsed]
与unfold相同,但是使用一个异步函数来产生下一个状态元素元组。

Source.unfoldAsync(0 → 1) {
     case (a, _) if a > 10000000 ⇒ Future.successful(None)
     case (a, b) ⇒ Future{
       Thread.sleep(1000)
       Some((b → (a + b)) → a)
     }
 }

(14)empty[T]: Source[T, NotUsed]
创建一个没有元素的Source,即为每个连接的Sink立即完成的空流。
例如:Source.empty

(15)maybe[T]: Source[T, Promise[Option[T]]]
创建一个Source,它物化为一个scala.concurrent.Promise,它控制什么元素从Source发出。

如果物化的promise以Some完成,那么该值将在下游生成,然后是完成。
如果物化的promise以None完成,那么下游不会产生值,并立即发出完成信号。
如果物化的promise以failure完成,那么返回的source将以那个错误终止。
如果在promise完成前,source的下游取消,那么promise将以None完成。

(16)failed[T](cause: Throwable): Source[T, NotUsed]
创建一个Source,它立刻终止流,并将错误cause给每一个连接的Sink。
例如:

val ex = new Exception("buh")
Source.failed(ex)
     .flatMapMerge(1, identity)
     .runWith(Sink.head)
      .futureValue

(17)lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]]
创建一个Source,直到下游有需求才物化,当source物化时,物化的future将以其值完成,如果下游取消或失败没有任何需求,则不会调用创建工厂,物化的Future是失败。

(18)asSubscriber[T]: Source[T, Subscriber[T]]
创建一个Source,其物化为一个org.reactivestreams.Subscriber

(19)actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef]
如何先定义流,而后给流传递数据呢?答案就是Source.actorRef。说明:Source.actorRef没有背压策略。
创建一个Source,其物化为一个akka.actor.ActorRef

如果下游有需求, 发送到该actor的消息将被发送到流中, 否则它们将被缓冲, 直到收到需求请求为止。

根据定义的akka.stream.OverflowStrategy,如果缓冲区中没有可用空间, 则可能会丢弃元素。

策略akka.stream.OverflowStrategy.backpressure不受支持, 如果将其作为参数传递, 则会抛出异常 llegalArgument ( "Backpressure overflowStrategy not supported")。

可以使用0的bufferSize禁用缓冲区, 如果下游没有需求, 则会丢弃接收到的消息。当 bufferSize是 0, overflowStrategy并不重要。在此源之后添加一个异步边界;因此, 假定下游总是会产生需求是绝不会安全的。

通过将akka.actor.Status.Failure发送到actor引用, 从而失败来完成流。在标示完成前,以防actor仍消耗其内部缓冲区(在收到一个akka.actor.Status.Success之后),它收到一个akka.actor.Status.Failure,故障将为 立即向下游发信号(而不是完成信号)。

当流完成、失败或从下游取消时, actor将被停止, 即您可以在发生此情况时观察它以获得通知。

val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 缓存最大为100,超出的话,将以失败告终
  val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source数据流中把不是以"haha"开头的字符串过滤掉
  val actor=hahaStrSource.to(Sink.foreach(println)).run()
  actor!"asdsadasd"
  actor!"hahaasd"
  actor!Success("ok")// 数据流成功完成并关闭

(20)combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed]): Source[U, NotUsed]
MergeConcat按照扇入策略将多个Source合并,返回一个Source。
例如:

val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))

val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))

(21)zipN[T](sources: immutable.Seq[Source[T, _]]): Source[immutable.Seq[T], NotUsed]
将多个流的元素合并到一个序列流中。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipN(sources)
        .runWith(Sink.seq)
        .futureValue

结果是:

immutable.Seq(
          immutable.Seq(1, 10, 100),
          immutable.Seq(2, 20, 200),
          immutable.Seq(3, 30, 300))

(22)zipWithN[T, O](zipper: immutable.Seq[T] ⇒ O)(sources: immutable.Seq[Source[T, _]]): Source[O, NotUsed]
使用组合函数将多个流的元素合并到一个序列流中。

val sources = immutable.Seq(
        Source(List(1, 2, 3)),
        Source(List(10, 20, 30)),
        Source(List(100, 200, 300)))

Source.zipWithN[Int, Int](_.sum)(sources)
        .runWith(Sink.seq)
        .futureValue

结果是:

immutable.Seq(111, 222, 333)

(23)queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]]
创建一个Source,它物化为````akka.stream.scaladsl.SourceQueue```。

您可以将元素推送到队列中, 如果下游有需求, 则它们将被发送到流中, 否则它们将被缓冲, 直到收到需求请求为止。如果下游终止, 缓冲区中的元素将被丢弃。

根据定义的akka.stream.OverflowStrategy,如果缓冲区中没有可用空间, 则可能会丢弃元素。

确认机制可用。
akka.stream.scaladsl.SourceQueue.offer返回Future [QueueOfferResult],如果元素被添加到缓冲区或发送到下游,则它将以QueueOfferResult.Enqueued完成。 如果元素被丢弃,它将以QueueOfferResult.Dropped完成。当流失败时,以QueueOfferResult.Failure完成 或者下游完成时,以
QueueOfferResult.QueueClosed完成。

当缓冲区已满时,策略akka.stream.OverflowStrategy.backpressure不会完成最后offer():Future调用。

可以使用akka.stream.scaladsl.SourceQueue.watchCompletion查看流的可访问性。当流完成时,它返回一个以成功完成的future或者当流失败时,它返回一个以失败完成的future。

可以通过设置bufferSize为0关闭缓冲区,然后接收到的消息将等待下游的需求,如果有另一个消息等待下游需求,这种情况下结果将根据溢出策略完成。

(24)unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed]
从某个可以打开、读取、关闭的资源,创建一个Source。
以阻塞的方式与资源交互。

可以使用监管策略来处理read函数的异常。所有由createclose抛出的异常,都将使流失败。

Restart监管策略将关闭并再次创建阻塞IO。默认策略是Stop,意味着在read函数出现错误流将终止。

通过变更akka.stream.blocking-io-dispatcher或者为提供的Source使用ActorAttributes设置,来配置默认的调度器。

遵守ActorAttributes. SupervisionStrategy属性。

(25)unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed]
类似于unfoldResource,但是使用返回Futures而不是纯值的函数。

相关文章

网友评论

      本文标题:Akka之Source相关API总结

      本文链接:https://www.haomeiwen.com/subject/zmdkpxtx.html