美文网首页
Akka Stream和Scala Future

Akka Stream和Scala Future

作者: Queen〇fLaponia | 来源:发表于2018-06-15 14:23 被阅读0次

  单体不是单机、单机不一定是单体,单体指铁板一块、无弹性、无恢复能力、一损俱损的系统,对于基于RPC分布式结构远程的失败和延时会严重影响本地的,InfoQ发明了个新名词叫做“分布式单体”,这只是单机上的错误在分布式场景下重演。单体应用中局部的异常、错误和阻塞是会传染的,因为全部是硬耦合,异常即使捕获,也无力本地恢复,因为我们明知道对象状态已污染但仍然无法替换运行时对象,异常一旦抛出线程外,则完全失控,这就是为什么传统的异常捕获很鸡肋,在Java领域经历了从checked到全面unChecked处理思路转变。

We spread computations or analyses over large clusters and call it “big data”, where the whole principle of processing them is by feeding those data sequentially—as a stream—through some CPUs.

  Akka is the name of a beautiful mountain(that Jonas climbed and skied some years age) in the Laponia region in Northern Sweden.

  The mountain is also sometimes called 'The Queen of Laponia'. Akka is also the name of a goddness in the Sami(the native Swedish population)mythology.

  She is the goddness that stands for all the beauty and good in the world. The mountain can be seen as the symbol of this goddness.

  Also,the name AKKA is the palindrome of letters A and K as in Actor Kernel and returns very few hits on Google.

  如果自己基于actor实现Stream流式数据处理是难以控制复杂性的,得考虑不要让邮箱溢出、由于原生actor不保障可靠数据传递你还得自己实现ACK使得数据接收端未收到预期消息时可以重传......最关键的是如果让你们这帮参差不齐的人各自实现,肯定会做得五花八门歪瓜裂枣,而实际上在消息流处理领域,已经有很棒的最佳实践和架构设计沉淀(参考《EIP》),简单、灵活,最重要的是标准化是可行的,这意味着抽取基础功能构建的框架将发挥最大价值这很重要,因此我们决定统一这个领域,历经七次重写推出Akka Streams API.  它的目标是提供一个直观、标准化模板化的流式处理架构,所有Akka Streams operators默认自带反压能力,所以只要你用Akka Streams,你的系统始终保持:no more OutOfMemoryErrors.

Akka流基于响应式流规范(Akka是创始倡议人)、旨在标准化、模板化流式编程。wiki称Akka是toolKit库,Akka应该是一种框架+运行时,但是以库的形式提供,这使得你可以使用springBoot以依赖的形式集成Akka.

  Akka Streams兼容 Reactive Streams and JDK 9+ java.util.concurrent.Flow。Reactive Streams则是定义如何在算子之间传输数据的通用机制,该机制在数据的生产者和发送者是异步的场景下要保障数据不丢、缓冲和其他资源不能被无端耗尽。Akka Streams API是面向最终用户的,但内部的各个处理阶段之间遵从了Reactive Streams接口,对外接口上二者完全不同:The Reactive Streams specification defines its protocol in terms of Publisher and Subscriber. These types are not meant to be user facing API, instead they serve as the low-level building blocks for different Reactive Streams implementations.  Akka Streams implements these concepts as Source, Flow(referred to asProcessorin Reactive Streams) and Sink without exposing the Reactive Streams interfaces directly. If you need to integrate with other Reactive Stream libraries, read Integrating with Reactive Streams

  stream和Akka IO很像:Working with streaming IO,IO是actor style,stream是function style.  stream将流看做一个个element数据元素的集合,所以流的数据源可以看成一个无限集合,流的特点是这些元素持续到来、那么装配在流上的操作就持续不断地执行,所以在运行时这个数据源是一个“取之不尽源源不断”的集合,这个数据源即所谓的Source

  stream以shape“外形”表达流处理算子的接口,三个典型算子:只有一个出口的Source、有进出口的Flow、只有一个入口的Sink,以所谓的“外形”区分:

三个基本的内置外形及其连接关系,之所以叫外形,是因为stream致力于将白板上绘制的流程图最方便地翻译到代码;shape外形注重这个算子对外的接口,也就是有什么输入输出port,比如说RunnableGraph的类型是ClosedShape,表示一个对外没有输入输出的“封闭”的完备图,已经全部组装好、可以直接运行了,它不能再与谁链接,被称为isLand;而FlowShape类型对外有一个输入和一个输出,所以它代表一个线性linear算子。本图中:Flow是Source的下游、是Sink的上游。

第一个例子

    val source: Source[Int, NotUsed] = Source(1 to100)  //类似List(1 to 10: _*),Int是元素类型;

    implicit val system = ActorSystem("QuickStart")//虽然很简单,但这个流也是由actor来执行的,所以需要创建actor系统;

    implicit val materializer = ActorMaterializer() //物化器,用于将流的operator算子实体化为actor并运行,默认会合并多个算子为一个actor,故而operator会有async方法,本质是划分actor,没有划分的会自动fusion即所谓算子融合

默认自动算子融合,说明还是要减少太细的、并无必要的跨线程交互,当然对耗时、阻塞的算子首选异步化。再一个问题是缓冲:Without fusing (i.e. up to version 2.0-M2) each stream operator had an implicit input buffer that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer there, data elements are passed without buffering between fused operators. In those cases where buffering is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the.buffer()operator—typically a buffer of size 2 is enough to allow a feedback loop to function.

  在Akka Stream的众多概念中容易混淆的是shape和operator:shape侧重逻辑上一个box operator的对外接口,是所谓外形外观;而operator其实直接对应了一个actor.  它内部可以是最基本的source组件、map计算、也可以是由多个算子组合而成的复合算子,它代表了内观;最后就是实体化概念,实体化就是物化,它就是创建actor实体运行你的流;

    source.runForeach( i ⇒ println(i) )  //令Source“跑起来”;没有Flow、Sink是隐式的;

  Source的类型参数指定了其元素的类型,Source有俩类型参数,第一个是Int,表示原始数据元素的类型,第二个NotUsed代表Materialized value物化值类型,表示一些你可能关心的中间值,其类型将在Source跑起来之后才产生(如果没有需要你关注的中间值则用NotUsed),所以第二个产出结果是你可能用得到的auxiliary备用值或prospective期待值。拎清三个概念,官网交代的不清楚:

    1、shape:外观,一个算子不管内部如何实现的,只要外观像什么就可以当做什么;

    2、operator:内观,分形组合;

    3、Materialize:物化、实体化;

