1.在application配置文件中增加相应的配置信息
akka {
actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
}
}
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
seed-nodes:用于群集的初始,自动,连接。
ClusterSystem: Akka cluster的system名称。
如果一个Actor想要加入到一个Cluster集群,具体代码如下:
class SimpleClusterListener extends Actor with ActorLogging {
val cluster = Cluster(context.system)
// subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", member.address, previousStatus)
case _: MemberEvent => // ignore
}
}
Actor将自己注册为某些集群事件的订阅者。 他根据当前集群的状态接受相关事件,当actor订阅之后,就能接受集群发生改变后发出的事件,
如果Cluster是部署在云系统运行,比如K8S,AWS等或者其他DNS方法发现节点的云上系统,你需要使用Akka Cluster Bootstrap模块来实现自动加入集群。
加入配置种子节点
你可以决定采用否应手动或自动方式,去配置集群初始接触点,这些初始的接触点即所谓的种子节点。在加入完成之后,种子节点也不是特殊的节点,他们和机器中的其他节点以一样的方式参与到集群中。当一个new节点启动后,他向所有的种子节点发送消息,然后给第一个给他反馈信息的种子节点发送 join命令。如果没有任何一个种子节点回复(可能尚未启动),则重试此过程直到成功或关闭。
下面是定义种子节点在conf文件中:
akka.cluster.seed-nodes = [
"akka.tcp://ClusterSystem@host1:2552",
"akka.tcp://ClusterSystem@host2:2552"]
也可以在启动jvm的时候,通过启动参数来配置种子节点
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
种子节点可以以任何顺序启动,并且不必运行所有种子节点,但是在最初启动集群时必须启动配置为种子节点配置列表中的第一个元素的节点,否则其他种子 - 节点不会初始化,其他节点也无法加入群集。对第一个种子节点的特殊要求,主要是考虑防止启动一个空cluster的时候,出现孤岛现象,这里也可以配置seed-node-timeout参数来控制节点加入的集群的超时时间。
一旦启动了两个以上的种子节点,关闭第一个种子节点就没有问题。 如果重新启动第一个种子节点,它将首先尝试加入现有群集中的其他种子节点。 请注意,如果同时停止所有种子节点并使用相同的种子节点配置重新启动它们,则它们将自行连接并形成新群集,而不是加入现有群集的其余节点。 这可能是不希望的,应该通过将多个节点列为冗余的种子节点来避免,并且不要同时停止所有的种子节点。
自动配置种子节点
您可能希望使用云提供程序或集群协调程序或某种其他形式的服务发现(例如托管DNS)自动发现种子节点,而不是手动配置在开发或静态分配的节点IP中的种子节点。 开源Akka Management库包含Cluster Bootstrap模块,它可以处理这个问题。 有关更多详细信息,请参阅其文档。
以编程方式连接到种子节点:joinSeedNodes
用户可以使用Cluster(system).joinSeedNodes的方式加入指定的种子节点代表的集群中,joinSeedNodes方法的参数便是需要加入的到的集群的种子节点信息。通常你不应该把本节点加入到joinSeedNodes中的列表,除非本节点是会本当成第一个种子节点使用。如果是当成了第一个种子节点,那么必须把该节点放入到joinSeedNodes第一个参数中。下面是具体代码
val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)
seed-node-timeout:等待其中一共种子节点回复初始请求加入的等待时间, 当这是第一个种子节点并且在该超时内没有来自其他种子节点的肯定回复时,它将自己加入以引导群集。 如果这不是第一个种子节点,则将使用此间隔执行连接尝试。
retry-unsuccessful-join-after =10s:当一个join请求失败,那么它会在10s之后再次尝试,如果配置为off表示禁用join请求失败重试。
默认情况下,无限期地重试给定种子节点的连接,直到成功连接。 如果通过配置超时失败,则可以中止该过程。 当中止时,它将运行Coordinated Shutdown,默认情况下将终止ActorSystem。 CoordinatedShutdown也可以配置为退出JVM。 如果动态组装种子节点并且在尝试失败后尝试重新启动种子节点,则定义此超时很有用。
akka.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s
akka.coordinated-shutdown.terminate-actor-system = on
您可以加入群集中的任何节点。 它不必配置为种子节点。 请注意,您只能加入现有的集群成员,这意味着对于引导,某些节点必须自己加入,然后以下节点可以将它们连接起来构成集群。
一个actor system只能加入一次集群。 其他尝试将被忽略。 成功加入后,必须重新启动才能加入另一个集群或再次加入同一集群。 它可以在重新启动后使用相同的主机名和端口,当它作为集群中现有成员的新版本出现时,尝试加入,然后现有的将从集群中删除,然后它将被允许加入。
注意:对于集群的所有成员,ActorSystem的名称必须相同。 启动ActorSystem时会给出名称。
节点down
当故障检测器认为某个成员无法访问时,不允许领导者履行其职责,例如将新加入成员的状态更改为“Up”。必须首先该节点变成再次可访问状态,或者必须将无法访问的成员的状态更改为“Down”。 将状态更改为“Down”可以自动或手动执行。 默认情况下,必须使用JMX或HTTP手动完成。
或者可以通过代码方式关闭节点:Cluster(system).down(address).
自动down(不推荐使用)
自动下线是通过配置akka.cluster.auto-down-unreachable-after = 120s参数实现自动下线。自动下线会在导致脑裂现象。比如网络异常,集群超负荷,gc时间过长
节点leaving
有两种方法可以从群集中删除成员。
1.可以停止actor系统(或JVM进程)。 如上所述,在自动或手动down之后,它将被检测为不可到达并被移除。
2.更优雅的方式是告诉群集,该节点应该离开,这可以使用JMX或HTTP执行。 它也可以通过以下方式以编程方式执行:
val cluster = Cluster(system)
cluster.leave(cluster.selfAddress)
节点订阅集群事件
您可以使用Cluster(system).subscribe订阅群成员状态更改通知。
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
一个完整的state的状态列表:akka.cluster.ClusterEvent.CurrentClusterState是发给订阅者第一个消息,后面会逐步更新事件的状态。
让我们看一个示例,该示例说明了名为backend的worker如何检测并注册到新的主节点,这里名为frontend。
示例应用程序提供了转换文本的服务。 当某些文本发送到其中一个前端服务时,它将被委派给执行转换作业的后端工作程序之一,并将结果发送回原始客户端。 可以动态地向集群添加或删除新的后端节点以及新的前端节点。
下面是backend work的转换job
class TransformationBackend extends Actor {
val cluster = Cluster(context.system)
//订阅了集群中成员的up事件,如果actor重启后,需要重新订阅
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
case state: CurrentClusterState =>
state.members.filter(_.status == MemberStatus.Up).foreach(register)
case MemberUp(m) => register(m)
}
def register(member: Member): Unit =
if (member.hasRole("frontend"))
context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
BackendRegistration
}
请注意,TransformationBackend actor订阅了集群事件可以检测新的,潜在的前端节点加入到cluster中,并向它们发送注册消息,以便他们知道他们可以使用后端工作程序。
frontend节点接受job,然后把job分派给注册到他上的backend节点。
class TransformationFrontend extends Actor {
var backends = IndexedSeq.empty[ActorRef]
var jobCounter = 0
def receive = {
case job: TransformationJob if backends.isEmpty =>
sender() ! JobFailed("Service unavailable, try again later", job)
case job: TransformationJob =>
jobCounter += 1
backends(jobCounter % backends.size).forward(job)
case BackendRegistration if !backends.contains(sender()) =>
context.watch(sender())
backends = backends :+ sender()
case Terminated(a) =>
backends = backends.filterNot(_ == a)
}
}
请注意,TransformationFrontend actor会监视已注册的后端: context.watch(sender()),以便能够将其从可用后端工作列表中删除。 死亡监视器使用群集故障检测器来检测群集中的节点,即除了正常终止监视的actor之外,它还检测网络故障和JVM崩溃。 当无法访问的群集节点已被删除和删除时,死亡监视会向观看者生成终止消息:
case Terminated(a) =>
backends = backends.filterNot(_ == a)
}
节点角色(Roles)
并非一个集群内所有的节点都是做相同的功能,比如一部分节点做web前端服务,一部分做数据库操作,一部分做数字运算。对这些节点的部署,可以让他们负责不同的功能,节点之间通过集群感知路由的方式进行通信。
节点的角色通过在akka.cluster.roles配置中进行定义,并且通常在启动脚本中将其定义为系统属性或环境变量。
节点的角色是订阅的MemberEvent中的成员信息的一部分。
集群规模达到时如何启动集群
一种普通的方法是在集群初始完毕之后,所有的成员都已经加入到集群,并且集群节点已经达到了确定的大小,然后启动actor。
在配置文件中,可以配置akka.cluster.min-nr-of-members = 3,当集群中的节点有3个节点的状态达到joining的时候,leader才会把节点状态从Joining变为up。
可以通过不同的角色来设置不同的集群启动最小的规模大小:
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
可以在registerOnMemberUp回调中启动actor,当当前成员状态更改为“Up”时将调用该回调,即群集至少具已经达到定义好的成员数。
Cluster(system).registerOnMemberUp {
system.actorOf(Props(classOf[FactorialFrontend], upToN, true), name = "factorialFrontend")
}
这种回调方法还可以用在除了启动acter的其他事情。
如何清理removed状态的成员
可以在registerOnMemberRemoved回调中进行一些清理,当当前成员状态更改为“已删除”或群集已关闭时,将调用该回调。
另一种方法是将清理任务注册到协调关闭(Coordinated Shutdown)。
注意:在已关闭的集群上注册OnMemberRemoved回调,将立即在调用程序线程上执行OnMemberRemoved回调,或者该回调会在当前member的状态变为removed的时候进行回调。你可能希望在集群启动的时候做部署一下清理工作,但是集群可能在你部署执行清理工作的时候已经关闭,这依赖于竞争而不是集群的健康状态。
网友评论