美文网首页
akka-stream与actor系统集成

akka-stream与actor系统集成

作者: hangscer | 来源:发表于2017-09-24 15:50 被阅读0次

    一共有四个api:

    • Source.actorRef,返回actorRef,该actorRef接收到的消息,将被下游消费者所消费。
    • Sink.actorRef,接收actorRef,做为数据流下游消费节点。
    • Source.actorPublisher,返回actorRef,使用于reactive stream的Publisher。
    • Sink.actorSubscriber,使用于reactive stream的Subscriber。

    Source.actorRef

      val stringSourceinFuture=Source.actorRef[String](100,OverflowStrategy.fail) // 缓存最大为100,超出的话,将以失败告终
      val hahaStrSource=stringSourceinFuture.filter(str=>str.startsWith("haha")) //source数据流中把不是以"haha"开头的字符串过滤掉
      val actor=hahaStrSource.to(Sink.foreach(println)).run()
      actor!"asdsadasd"
      actor!"hahaasd"
      actor!Success("ok")// 数据流成功完成并关闭
    

    "how to create a Source that can receive elements later via a method call?"在akka-http中经常遇见Source[T,N]的地方就是对文件上传和下载的功能的编码(文件IO)中,完成file=>Source[ByteString,_]的转化,或者Source(List(1,2,3,4,5))这种hello-world级别的玩具代码中,这些代码中在定义Source时,已经确定流中数据是什么了。那么如何先定义流,而后给流传递数据呢?答案就是Source.actorRef。说明:Source.actorRef没有背压策略(背压简单说就是生产者的生成速率大于消费者处理速率,导致数据积压)。

    Sink.actorRef

    class MyActor extends Actor{
      override def receive: Receive = {
        case "FIN"=>
          println("完成了哇!!!")
          context.stop(self)
        case str:String =>
          println("msgStr:"+str)
      }
    }
    ......
      val actor=system.actorOf(Props[MyActor],"myActor")
      val sendToActor=Sink.actorRef(actor,onCompleteMessage = "FIN")
      val hahaStringSource=Source.actorRef[String](100,OverflowStrategy.dropHead).filter(str=>str.startsWith("haha"))
      val actorReceive=hahaStringSource.to(sendToActor).run()
      actorReceive!"hahasdsadsa1"
      actorReceive!"hahasdsadsa2"
      actorReceive!"hahasdsadsa3"
      actorReceive!"hahasdsadsa4"
      actorReceive!Success("ok")
    //output
    msgStr:hahasdsadsa1
    msgStr:hahasdsadsa2
    msgStr:hahasdsadsa3
    msgStr:hahasdsadsa4
    完成了哇!!!
    

    Sink作为数据流终端消费节点,常见用法比如Sink.foreach[T](t:T=>Unit)Sink.fold[U,T](z:U)((u:U,t:T)=>U)等等。Sink.actorRef用于指定某个actorRef实例,把本该数据流终端处理的数据全部发送给这个actorRef实例去处理。解释上述程序,Sink,actorRef需要说明哪一个actorRef来接收消息,并且在数据流上游完成时,这个actorRef会接收到什么样的消息作为完成的信号。我们可以看到onCompleteMessage这条消息并没有受到str=>str.startsWith("haha")这过滤条件的作用(同样的,Sink.actorRef没有处理背压功能,数据挤压过多只能按某些策略舍弃,或者直接失败)。

    背压处理

    以上Source.actorRefSink.actorRef均不支持背压策略。我们可以借助Source.actorPublisher或者Sink.actorPublisher在数据流的上游或者下游处理背压问题,但是需要去继承ActorPublisher[T]ActorSubscriber实现了处理逻辑。

    Source.actorPublisher

    在数据流上游处自己手动实现背压处理逻辑:

    case object JobAccepted
    case object JobDenied
    case class Job(msg:String)
    ...
    class MyPublisherActor extends ActorPublisher[Job]{
      import akka.stream.actor.ActorPublisherMessage._
      val MAXSize=10
      var buf=Vector.empty[Job]
      override def receive: Receive = {
        case job:Job if buf.size==MAXSize =>
          sender()!JobDenied //超出缓存 拒绝处理
        case job:Job =>
          sender()!JobAccepted //确认处理该任务
          buf.isEmpty&&totalDemand>0 match {
            case true =>
              onNext(job)
            case false=>
              buf:+=job //先向缓存中存放job
              deliverBuf() //当下游存在需求时,再去从缓存中消费job
          }
        case req@Request(n)=>
          deliverBuf()
        case Cancel=>
          context.stop(self)
      }
    
      def deliverBuf():Unit= totalDemand>0 match {
        case true =>
          totalDemand<=Int.MaxValue match {
            case true =>
              val (use,keep)=buf.splitAt(totalDemand.toInt) //相当于(buf.take(n),buf.drop(n))
              buf=keep
              use.foreach(onNext(_)) //把buf一份两半,前一半发送给下游节点消费,后一半保留
            case false=>
              buf.take(Int.MaxValue).foreach(onNext(_))
              buf=buf.drop(Int.MaxValue)
              deliverBuf() //递归
          }
        case false=>
      }
    }
    ...
    val jobSource=Source.actorPublisher[Job](Props[MyPublisherActor])
    val jobSourceActor=jobSource.via(Flow[Job].map(job=>Job(job.msg*2))).to(Sink.foreach(println)).run()
    jobSourceActor!Job("ha")
    jobSourceActor!Job("he")
    

    actorPublisher的函数签名def actorPublisher[T](props: Props): Source[T, ActorRef]。上述代码中totalDemand是由下游消费节点确定。onNext(e)方法在ActorPublisher中定义,作用是将数据传输给下游节点。当然还有onComplete()onError(ex)函数,也是用于通知下游节点作出相应处理。

    Sink.actorSubscriber

    case class Reply(id:Int)
    ...
    class Worker extends Actor{
      override def receive: Receive = {
        case (id:Int,job:Job)=>
          println("finish job:"+job)
          sender()!Reply(id)
      }
    }
    ...
    class CenterSubscriber extends ActorSubscriber{
      val router={ //路由组
        val routees=Vector.fill(3){ActorRefRoutee(context.actorOf(Props[Worker]))}
        Router(RoundRobinRoutingLogic(),routees)
      }
      var buf=Map.empty[Int,Job]
      override def requestStrategy: RequestStrategy = WatermarkRequestStrategy.apply(100)
      import akka.stream.actor.ActorSubscriberMessage._
      override def receive: Receive = {
        case OnNext(job:Job)=>
          val temp=(Random).nextInt(10000)->job
          buf+=temp //记录并下发任务
          router.route(temp,self)
        case OnError(ex)=>
          println("上游发生错误了::"+ex.getMessage)
        case OnComplete=>
          println("该数据流完成使命..")
        case Reply(id)=>
          buf-=id//当处理完成时,删去记录
      }
    }
    ...
    val actor=Source.actorPublisher[Job](Props[MyPublisherActor]).to(Sink.actorSubscriber[Job](Props[CenterSubscriber])).run()
    actor!Job("job1")
    actor!Job("job2")
    actor!Job("job3")
    

    ActorSubscriber可以接收如下几种消息类型:OnNext上游来的新消息、OnComplete上游已经结束数据流、OnError上游发生错误以及其他普通类型的消息。继承ActorSubscriber的子类都需要覆写requestStrategy以此来提供请求策略去控制数据流的背压(围绕requestDemand展开,何时向上游请求数据,一次请求多少数据等等问题)。

    相关文章

      网友评论

          本文标题:akka-stream与actor系统集成

          本文链接:https://www.haomeiwen.com/subject/eqwkextx.html