第二个例子

  一个简单的echo server用来处理TCP文本消息,对每一行做简单的echo响应。使用AkkaStream处理Tcp连接的好处是stream库可以透明地给到你反压和分型组合能力,外加async可以十分方便地划分异步边界,反压和异步对于对性能、稳定性有极致要求的应用其实是蛮重要的,分型组合对于大型应用的架构梳理、组件重用很有意义

import akka.stream.scaladsl.Framing //TCP是流式数据,按行分帧

val connections: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(host, port)

  AkkaIO的Tcp用法:IO(Tcp)、AkkaStream的用法:Tcp().bind、AkkaHTTP的用法:Http().bind。IncomingConnection是这个Source的Element元素类型,Future[ServerBinding]是你关心的物化值的类型。流是全异步独立运行,其停止只能是数据源发送停止信号或者运行当中抛出错误,物化值是唯一的在流运行起来之后能给到你流相关信息的“句柄”,比如可以手动unbind,或者在随机选择端口情况下,可以拿到监听端口。

val matValue = connections.runForeach { //matValue即物化值

  connection =>

  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString] .via( //单独定义一个Flow:执行分帧和响应

    Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) //以一行为标准来做分帧:Framing.delimiter会用特定ByteString作为分隔符监测流数据,在这里是换行符,它还会缓冲最大maximumFrameLength字节来查找以分隔符界定的帧(一个元素不能超过这个大小),并确保损坏的输入不会导致OOME;allowTruncation参数表示因为这是长连接、所以须要显式的一行结束的标识:indicates that we require an explicit line ending even for the last message before the connection is closed.

   .map(_.utf8String) //把每个ByteString帧转成一行字符串:the bytes of each line need to be converted to a string . This is of course a dangerous conversion, since only printable ASCII characters should be converted to a string but for our needs it is good enough.

  .map(_ + "!!!\n")  //加后缀构造响应;

  .map(ByteString(_) //ByteString:a sequence of bytes that can be sent over the wire.

)

//将IncomingConnection交给echo处理:

 connection.handleWith(echo) //IncomingConnection:bidirectional flow and not just a unidirectional one.  Tcp.IncomingConnection represents a connection that knows how to receive requests and how to send responses.

}

matValue.onComplete {  //物化值的使用:你可以用它判断port绑定情况

    case Success(b) =>

      println(s"Server started, listening on: ${b.localAddress}")

    case Failure(e) =>

      println(s"Server could not be bound to $address:$port: ${e.getMessage}")

  }

  netcat测试:

    请求:$ echo -n "Hello World" | netcat 127.0.0.1 8888

    响应:Hello World!!!

    请求:$ echo "Hello World\nHow are you?" | netcat 127.0.0.1 8888

    响应: Hello World!!!

    响应:How are you?!!!

  流程图:

Source emit IncommingConnection
用echo处理每一个conn

  对应客户端代码:

The code looks quite similar but in contrast to the server we don't have to manage the incoming connections anymore.

  在服务端我们看到,没有任何办法单独关闭一个connection,如果想实现某种关闭信号,需要使用GraphStage自定义


  用Flow去处理新的TCP连接,和用Route路由去处理HTTP请求是非常相似的:

  val adminHttpSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] = Http().bind("localhost", port)

  一个建立好的链路表达为一个incomingConnections,应用代码去pulls connections from this stream source ,之后可以使用Flow[HttpRequest, HttpResponse, _] to “translate” requests into responses. 

  第一个例子中的Source(1 to100)实际调用伴生对象工厂方法:

    def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]

  Range的父类依次是:IndexedSeq -> Seq -> Iterable,Range和Vector一样都适合下标随机访问,比如(1 to 22)(20)得到21

  runForeach操作符接收参数就是消费者consumer function,这是个语法糖,它实际上是Source.runWith(Sink.foreach(el => ...)) 的别名,这就是一个最小流式处理也就是直连一个Source和一个Sink.

runForeach返回Future[Done]:

    val done: Future[Done] = source.runForeach(i ⇒ println(i))(materializer)

    implicit val ec = system.dispatcher

    done.onComplete(_ ⇒ system.terminate())

  凡是方法名包含“run”的,才是数据源启动/激活activation方法,而在这个启动方法调用之前的所有方法,都是“装配”方法,比如还可以对Source调用类似fold的scan:

    val factorials = source.scan(BigInt(1))((acc, next) ⇒ acc * next)

    val result: Future[IOResult] = factorials.map(num ⇒ ByteString(s"$num\n"))

      .runWith(  FileIO.toPath(Paths.get("factorials.txt"))  )

  在runWith方法调用之前的scan、map操作符,不会触发执行,流式数据处理的观点是:operator只是在描述这个流处理都要干些啥事儿,因为流所处理的数据是未来的数据、当前还没有来到,那么在未来怎么处理数据,需要“描述”(类似Future回调),这些描述是在operator上做的,所以说Source只是一个描述、我们需要装配这个Source.

  Akka Streams 库有很好的 API 层次设计。其 API 可以分成三大类

  1、高级API:FlowDSL/线性DSL;Source、Sink和Flow称为线性(linear)操作符,它们只能装配一条流处理链Chains,因为线性操作符最多只有一个输入或输出,你也可以用它们组装自定义线性操作符比如Source via Flow=Source、Flow to Sink=Sink、Flow via Flow=Flow...

 2、中级API: GraphDSL/图形DSL;Fan-in 、Fan-out和BidiFlow则有多个输入输出,所以它们是非线性的,可用于构造更复杂一点的Graph junction图结点、用于组装流处理有向无环图DAG.

  3、低级API:GraphStage图元;如果你要自定义可重用的复杂operator或Graph junction(比如说它们内部本身就是一个复杂的流式业务数据处理Graph,你希望封装这些代码或者有组件共享需求),才会用到图元。尽量直接使用1和2,也就是现成内置的、复用度高的、成熟简单的FlowDSL和GraphDSL、包括下面图示的6个operator:

