AKKA
image.pngakka基于actor模型, 是一个用于构建可扩展的弹性的快速响应的应用程序的平台;
actor模型:是一个并行计算模型。 它把actor作为基本元素来对待:未响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor或者发送更多的消息
概念介绍
Actor:
actor是akka中最核心的概念,它是一个封装了状态和行为的对象,actor之间可以通过交换消息的方式进行通信,每个actor都有自己的收件箱,通过actor能够简化锁及线程管理,actor具有如下特性:
- 提供了一种高级抽象,能够简化在并发/并行应用场景下的编程开发
- 提供了异步非阻塞、高性能的事件驱动编程模型
- 超轻量级事件处理(每GB堆内存几百万actor)
类介绍
ActorSystem
在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以实际应用中,actorSystem一般是单例对象,我们通过ActorSystem创建很多actor,负责创建和监督actor
Actor
Actor负责通信,它包含一些重要的生命周期方法:
- preStart(): 在Actor对象构造方法执行后执行
- receive(): 在actor的preStart方法执行完成后执行,用于接收消息,会被反复执行
Demo
使用akka做一个简易的通信模型,实现一个主从结构通信
Master
主对象类,即注册中心,统计当前在线的worker数目
package akkaDemo
import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
/**
* @author phil.zhang
*/
class Master extends Actor {
// workMap
private val workerMap = new mutable.HashMap[Int, WorkerInfo]()
private val workerList = new ListBuffer[WorkerInfo]()
override def preStart(): Unit = {
println("master 已经启动")
import context.dispatcher
// 循环检查心跳
context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
}
override def receive: Receive = {
// 接受注册信息并统计
case RegisterMessage(workId, memory, cores) => {
val info = new WorkerInfo(workId, memory, cores)
info.lastHeartBeatTime = System.currentTimeMillis()
workerMap.put(workId, info)
workerList += info
val size = workerList.size
println(info)
println(s"worker$workId 注册成功,当前worker共:$size")
sender ! RegisterdMessage("注册成功")
}
// 检查心跳
case Check => {
val now = System.currentTimeMillis()
val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
outTimeList.foreach(workerInfo => {
workerList -= workerInfo
workerMap.remove(workerInfo.workerId)
println("移除" + workerInfo.workerId)
})
}
// 接受心跳后更新心跳时间
case SendHeartBeat(workId) => {
if (workerMap.contains(workId)) {
val workerInfo = workerMap(workId)
workerInfo.lastHeartBeatTime=System.currentTimeMillis()
}
}
}
}
object Master {
def main(args: Array[String]): Unit = {
val options = new Options()
options.addOption("h", true, "host")
options.addOption("p", true, "port")
val parser = new GnuParser()
val line = parser.parse(options, args)
val host = line.getOptionValue("h")
val port = line.getOptionValue("p").toInt
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config=ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("actorSystem", config)
val master = actorSystem.actorOf(Props(new Master), "master")
}
}
Worker
工作对象类, 向主类注册,并保持心跳
package akkaDemo
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}
import scala.concurrent.duration._
/**
* @author phil.zhang
* @date 2020/2/6
*/
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{
var master:ActorSelection = _
override def preStart(): Unit = {
// actorSystem 是master的ActorSystem的名字, master是masterActor的名字
master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
// 向主类注册
master ! RegisterMessage(1, memory, cores)
println("worker注册")
}
override def receive: Receive = {
// 主类注册返回信息
case RegisterdMessage(message) => {
println("worker" + message)
import context.dispatcher
// 循环发起心跳
context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
}
// 发送心跳
case HeartBeat => {
master ! SendHeartBeat(1)
}
}
}
object Worker {
def main(args: Array[String]): Unit = {
val options = new Options()
options.addOption("mh",true, "master host")
options.addOption("mp",true, "master port")
options.addOption("h",true, "host")
options.addOption("p",true, "host")
options.addOption("m",true, "memory")
options.addOption("c",true, "cores")
val parser = new GnuParser()
val line = parser.parse(options, args)
val m_host = line.getOptionValue("mh")
val m_port = line.getOptionValue("mp")
val host = line.getOptionValue("h")
val port = line.getOptionValue("p")
val memory = line.getOptionValue("m").toInt
val cores = line.getOptionValue("c").toInt
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config=ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("workerActorSystem", config)
val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")
}
}
WorkerInfo
工作对象信息类, 用于描述工作对象
package akkaDemo
/**
* @author phil.zhang
* @date 2020/2/6
*/
class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {
// 用于记录上次心跳时间
var lastHeartBeatTime:Long = _
override def toString = s"$workerId,$memory,$cores"
}
Message
定义了一些信息类型
package akkaDemo
/**
* @author phil.zhang
* @date 2020/2/6
*/
trait Message extends Serializable{
}
// slave发给master的心跳信息
case class SendHeartBeat(workId: Int) extends Message
// slave发给master的注册信息
case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message
// master发给slave的注册反馈信息
case class RegisterdMessage(message: String) extends Message
// master发给自己的检查信息, 所以不需要序列化
case object Check
// slave发给自己的心跳信息,所以不需要序列化
case object HeartBeat
主要maven依赖
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.11</artifactId>
<version>2.5.3</version>
</dependency>
网友评论