美文网首页
akka编程demo

akka编程demo

作者: 烂泥_119c | 来源:发表于2020-02-12 22:43 被阅读0次

    AKKA

    akka基于actor模型, 是一个用于构建可扩展的弹性的快速响应的应用程序的平台;
    actor模型:是一个并行计算模型。 它把actor作为基本元素来对待:未响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor或者发送更多的消息

    image.png

    概念介绍

    Actor:

    actor是akka中最核心的概念,它是一个封装了状态和行为的对象,actor之间可以通过交换消息的方式进行通信,每个actor都有自己的收件箱,通过actor能够简化锁及线程管理,actor具有如下特性:

    • 提供了一种高级抽象,能够简化在并发/并行应用场景下的编程开发
    • 提供了异步非阻塞、高性能的事件驱动编程模型
    • 超轻量级事件处理(每GB堆内存几百万actor)

    类介绍

    ActorSystem

    在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以实际应用中,actorSystem一般是单例对象,我们通过ActorSystem创建很多actor,负责创建和监督actor

    Actor

    Actor负责通信,它包含一些重要的生命周期方法:

    • preStart(): 在Actor对象构造方法执行后执行
    • receive(): 在actor的preStart方法执行完成后执行,用于接收消息,会被反复执行

    Demo

    使用akka做一个简易的通信模型,实现一个主从结构通信

    Master

    主对象类,即注册中心,统计当前在线的worker数目

    package akkaDemo
    
    import akka.actor._
    import com.typesafe.config.ConfigFactory
    import org.apache.commons.cli.{GnuParser, Options}
    
    import scala.collection.mutable
    import scala.collection.mutable.ListBuffer
    import scala.concurrent.duration._
    /**
      * @author phil.zhang
      */
    class Master extends Actor {
    
      // workMap
      private val workerMap = new mutable.HashMap[Int, WorkerInfo]()
    
      private val workerList = new ListBuffer[WorkerInfo]()
    
      override def preStart(): Unit = {
        println("master 已经启动")
    
        import context.dispatcher
        // 循环检查心跳
        context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
      }
    
      override def receive: Receive = {
        // 接受注册信息并统计
        case RegisterMessage(workId, memory, cores) => {
          val info = new WorkerInfo(workId, memory, cores)
          info.lastHeartBeatTime = System.currentTimeMillis()
          workerMap.put(workId, info)
          workerList += info
          val size = workerList.size
          println(info)
          println(s"worker$workId 注册成功,当前worker共:$size")
          sender ! RegisterdMessage("注册成功")
        }
          // 检查心跳
        case Check => {
          val now = System.currentTimeMillis()
          val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
          outTimeList.foreach(workerInfo => {
            workerList -= workerInfo
            workerMap.remove(workerInfo.workerId)
            println("移除" + workerInfo.workerId)
          })
        }
          // 接受心跳后更新心跳时间
        case SendHeartBeat(workId) => {
          if (workerMap.contains(workId)) {
            val workerInfo = workerMap(workId)
            workerInfo.lastHeartBeatTime=System.currentTimeMillis()
          }
        }
      }
    }
    
    object Master {
    
      def main(args: Array[String]): Unit = {
        val options = new Options()
        options.addOption("h", true, "host")
        options.addOption("p", true, "port")
    
        val parser = new GnuParser()
        val line = parser.parse(options, args)
    
        val host = line.getOptionValue("h")
        val port = line.getOptionValue("p").toInt
    
        val configStr=
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        val config=ConfigFactory.parseString(configStr)
        val actorSystem = ActorSystem("actorSystem", config)
        val master = actorSystem.actorOf(Props(new Master), "master")
      }
    
    }
    
    

    Worker

    工作对象类, 向主类注册,并保持心跳

    package akkaDemo
    
    import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import org.apache.commons.cli.{GnuParser, Options}
    
    import scala.concurrent.duration._
    
    /**
      * @author phil.zhang
      * @date 2020/2/6
      */
    class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{
    
      var master:ActorSelection = _
    
      override def preStart(): Unit = {
    
        // actorSystem 是master的ActorSystem的名字, master是masterActor的名字
        master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
        // 向主类注册
        master ! RegisterMessage(1, memory, cores)
        println("worker注册")
      }
    
      override def receive: Receive = {
        // 主类注册返回信息
        case RegisterdMessage(message) => {
          println("worker" + message)
    
          import context.dispatcher
          // 循环发起心跳
          context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
        }
          // 发送心跳
        case HeartBeat => {
          master ! SendHeartBeat(1)
        }
      }
    }
    
    object Worker {
    
      def main(args: Array[String]): Unit = {
        val options = new Options()
        options.addOption("mh",true, "master host")
        options.addOption("mp",true, "master port")
        options.addOption("h",true, "host")
        options.addOption("p",true, "host")
        options.addOption("m",true, "memory")
        options.addOption("c",true, "cores")
    
        val parser = new GnuParser()
        val line = parser.parse(options, args)
    
        val m_host = line.getOptionValue("mh")
        val m_port = line.getOptionValue("mp")
        val host = line.getOptionValue("h")
        val port = line.getOptionValue("p")
        val memory = line.getOptionValue("m").toInt
        val cores = line.getOptionValue("c").toInt
    
        val configStr=
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        val config=ConfigFactory.parseString(configStr)
    
        val actorSystem = ActorSystem("workerActorSystem", config)
    
        val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")
    
      }
    }
    
    

    WorkerInfo

    工作对象信息类, 用于描述工作对象

    package akkaDemo
    
    /**
      * @author phil.zhang
      * @date 2020/2/6
      */
    class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {
    
      // 用于记录上次心跳时间
      var lastHeartBeatTime:Long = _
      
      override def toString = s"$workerId,$memory,$cores"
    }
    
    

    Message

    定义了一些信息类型

    package akkaDemo
    
    /**
      * @author phil.zhang
      * @date 2020/2/6
      */
    trait Message extends Serializable{
    
    }
    
    // slave发给master的心跳信息
    case class SendHeartBeat(workId: Int) extends Message
    
    // slave发给master的注册信息
    case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message
    
    // master发给slave的注册反馈信息
    case class RegisterdMessage(message: String) extends Message
    
    // master发给自己的检查信息, 所以不需要序列化
    case object Check
    
    // slave发给自己的心跳信息,所以不需要序列化
    case object HeartBeat
    
    

    主要maven依赖

        <dependency>
          <groupId>commons-cli</groupId>
          <artifactId>commons-cli</artifactId>
          <version>1.2</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>2.5.3</version>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-remote_2.11</artifactId>
          <version>2.5.3</version>
        </dependency>
    

    相关文章

      网友评论

          本文标题:akka编程demo

          本文链接:https://www.haomeiwen.com/subject/dishfhtx.html