这个示例来自Lightbend的 Lightbend Project Starter里的Cluster Sharding Scala。
是一个简单的分片集群用做记录温度的例子:
Device是Entity/Actor在每个分片上运行多个,用来记录温度并计算平均值。
Device接收消息时默认(初始)情况下执行 counting(Nil)创建一个空的List。
通过模式匹配RecordTemperature(id,temp)类型消息
消息在Device的伴生对象中定义,可以在外部调用Device.RecordTemperature()
将温度存入List 并改变自己的下一步行为 become(counting(temperature))
这样每当该actor收到消息就会递归地增加List
become()
和unbecome()
方法是将运算行为压入/弹出行为栈。第一次become(counting(temperature))是将在栈顶端的初始receive方法counting(Nil)换成counting(temperature)
因为counting()是PartialFunction偏函数,偏函数也是对象,也有它的状态。temperatures可以保存起来。
import akka.actor._
object Device {
case class RecordTemperature(deviceId: Int, temperature: Double)
}
class Device extends Actor with ActorLogging {
import Device._
override def receive = counting(Nil)
def counting(values: List[Double]): Receive = {
case RecordTemperature(id, temp) =>
val temperatures = temp :: values
log.info(s"Recording temperature $temp for device $id,
average is ${temperatures.sum / temperatures.size} after ${temperatures.size} readings");
context.become(counting(temperatures))
}
}
Devices
object Devices {
// Update a random device
case object UpdateDevice
}
class Devices extends Actor with ActorLogging {
import Devices._
private val extractEntityId: ShardRegion.ExtractEntityId = {
case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
}
private val numberOfShards = 100
private val extractShardId: ShardRegion.ExtractShardId = {
case Device.RecordTemperature(id, _) => (id % numberOfShards).toString
// Needed if you want to use 'remember entities':
//case ShardRegion.StartEntity(id) => (id.toLong % numberOfShards).toString
}
val deviceRegion: ActorRef = ClusterSharding(context.system).start(
typeName = "Device",
entityProps = Props[Device],
settings = ClusterShardingSettings(context.system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
val random = new Random()
val numberOfDevices = 50
implicit val ec: ExecutionContext = context.dispatcher
context.system.scheduler.schedule(10.seconds, 1.second, self, UpdateDevice)
def receive = {
case UpdateDevice =>
val deviceId = random.nextInt(numberOfDevices)
val temperature = 5 + 30 * random.nextDouble()
val msg = Device.RecordTemperature(deviceId, temperature)
log.info(s"Sending $msg");
deviceRegion ! msg
}
}
在idea的scala插件中导入:
使用sbt构建完成后运行sample.sharding.ShardingApp
网友评论