中高级API有六个operator,operator可以看做为一个box黑盒,这个box有几个输入输出接口体现了它是不是线性operator、也决定了它怎么和其他operator组合,这些输入输出接口也称为shape;每个黑盒如果你打开来看,它内部可能又是由多个box组合而成的,大盒套小盒,这就是Akka Stream的分形组合能力。

  只有这6个葫芦娃满足不了你的时候,再考虑七娃GraphStage图元去自定义operator或junction,它的优势是够底层你几乎什么都能做(但正是什么都需要自己做所以重用度不够、易出错):The GraphStage abstraction can be used to create arbitrary operators with any number of input or output ports. 用GraphStage可以使用任意个输入输出灵活定义任意的算子,你定义的算子本身可以是一个复杂Graph,可以封装一系列业务操作。图元是一种GraphDSL的等价物(GraphStage继承自Graph),但是图元定义的算子是原子化的:Where GraphStage differs is that it creates an operator that is itself not divisible into smaller ones, and allows state to be maintained inside it in a safe way.  图元定义算子不可再分割且内部状态和actor一样是受保护的,原子化的含义就是说整个GraphStage会被作为一个actor物化执行,比较来看,GraphDSL使用了6个葫芦娃定义各种operator,它们当然是“离散”的、允许各自作为一个actor物化,但GraphStage整个作为一个actor物化,故称之为原子化

  所以当你开始使用图元定义自己的算子,你开始需要明确定义输入输出即 the “interface” of our operator, which is called shape in Akka Streams terminology:

1、自定义算子首先需要明确定义Inlet和Outlet:

  val in/out = Inlet/Outlet......//定义ports

2、你的算子继承GraphStage[T],T可能是FlowShape/SourceShape/SinkShape;所以还要覆盖定义:

  override val shape =new T......//使用ports定义shape,到这里就把你定义的算子外观描述好了

3、最后覆盖定义GraphStageLogic的创建方法,materializer会调用该方法物化GraphStage:

  // This is where the actual (possibly stateful) logic will live override def

  createLogic(inheritedAttributes: Attributes): GraphStageLogic = ??? //外观描述完毕、开始描述内观:在GraphStageLogic中实现你的处理逻辑(基于ports)。注意所有可变的状态都需要限制在为每次物化独立创建的GraphStageLogic中——物化会将GraphStageLogic实体化为一个actor. 多个actor之间当然不能有可变共享状态,如果它包含可变的状态,那必须是私有的。

4、在Source/Flow类型GraphStageLogic中注册OutHandler定义当下游请求数据时的行为,反压在这里体现:只有当下游请求pull数据时,才能发送push数据元素 ,所以向下游push数据只有这一个onPull回调方法给你用,你不能在没有pull请求情况下主动push数据,AkkaStream所有算子都满足这个约定,即:gearpump钟祥演讲提到的逐层反压

  setHandler(out, new OutHandler {

    override def onPull() //下游请求pull数据时回调该方法,由你决定如何向下游push数据

  当然,你还可以在GraphStage启动时就向上游请求数据:

    override def preStart(): Unit = pull(in)

5、在Sink/Flow类型GraphStageLogic中注册InHandler定义当上游push数据过来时的行为:

    setHandler(in, new InHandler {

        override def onPush()//收到通知:有新元素Push过来了,你可以使用grab(in)捕获数据

6、另外在OutHandler中还有onDownstreamFinish;在InHandler中还有onUpstreamFinish,可以覆盖定义当下游、上游终止时的行为,默认是completeStage终止本operator.

  Source、Sink和Flow都有fromGraph方法,可以将相应shape的GraphStage(Graph)直接转换为Source、Sink和Flow类型去继续做更高维度的组合。关于Flow/Graphe DSL的介绍,墙裂推荐刘颖大佬


  Flow、Graph的装配非常类似Future回调以及scala闭包,都是对现在还未发生的、未来会发生在另一个上下文中的操作的一种描述、对程序的一种装配,在调用run*方法之前的所有装配代码不会触发任何实际的资源调用,而只是捕获运行时需要的所有信息。

Akka 相关的Other frameworks and toolkits have emerged to form an ecosystem around Akka:

1、The Spray toolkit[12]is implemented using Akka and features aHTTP serveras well as related facilities, such as adomain-specific language(DSL) for creatingRESTfulAPIs

2、The Play frameworkfor developingweb applicationsoffers integration with Akka[13]

3、Up until version 1.6,Apache Sparkused Akka for communication between nodes[14]

4、The Socko Web Server library supports the implementation of REST APIs for Akka applications[15]

5、The eventsourced[16] library provides event-driven architecture(see alsodomain-driven design) support for Akka actors

6、The Gatling stress test tool for load-testing web servers is built upon Akka[17]

7、The Scalatra web framework is built upon Akka and offers integration with it[18]

8、The Vaadin web app development framework can integrate with Akka[19]

9、The Apache Flink platform for distributed stream and batch data processing is built upon Akka.[20]

10、The Lagom framework for building reactive microservices is implemented on top of akka.[21]

There are more than 250 public projects registered on GitHub which use Akka.


    Scala Standard Library  Future 相比于java Future增加了回调、flatMap支持,2011年1.1版本Akka单独对Future做过增强,两年后合并入scala语言标准库:

Akka是道 scala是术;有道无术,术尚可求也,有术无道,止于术;Akka与Scala同属lightbend公司双剑合璧取长补短,从Java8开始已然明显引领着Java的方向

  scala Future目前已经是可组合单子,它不仅属于scala VO模式重要组成。Future合并入scala标准库说明,scala语言将之作为最基础的设施,必将更广泛的在语言标准库中使用,因此它也必将成为scala作为OC面向并发语言的重要组成。

  scala Future回调要求提供隐式EC(ExecutionContext),和Java的Executor相比你连execute提交任务方法都省了,不变的是Future及其回调函数的执行相对于调用主线程是异步的、会由EC调度给其他线程去执行、其他线程执行完后会设置Future的结果值、然后触发回调,这一切都不影响调用主线程,所以scala Future几乎可以等价看做线程。EC注释:

  A custom `ExecutionContext` may be appropriate to execute code which blocks on IO or performs long-running computations. ——BuckHeading舱壁模式;

`ExecutionContext.fromExecutorService` and `ExecutionContext.fromExecutor` are good ways to create a custom `ExecutionContext`. ——从传统ES得到EC

  The intent of `ExecutionContext` is to lexically scope code execution. That is, each method, class, file, package, or application determines how to run its own code. ——EC的目标是可以让你自主决定程序各个单元如何运行它们的代码,这个单元可以是方法、类、文件、包或一个app,这个设计很棒很赞很有设计感。

  在actor当中用到了Future的地方要小心了(Ask都是按Future返回,在Akka官网FQA特别讲到这个问题),因为和actor编程范式有overlap:Future在执行上类似于本地的worker actor,做的事情差不多、需要的资源一样、用法上却有所差异,某种角度上可以说Actor是对Future的泛化和深度应用,因为用完备的Future代表线程、scala成为面向并发语言;因为用轻巧的actor代表协程、Akka可称为面向并发的框架。以下看一下Future及其在actor中的使用准则。

  Future可以看做类似Option的数据结构,一个占位符用来表达现在可能还没有、在未来某时刻才能得到的异步操作结果(这个结果也可能是一个异常),Future是一个可以在线程间传递结果值的共享对象(AtomicRefence)Future分为阻塞和非阻塞两个基本用法:

1、阻塞✘:最直观、阻塞式等待(使用包对象工具concurrent.Await 的 ready/result 方法)、不需要EC、由当前线程阻塞式执行:

    implicit val timeout = Timeout(5 seconds)

    val future = actor ? msg //ask背后实现

    val result = Await.result(future, timeout.duration).asInstanceOf[String] //阻塞式得到Future结果,如果你需要actor的应答结果且阻塞可控、业务允许的话,可以这么用;

    当actor应答消息返回时Future即可“complete”,由于消息类型是Any、Future具体类型为Future[Any],所以用asInstanceOf对Any类型的结果做处理,还可以直接Future.mapTo[String]—注意这是回调本质上不同了;我们可以直接Future{表达式}来构造Future[T],表达式的返回值类型即T:

scala Future没有死等方法,可以去直接获取value.get但这是非阻塞的

2、非阻塞✔:标准方式,注册回调:

    future.onComplete { //注册回调表达式,表达式类型为Try[T] =>U,所以可以使用模式匹配格式定义偏函数pf:

      case Success(result) => doSomethingOnSuccess(result)

      case Failure(failure) => doSomethingOnFailure(failure) //Try方便你模式匹配分别处理正常和异常

    }

    onComplete要的是一个表达式Expression,我们用偏函pf来写这个表达式,如果你用了onComplete之外的变量,pf又特么成了个闭包Closure。如果在Future之后使用了回调,那么Future内的表达式必须有返回结果,以供回调捕获处理,使用EC.fromExecutorService构建固定4条线程的线程池并测试:

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}

import scala.util.{Try, Success, Failure}

val pool = Executors.newFixedThreadPool(4)

implicit val ec = ExecutionContext.fromExecutorService(pool)

我们看到线程池轮流调度四条线程处理八个任务;且执行Future本体的线程和执行onComplete回调的线程可能不是一条,比如2号任务本体由4号线程执行、回调则由1号线程执行;

  测试表明 onComplete 回调中的 pf 偏函有可能是在Future之外的第三条线程去做的,也可能直接由Future执行线程执行。其它方法如filter、map、foreach都是基于onComplete,但是它们与onComplete的不同在于它们只能在Future成功即返回Success情况才会被调用(onComplete始终会被调用),而且会自动“解包”Success(最后需要注意的是,如果你在idea写一些小程序测试Future,注意system.terminate()不能顺手直接写在filter、map、foreach或onComplete之后而应该写在里面,因为这些方法都是异步注册方法,你的逻辑很可能还没跑完,system就被terminate了):

直接import scala内置的concurrent.ExecutionContext.Implicits.global,这是一个ForkJoin线程池;在REPL中写一个Future的话它会直接被执行,我们看到首先是main主线程打印Future类型还是not completed;接着Future开始被ForkJoin线程执行、然后map在Future completed之后得到执行(箭头处等待了10秒);最后再查看Future已经是completed REPL中定义一个Future默认会被立即提交执行 表达式在scala中是顶级概念、除了statement语句其他皆为表达式,我们可以用偏函数的格式去写表达式,但是不能用普通表达式的格式写偏函数,一般的偏函数都是严格的模式匹配形式 多说一句,表达式在Scala是无处不在阳光普照一般的存在、闭包则是剑走偏锋但非常重要的扫地僧;表达式几乎就是万能的,除了语句你写什么都是表达式,表达式就是值、值也是表达式

  我们看到Future更像一种设计模式,有了它则库的使用接口更通用、同时用户端代码更灵活,库更有理由使用它、把是否阻塞的决定权留给用户代码,用户代码根据业务场景自主决定。Future不只是线程间共享数据结构,类似的,Option/Try貌似只是一种数据结构,实则还属于scala异常处理架构的重要组成,它们标示了scala对待异常处理的思路转变;它们还代表了Scala对VO(值对象)的思考和态度,在这方面它们与Either、Done都属于一种设计模式,哥把它命名为VO模式:支持combinators连接符、多个Value Object可以组合成vo pipeline.

  scala的很多语法糖,宜理解其背后的设计思路,如果不了解背景和思路,单看语法糖本身会觉得它只是个trick,只是有点意思但是没太大意义;VO模式代表了Scala对于运行时程序状态的深入思考,深化细化了状态的表述,VO模式支持多种combinators:map、flatMap、filter、foreach...(类似Future回调),基于这些combinators组合函数调用链pipeLine是VO模式的标准用法。

  VO包括Future以及Option、Try、Either:

1、Option:Odersky亲自设计。要么“有”Some、要么“没有”None;把正常的对没有的业务表达None与程序表达Null区分开,Some/None类型的增加方便了模式匹配判断结果、抽取结果值一气呵成; 最后是getOrElse让你从容处理业务上的没有;

2、Try:来自Twitter。要么成功、要么异常;往往与Future合并使用,代表一个计算的成功与失败,类似Either但语义不同,可以看作是Either[Throwable, T];最后是recover/recoverWith让你从容处理异常;

3、Either[A,B]:要么A要么B,详见注释;

  我们作为应用代码更多的是使用Future(除非你在开发库),要么是从库API得到Future然后使用它、要么是交给Future一段Expression让它去跑,但是什么时候跑完、Future什么时候complete咱不知道咱也不敢问、因为你干预不了。Future最终得到结果的行为叫Complete完成或FullFill填充,如果你在编写库代码、想要自主可控地控制一个Future 的 explicitly completed,scala提供有Promise,只有Promise能主动complete它,也只有Promise能给它拥有的Future赋值一次且只有一次,可以说Future是immutable的懒求值变量、是两条线程之间传递结果值的通道,这两条线程可以理解为生产者/库线程和消费者/库用户线程。

Futures or Promises?

While the Scala Future API extensively utilizes Promises in its function implementations, we don’t need to explicitly use Promises very often as the Futures API already delivers a suite of common concurrent features for writing asynchronous code.  If the business logic doesn’t need Promises, just stick to the Futures API.  But for cases in which you need to provide a “contract to be fulfilled at most once in the future”, say, between two modules like the producer/consumer example, Promises do come in handy.

  Future本质上类似单值一次性使用的消息队列,我们从中取出东西的方式有推和拉两种:poll轮询是拉就像get,回调则是推、completed后主动调用你。举个栗子:AkkaHTTP的HttpApp工具类,在启动程序主线程之后开始在端口监听HTTP连接请求,然后主线程挂起、等待控制台用户的回车信号再关闭系统,为此使用了Promise大致是这样:

  def  waitForShutdownSignal: Future[Done] = { //注意该方法返回一个Future[Done]

    val promise = Promise[Done]() //Promise仅仅用于暴露一个类型为Done的Future用于控制流程用

    sys.addShutdownHook {

      promise.trySuccess(Done) //可以执行完成的方法还有success、failure...

    }

    Future {

      blocking {

        if (StdIn.readLine("Press RETURN to stop...\n") !=null) //这里的代码令包装外围Future阻塞

          promise.trySuccess(Done) //在两种情况下会Complete:JVM关停;控制台回车;

      }

    }

  promise.future //Promise.Future不执行具体任务、仅仅暴露出去做控制信号

}

  Done是一个:More clearly signals intent  than `Unit`,使用它的时候(我们只要知道bindingFuture是一个Future):

val bindingFuture:Future[ServerBinding]  = Http().bindAndHandle(...

Await.ready(

 bindingFuture.flatMap(_ ⇒ waitForShutdownSignal(theSystem)), // chaining both futures to fail fast

 Duration.Inf) // It's waiting forever because maybe there is never a shutdown signal

  你可以像上面那样函数式串联Future(composability组合能力,flatMap),这种写法简洁明确,表现了scala的OC特性。bindingFuture需要completed即完成绑定之后开始flatMap回调、回调表达式要等待waitForShutdownSignal函数完成、也就是开始阻塞、阻塞的是一条专用回调线程不影响主线程,因为使用Await.ready等待回调线程阻塞导致当前主线程阻塞,相当于直接写代码:

  StdIn.readLine()  // let it run until user presses return

  flatMap方法参数是个表达式、该表达式返回与调用者同类型的集合或者VO(VO模式)比如Option、Future等,因为方法随后会“摆平”调用结果:

  1、第一个Future首先需要completed,然后才会调用flatMap;

  2、flatMap的参数表达式被执行即第二个Future开始执行,并在completed之后整个flatMap才算执行完,但第二个Future开始阻塞;

  3、第一个Future Failure会导致fail fast,可以测一下:

如果第一个Future失败则 flatMap不会得到调用、第二个Future不会执行、直接到最后的onComplete.  即chaining both futures to fail fast 串联Future
第二个Future失败;串联情况下为了避免陷入类似NodeJS的回调地狱、避免Future的嵌套,还提供了flatMap方法 到了大量Future串联、或者需要维护状态的程度,就应该认真考虑改成actor了,Thanks Joe ArmStrong!

  还好,串联最多也就用用flatMap“摆平”一下2层Future嵌套,flatMap方法在很多集合或VO里都有,它接收的表达式返回类型需要与所在类型相同,比如Future.flatMap表达式参数类型也得是Future、List.flatMap表达式参数类型得是List、Seq或者Option...我们可以理解flatMap是专用于“剥壳+筛选”的,因为Scala增加了VO模式,很多场合会用VO去包装结果值,为了方便直接获取结果值往往需要“剥壳”,再者对于None、Failure也需要方便方法针对这些VO的语义做过滤,None直接过滤掉、Failure不执行...所以flatMap方法也属于VO模式的组成部分。在java8中总算有点用的CompletableFuture具备这些功能。

  类似上面Await代码:只有当bindingFuture已经complete时(也就是Http绑定成功)才会去调用waitForShutdownSignal,而waitForShutdownSignal返回的Future会一直阻塞,直到用户在控制台回车表示要停掉系统,可以回调解绑定:

bindingFuture

  .flatMap(_.unbind()) //unbind返回Future[Done]、需要“摆平”,否则attempt是Future[Done]

  .onComplete(attempt ⇒ { //attempt类型是Try[Done]

    postServerShutdown(attempt, theSystem) //服务关闭钩子方法

  ...})

其他还有andThen顺序串联回调

  我们说过actor很好的解决了几个懵逼问题包括异常处理,解决手段之一是对象之间解耦,在actor世界对象就是actor,actor之间通过统一通讯底层异步通讯,这样对象之间实现了彻底的解耦,到了对象这一级,这就是最彻底、最底层的深度解耦了,这可以实现很多开脑洞的功能(allows actors to be decoupled in both time and space, supporting distribution, mobility, and location transparency.):就比如对异常的处理、一个actor失效之后可以轻松恢复它重建它,传统单体系统实际上是一个紧耦合对象图,不可能轻松做到异常恢复(With Akka, companies solve problems that were previously deemed impossible.)、传统单体应用系统在运行时是被多条线程穿插的紧耦合对象图、诺干不受控的线程万箭穿心。actor实现深度解耦之后可以地址透明、所以Akka可以letitCrash让它崩、所以分布式和自愈到了actor世界竟然成了水到渠成。另一个更早从解耦合而受益的是ESB,无论mule还是springIntegration都实现了配置式的极速系统开发,解耦是在消息处理组件上、没有到对象一级、但已经使得开发、运维方式上得到了明显的提升,比如mule基于spring(没有选择OSGi)实现了组件热加载/热替换,这都是解耦的力量,这向我们展示了:对象,能够互不依赖、互不牵扯羁绊、做到自恰是多么有意义:

传统应用中普遍的微观线程对象运行时结构,运行时的线程就像穿糖葫芦一样穿行于各个对象的方法;这种情况下只能串行无法并行,因为对象的方法是嵌套入栈的。 这就是并发,对象A和B就存在并发问题;如果一个对象被万箭穿心那就是严重并发问题。 所有actor只依赖统一的Akka通讯层,actor之间无依赖,每个actor的运行线程来自Akka基础设施的统一调度,可称为烤串模式,这不再是穿糖葫芦模式不会有万箭穿心问题,实现所有actor的全局并行。Akka统一通信层即ActorSystem,它身兼数职:DDD工厂 +Actor托管容器 +默认线程调度层 +配置中心

  我们知道异常/错误是可能要了系统的命的、可能污染一个对象的数据导致系统状态不一致、也可能干掉一条线程甚至整个进程,但这么重要的事从未得到完善解决,传统开发模式一直以来都没有有效或者足够简单的办法应对,spring为此努力过收效甚微,这说明异常处理问题只是冰山一角,它是一个涉及面更大的问题;Deep Decouple开脑洞应用:烤串模式 + VO模式结合对于异常应该尽量纳入正常的控制流思路等一系列设计思路上的转变才算完善解决了这个问题。

  一丝熟悉的味道是Spring的Ioc控制反转理论,与之类似,Ioc强调O之间不要硬依赖,运行时都依赖于Ioc对象容器;但是O之间的交互仍然需要接口,所以在编译开发时依赖于接口,没有做到actor那么彻底的解耦(只依赖于简单的消息VO),实际上只是做到了促进了面向接口编程、一定程度上发扬了解耦优势、因为面向接口本来就是想要去解耦(但不彻底),这种不彻底的解耦模型并不清晰,最重要的是对象之间在运行时仍然是方法调用栈嵌套,仍然无法在对象级别实现全局并发。因此,Spring对于运行时程序结构没有任何改善,Spring是对对象解耦合的一次尝试,但解决层次不够,避重就轻,是一次不彻底的尝试。

在Akka环境中怎么用Future

  EC可以直接使用system.dispatcher,它与scala执行环境、java线程池的大致关系:Akka MessageDispatcher = scala ExecutionContext = java Executor

  Akka中所有actor都是由MessageDispatcher调度线程去运行的,如果在actor内部调用Future,Future的行为匹配/兼容这个actor的行为 (e.g. all CPU bound and no latency requirements),那么你可以放心重用这个actor当前使用的默认dispatcher,这也是最简单的方式。我们说过实际上Future等价于本地actor,我们创建子worker actor去分担工作实际上和使用Future是一样的,好处是:Unlike a Future that is returned from an Actor, this Future is properly typed, and we also avoid the overhead of managing an Actor.   这种Future你可以放心import context.dispatcher.

  但也并非所有Future都可以这样,什么情况不可以?等同于worker actor的使用标准:当需要执行一些阻塞任务时、我们可能会用一堆worker actor去做,但是这些worker actor的线程池往往会单独配置、这就是非著名设计模式:舱壁模式,Future也一样(可以说Future就是本地actor的简化版),在actor内部使用Future的注意点和使用worker actor是一样的,此外,二者的区别在于,Future是一次性的异步任务、无需longRunnabing、也无需要保持的状态,这就是在Akka环境使用Future的大致准则。

  最后一个不得不注意的细节是:在上面例子中onComplete上注册的pf偏函,如果你在pf中使用了onComplete函数域之外的变量,那么pf就变成了一个闭包Closure.  需要注意所有闭包使用上的注意点:比如pf用到了sender则意味着、此处代码运行时是在actor之外的线程中,而那时actor很可能已经在处理后续其他任务、sender已经不一样了,解决办法是按闭包使用标准,在闭包代码中“缓存”sender:val replyTo = serder(),或者使用管道模式:

管道pipe模式

  “pipe”管道模式基本是专门用来处理闭包中的send()问题用的,交给pipeTo的actorRef不会再变。一旦Future完成(upon completion of the Future)它会直接将结果发送给指定actor,这是为转发场景量身打造的专属语法糖,要用管道模式必须importing akka.pattern.pipe. 代码演示及代码行为流程图:

    val future = target ? "some message"

    future.pipeTo( sender() )

转发UserData请求用户信息数据、转发UserActivity请求用户行为数据

可以把这个专属语法糖总结为:

  ( targetActor ? message ).pipeTo( anotherActor )

回顾一下java蒸汽时代的Executor并发框架

  我们在并发场景中谈论到Job任务的时候,默认包含两个要素:

    1、任务:开发时你写的代码,用scala语言来说就是你写的函数,java语言就是Runnable.run的内容,仅仅是代码;

    2、线程:运行时执行你这段代码的线程;

    1是我们自主可控的东西,2是在你可控范围之外的东西,你能做的只是向操作系统提交你写的代码,至于什么时候执行、执行多久,操作系统也得看CPU大爷的心情,没你什么事,大爷很忙的。

    我们很多并发系统的问题,原子性问题可见性问题死锁等等问题,全在于我们和大爷互相不理解却还是得打交道、因为我们属于软件系统领域,大爷属于硬件半导体领域,两个行业而且两个行业都很硬核和独立,充当翻译的编译器对我们也不太友好它有时候会重排序乱翻译。最早的编程就是简单的每客户每线程,比如webServer,当访问量暴增线程数就暴增(在以毫秒计的芯片世界,这种阻塞场景下,大多线程根本没事干游手好闲、每次轮到CPU时间片执行就是看一眼poll/select阻塞任务),超过一万性能不升反降,怎么办?java.util.concurrent.Executor!它包含两个要素:

    1、任务队列,缓冲;

    2、一个线程池,线程等任务,而不是任务等线程;

    来的任务先不着急调度线程执行,先在队列排队,如果没有空闲线程,那你就先等着,有效预防OOME,我们上面说了,任务其实就是一段代码,代码占内存吗?不占的,只要还没跑起来不占多少内存,如果任务堆积,那么这就叫优雅降级,响应慢一些而已嘛,但至少不会出错。

    Executor只是高层接口,它的左膀右臂是:

    1、伴生对象/静态工厂、贴心小助手Executors提供多种Executor实现:newFixedThreadPool、newCachedThreadPool(这俩实际产出的是ThreadPoolExecutor通用实现)、newSingleThreadExecutor单线程的池(创建唯一线程、池会保证这个唯一线程的存在);

    2、ExecutorService在Executor基础上扩展出shutdown、awitTermination等生命周期管理方法,实际类型是它的子类ThreadPoolExecutor比较多。这个东西为什么必须有?如果是我们自己new Thread那么我们自己持有线程引用,线程的生命周期管理当然也是我们的事,但是现在,我们只是把我们的任务提交给Executor线程池,每个任务由哪个线程执行的我们根本不知道,也无从管理,对于一个完备的类库来说,需要暴露出来线程生命周期管理方法。

    有时候,回顾一下JDK的设计,其实能感受到一些前瞻性,比如最简单的,为什么把线程分为Thread和Runnable接口?scala出现以后,我们更加觉得Runnable接口不顺眼,本来就是脱裤子放屁,函数式以后连run方法名都不必写了还要它作甚?回顾java的Executor我们看到:在Executor出来之前设计的Runnable已经有了函数式的影子,迫于无处不OO的歪风邪气,你是没办法直接把一个run方法交给Thread的、而且方法也无法保持状态(也就是一次运行之后产出一个带有中间数据的方法供下次运行),在OO的世界只有贵族也就是对象才有这个能力,方法只是对象的奴隶。所以我们不得不画蛇添足般的写一个Runnable,这是一种理解。但是,分离出来Runnable才为后续Executor框架铺平了道路,可以理解为:你写无害的Runnable、我负责棘手的Thread,你负责写代码、我负责运行代码

  蒸汽时代的Runnable为Executor提供了一种任务的良好表达方式,但是其run方法定义让你连运行结果都无处返回,除非将结果数据存入另一个共享对象——在函数式世界,这叫副作用。到了java5推出了新的Callable(参考掘金scalaCool),Callable+Future最佳搭档:

    1、Callable:支持将结果返回到主线程

    2、Future:提供了方法用于获得结果、取消任务以及检验任务是否已完成/取消

    Callable+Future这一对新搭档在时间维度上增强了java并行框架可用性,这俩接口都类似集合、要返回某种结果,所以都是泛型定义,Future和Option都提供了类似集合的API,我们知道集合即使没有元素,调用其任何API也是安全的,不会抛出NPE,而Future和Option,也都恰好是可能为空也可能有值的,像集合一样暴露安全API当然是极好的,特别是用于方法链时。

    ExecutorService中所有的submit方法都会返回Future,你可以用它取得任务结果或者取消任务。上述所有的工具,都是线程安全的,也就是说你提交任务的主线程,和执行任务的分支线程之间不会有什么瓜葛,更重要的是,它们之间可以传递结果信息而不用你去写共享对象

    对于“如果在actor内部调用的Future匹配/兼容这个actor的行为 (e.g. all CPU bound and no latency requirements),那么最简单的就是重用这个dispatcher”更多的理解:

    Akka宣称actor以及ActorSystem都可以作为基本构造快去构建并发系统,意思是actor当然不用说了,而且你还可以在一个节点机或者一个JVM起多个ActorSystem都是没有问题的,因为ActorSystem的基本结构也是ExecutorService,在一个应用系统当中使用诺干乃至大量ExecutorService都是可以的,这有个名词:SEDA,我们知道EDA是事件驱动架构,SEDA就是多阶段事件驱动架构,每个阶段都是一个ExecutorService,它们首尾相连,也就是多个线程池+工作队列结构体组合,共同处理并发流式数据——这也就类似在一个JVM当中起多个ActorSystem了,某种角度上说,也可以把ActorSystem就看做是一个ExecutorService,任务队列+线程池结构体。java当中的ExecutorService到了Akka改名叫ExecutorContext是因为在java中并发执行是一项服务,在actor模型中本身完全就是并发的线程池异步执行,是整个运行环境,如果你还要做一些异步工作可以直接求助于这个环境。

    实际上scala重新定义了独立的、顶级ExecutionContext特质,然后在下面的子特质中逐层混入java的Executor、ExecutorService来匹配java体系,对应java的Executor的是ExecutionContextExecutor子特质,对应java的ExecutorService的则是ExecutionContextExecutorService,对于javaer可以看做ExecutorContext等价于ExecutorService,下面也就不分scala的ExecutorContext和java的ExecutorService了。对于Future,它比较简单,scala加入了类集合操作,在同步等待方法上,去掉了java Future的get()方法,改为了可以指定超时时间的 Await.

    需要注意的是Future.onComplete有可能会由第三条线程C来执行,下面小测试验证。


    EC注释讲到:

    APIs such as `Future.onComplete` require you to provide a callback and an implicit `ExecutionContext`. The implicit `ExecutionContext` will be used to execute the callback. 

    意思是Future.onComplete的执行可能会涉及三个线程分别是主线程A、运行Future并植入结果的线程B、运行Future回调的线程C:

    1、主线程A负责发起Future异步调用、注册onComplete回调函数,然后撒手不管了;

    2、Future异步调用直接隐式求助于环境ExecutionContext,环境会调度另一个线程B来执行,这个B你只知道它从哪来来的,但是你看不到也控制不了;

    3、Future调用执行完毕之后,结果返回给Future,接着调用onComplete回调函数,此时可能需要第三条线程C来执行,Future.onComplete注释讲到:If the future has already been completed, this will either be applied immediately or be scheduled asynchronously.  是指onComplete回调函数有可能由线程B继续执行、也有可能由隐式传入onComplete方法的ExecutionContext调度线程C去异步地执行,“immediately ”的字面意是立即,含义是当前线程执行。

    Future用于在生产者消费者线程之间异步传递未来结果,虽然我们在开发时大多都是从库的并发调用返回Future,但是也可以这样直接传入表达式来手动构造:

手撸的话可以照此格式:Future{ Expression表达式 },concurrent包中的future方法构造的方式已deprecated;表达式‘1’虽然简单但也是由另一个线程B把结果值1置入Future的 使用自定义EC传给Future.onComplete

    实际当中onComplete反而不常用,因为Future的map、filter等都基于回调,编码可以流水线式串联一系列函数,Akka流在使用上和Future相似,都是异步、懒求值回调式的函数调用链。

    在构造Akka Streams 的 flows和graphs时,可以把它们看作是对一个流的描绘蓝图,或者一个执行计划,Stream materialization流实体化意思是在装配/描述环节我们是在写class类,只有当组合出来并且run后才会产生对象出来,也就是实体化materializer,而且类可以不止产生一个对象。在Akka Streams里实体化一般来说是启动一个Actors,它会执行这个流,但也可以是打开文件或socket连接等。

    三个可重用构造块:Source、Sink、Flow. 它们连接起来组成一个流图RunnableGraph/RunnableFlow,因为三个东西都是可重用的,所以它们的组合会产生一个全新的RunnableFlow,这是一个代表这个流的全新对象,是上面提到的“蓝图”/“描述”/“执行计划”的实体化对象。流的实体化完成标志就是得到一个RunnableFlow.

a Flow connected with both a Source and a Sink results in a RunnableFlow

    了解ESB的同学,一看就懂,source/sink也就是endpoint,代码示例直观易懂,参考原文。下图示点击事件流:

以250ms为判断时间分组、归纳出来双击和 连击

完成这个流处理的代码so easy:

val multiClickStream = clickStream.throttle(250.millis)

    .map(clickEvents => clickEvents.length).filter(numberOfClicks => numberOfClicks >= 2)

再精简一点:

  val multiClickStream = clickStream.throttle(250.millis).map(_.length).filter(_ >= 2)

Papers that have influenced the design of Akka

相关文章

  • Akka Stream和Scala Future

    单体不是单机、单机不一定是单体,单体指铁板一块、无弹性、无恢复能力、一损俱损的系统,对于基于RPC分布式结构远程...

  • Paypal工程博客

    草原 1月30日 20:30来自iPhone客户端 PayPal 尝试用 akka stream 和 akka a...

  • Akka系列(五):Java和Scala中的Future

    随着CPU的核数的增加,异步编程模型在并发领域中的得到了越来越多的应用,由于Scala是一门函数式语言,天然的支持...

  • 006 Rust 异步编程,Stream 介绍

    Stream 介绍 Stream和Future类似,但是Future对应的是一个item的状态的变化,而Strea...

  • Akka Stream 和 HTTP

    墙裂推荐社区雷锋刘颖大佬的streams系列文章[https://pragmaticscala.com/post...

  • squbs-17. 持久化缓冲区(PersistentBuffe

    PersistentBuffer 是一系列 Akka Streams 流组建的第一个。它像 Akka Stream...

  • 异步 Stream

    flutter 异步方式 Future Async/await Stream Stream stream 是一个事...

  • Akka框架简介

    Akka是JAVA虚拟机平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言编写,同时提供...

  • 分布式应用框架Akka详解

    1.什么是Akka Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Sc...

  • flutter笔记之stream学习

    stream类似future,只不过future是接收一次的异步返回结果,stream可以接收多次。主要是为了处理...

网友评论

      本文标题:Akka Stream和Scala Future

      本文链接:https://www.haomeiwen.com/subject/llkyeftx.html