集群的基础是远程,第一必须打开Remoting功能、第二必须在application.conf添加akka.cluster.seed-nodes配置,种子节点是新节点机加入集群的联络点,是任何集群必须有的配置。Akka的种子以及leader节点并没有太多特殊之处,因为Akka是类似cassendra的AP无主系统,和hbase之类的master不同,即使是leader节点也是很轻量的,切换代价也小得多。
回顾Akka Cluster是干嘛用的:provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck. It does this using gossip protocols and an automatic failure detector. 基于成员服务提供了容错无主P2P集群。
Cluster Supreme 集群才是王道,区别是有主的和无主P2P的一些场景中,需要集群运行唯一一个actor. 你可以使用集群成员事件订阅来自己实现,但是仍然有一些边边角角的细节,所以,这个典型功能由Cluster Singleton实现为集群功能模块。
单例功能实际上可以部分代替分片集群,业务上对实体分好类后,可以令每种实体都是单例,一堆单例在一堆节点机上自由分布,这是一种融合主备+集群的思路如何在多台节点机上分布actors并且只使用逻辑名就可以与这些actor交互?这样就不用管actor实际位于哪台节点机,同时,分片集群中的实体状态往往需要分布式备份,用于宕机时实体迁移恢复状态,有两种做法:ddata和持久化,Akka官方推荐持久化,但如果流式数据不是全部需要存盘的话这样稍微有点重,仅仅为了恢复就长期持续存盘大量数据,如果不想写磁盘而是使用分布式缓存的话(since then you don’t have to setup and operate a separate data store (e.g. Cassandra) for persistence.)也有redis社区插件可用(原生ddata应该不适用海量大并发业务数据),但使用redis还是心有不甘,因为白白引入了一层进程间交互,而其实作为actor style我们自己做分布式缓存是顺理成章的事。
另外:在系统稳态时通过分片发送的消息可以保证顺序;稳态时分片集群的额外负载仅仅是最多两次通过片区转发消息这一丢丢:Once a shard’s location is known the only overhead is sending a message via the ShardRegion rather than directly.
支持集群中的actors之间的消息发布订阅(发布订阅语义天生隐含不关心物理位置),还支持仅使用目标actor逻辑名的点对点point-to-point的Send发布,后者并非无用,因为Akka对消息顺序的保障是一对固定的actor之间消息保证顺序,如果是前者的话,在sender和destination之间还有各自的本地mediator actor,经过测试,1万客户端通过TCP长连接每500ms生成模拟数据场景,顺序应该是有保障的,但是如果频率再高的话不好说。但后者应该可以保障顺序。
云的技术价值在于持续的业务数据处理即使在宕机情况下也能做到数据不丢不重不乱,这代表服务可迁移高可用,本质上是分布式的能力价值在另外一个actor system(不属于当前集群)中的actor与当前集群中的某个actor互相通讯,这“另外一个actor system中的actor”就是集群客户端,它不需要知道目标actor的实际位置。这玩意适合做集群监控,监控客户端使用传统web,在传统程序和actor style之间的一个翻译中介。
ddata用于在集群中分布式共享数据,对这些数据的访问使用key-value形式的API.
6、Aware路由器
所有路由器router都能够感知到集群的成员节点,router会吧消息路由给routees,routees随着节点机下线上线而变化,router自动感知这些变化。这个功能和发布订阅功能感觉都可以部分被分片集群替代,分片集群中ClusterSharding.startProxy这样启动的片区,就是一个消息Proxy,它知道所有实体的位置,可以为你做路由。
7、集群衡量Metrics
集群的成员节点可以收集到集群健康度数据,并且可以发布给集群其他节点或者是本机事件总线event bus上的订阅者。
在GearPump项目中,最简场景是一主master+多从worker,也就是一个监工一大把苦力,可是万一这一个监工不小心猝死了咋整?当然还是老套路一主多备,也就是启用master的HA,此时多个master节点会组成一个独立集群,master的状态要时刻保持增量同步,要做到这种保障级别的原因是master集群actor有不可丢失且不可再现(Replay)的状态,拿GP项目来说,各个app的当前状态(运行到哪一步、jar在哪里、提交参数...)是由master维护保持的,这就是master的状态,因为是运行时产生的数据,需要自己保持住这些状态,master使用了ddata分布式数据来保持这些状态,以备master节点宕机服务迁移。至于worker工作节点的状态一般都是可再现的也就是有第三方来保持,比如流式处理应用,一般基于前端的kafka+检查点机制,状态可以回放,当掉了重启回放即可,不必在集群间同步维护。源码org.apache.gearpump.cluster.master.Master的注释也说明了这点:We use Akka clustering to implement the Master high availability. The cluster consists of several master nodes, but no worker nodes. With clustering facilities, we can easily detect and handle the failure of master node crash. The master state is replicated on all master nodes with the Typesafe akka-data-replication library, when one master node crashes, another standby master will read the master state and take over. The master state contains the submission data of all applications. If one application dies, a master can use that state to recover that application. CRDT LwwMap is used to represent the state; it is a hash map that can converge on distributed nodes without conflict. To have strong data consistency, the state read and write must happen on a quorum of master nodes.
ddata的值类型是CRDT(Conflict Free Replicated Data Types),这东东的好处是在任意节点你想更新数据、直接做、没有必要和谁商量、也就是不必做任何分布式协调:比如和某个中心server通讯,这么说有点抽象,举例来说分布式事务整个就是一个大规模分布式协调过程。在不同节点机上的并发更新会通过monotonic merge function被自动融汇成唯一结果,承诺最终收敛。事实上从单机并发时代,就已经开始这个趋势了,那就是无阻塞(也就是无协调)的并发、或者是不需要去协商同步的操作:
1、单机时代:在单机时代并发只用锁就已经out了,CAS非阻塞同步机制在JDK提供有Unsafe包下的AtomicInteger、AtomicLong等,是更好的volatile. 对数据容器有CopyOnWrite.
2、集群时代:更要追求无阻塞并行,因为在单机上并发的协调发生在线程之间(即使是进程之间也是在单机上),但是在分布式情况下并行协调是跨机器的网络IO,操作成本高得多。Akka的集群分片、leader选举、ddata更新都是无需协商的,它们要么依赖于一个独立的算法或者规则、要么依赖于后期的异步收敛,所以说无主集群是未来。分布式事务、分布式锁、单点的共享数据都是我们不提倡的。
集群由很多节点组合而成,每个节点的标识是hostname:port:uid这样一个三元tuple. UID是actorSys实例标识。CRDT虽好但是不能用于大数据量高并发场景,一般用于状态的分布式同步。
敲黑板,下面聊聊FD(failure detector)、FF(Fast Failover),待完善FD用于关键性的节点机探活、宕机下线通知,使用了The Phi Accrual Failure Detector,FD对健康度持续监视和评价、达到一定判定标准则判定宕机,解耦了监视和判定,判定可以由你来定义健康度阈值threshold,超过则判定宕机,这样FD可用于更多场景、更通用。默认阈值是8、适用大多场景,云环境如Amazon EC2可以调到12,我测试的在网络稳定情况下默认判定时间是6~10秒,所有的宕机判定都有一个硬约束就是需要一定时间,由于TCP,这个时间无法完全消除。如果你想要更快的判定时间,有商业组件支持分片集群Fast Failover
Akka集群中每个节点都会被诺干其他节点(默认最多5个)监视,任何一个节点监视到下线也就是unreachable,都会用gossip广播出来,也就是说,只要有一个节点判定宕机则集群判定其宕机。这诺干监视者节点是在hashed ordered node ring哈希排序节点环上挑选的近邻节点,这样可以提高跨机架/数据中心的可能性,以免监视者和被监视者都在一个机架,同时这个排序顺序在所有节点上都是统一的(一致性哈希), 确保全覆盖full coverage.
Heartbeats心跳比如是每秒一次,每次心跳都是一种请求/应答式request/reply handshake,应答会作为FD的计算参数(比如累计计算历史心跳平均返回时间、这作为网络通讯状况加以考虑),请求/应答式通讯也叫r-r,它隐含了这种通讯是同步的,至于是不是长连接有待考证。
FD同样关心节点重新上线reachable事件,当所有监视一个节点的节点重新判定其恢复上线了,集群经过gossip dissemination协调,将判定其重新上线。我们看到对宕机下线是敏感型判定,只可信其有;而对重新上线是迟钝型,需要100%把握。
除了上述明确的主动的下线判定,如果有太多的系统消息没有收到一个节点的确认回复,则该节点将会被检疫隔离、认定下线并且无法恢复,这些系统消息包括watch, Terminated, remote actor deployment, failures of actors supervised by remote parent. 之后,该节点将被过渡到down或者removed状态 (下线)、检疫隔离无法恢复, 这与unrechable状态不同,unrechable的节点还是可以重新rechable的。该节点想要重新加入集群只能重启actorSys,Akka可能认为这种健康状况不明的节点是最麻烦的,假死无响应比明确的死掉更麻烦,所以一律需要隔离剔除,而unrechable反而是明确的状态,可能由于网络抖动造成的心跳丢包之类的短暂失联,是网络问题而节点本身还算健康。
使用原生的集群事件订阅功能:akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]以及可以接收集群事件的示例actor,这样你的actor就能订阅MemberUp、UnreachableMember、MemberRemoved等集群成员事件。
值得看的四部分内容(这些内容与GP项目相关):
1、worker拨号加入例子(貌似所谓的worker模式):角色为backend的worker拨号通知角色为frontend的master,即worker向master报道。工作任务会被发送给frontend服务,frontend会把工作委托给一个backend,它会执行文本转换工作并将结果返回给frontend。backend以及frontend节点都可以动态加入或剔除,这个例子的结构是frontend master不会订阅任何集群事件,启动后只是等待消息;只有backend worker会订阅up上线事件,凡是有上线成员且是master则向其报道,master接收worker报道后会记住它并Watch守望它,worker死亡时就会自动通知到master,也就是说关联是单向的只有master到worker,这是为了便于master给worker分配工作任务。启动顺序是先启动一个master再启动worker,那么后启动的worker怎么知道master呢?因为它也会接收到CurrentClusterState事件,这是专为新加入的成员发送的集群当前状态,它有一个属性status是akka.cluster.MemberStatus.Up类型,该类型表明的是当前成员的静态状态,这个Up应该称为在线,与MemberUp上线不同,上线是动态的,是成员状态转变为Up的动态事件,可以看一下akka.cluster.ClusterEvent.MemberUp源码注释:Member status changed to Up.
显然,订阅了集群事件的actor收到的第一条集群状态总览消息里面,所罗列的member状态只能是MemberStatus.Up在线。MemberStatus的注释说明:
Defines the current status of a cluster member node
对比ClusterEvent的注释说明:
Domain events published to the event bus. Subscribe with
Cluster(system).subscribe(actorRef, classOf[ClusterDomainEvent])
backend worker订阅了集群事件,以便发现新加入的frontend节点,并向其发送一个注册消息,等于向frontend master报道。frontend master会Watch守望注册过的backend worker以便当其不可用时从自己的worker列表中删除,不可用情况包括网络分区、JVM crash以及worker actor的正常终止。
2、Node Roles:一个集群中的各个节点机可能有不同角色、负担不同的职责、执行不同的工作,比如可能会有几个负责web前端frontend、一个负责数据访问backend、还有一个负责大数据量运算(再比如GP运行时,如果启用master HA则会有多台master角色节点机、其他的节点机基本上都是worker),角色这东西没什么,无它、就是为了划定一个范围,这个范围有些地方用得到,比如有节点机上线下线,肯定要看它是什么角色再采取相应处理,这是很自然的事;再比如GP的Master是自成actor集群的,该集群有一个分布式单例MasterWatcher,它的(单例)范围就是master角色节点机范围:.withRole(MASTER)。节点机的角色由配置项akka.cluster.roles决定,比如GP项目gear.conf配置文件就有两种角色配置:
cluster {
roles = ["master"].............master节点组成集群
roles = ["worker"]........................worker节点只启用了远程没有组成集群
3、How To Startup when Cluster Size Reached: GP项目使用了自定义的quorum法定多数节点机机制,这么计算:val quorum = masterList.size() /2 +1,可以看到这也与GP的介绍说明相符:只要超过半数的master节点正常在线,整个集群就可以继续function,你可以定义一个最少上线节点数,达到了则leader就可以开始了!:With a configuration option you can define required number of members before the leader changes member status of ‘Joining’ members to ‘Up’. 可以看到所谓的集群启动就是leader可以将成员状态过渡到MemberStatus.Up,这个神奇的最少上线节点配置就是:
akka.cluster.min-nr-of-members = 3
还可以按照角色(实际上还是划定范围,这里用到了角色):
akka.cluster.role {
frontend.min-nr-of-members = 1
backend.min-nr-of-members = 2
}
还可以:You can start the actors in aregisterOnMemberUpcallback, which will be invoked when the current member status is changed to ‘Up’, i.e. the cluster has at least the defined number of members:
Cluster(system) registerOnMemberUp {
system.actorOf(
Props(classOf[FactorialFrontend], upToN, true),
name = "factorialFrontend")
}
This callback can be used for other things than starting actors.
看成员当前的状态是看MemberStatus,而集群事件则是ClusterDomainEvent,这个封闭特质也是Marker interface标记接口,说白了就是只用来订阅的时候表示一下,表示我要订阅的是什么类型的消息,其他没卵用,sealed封闭的作用没记错的话是它的子类型不能写到别处去,只能和它一样写在ClusterEvent.scala这个源文件里,在Akka里面,消息的定义写在哪里这是个事儿,一般首选是和处理这些消息的actor定义写在一块,但供扩展使用的订阅消息就不行了,所以这里统统写在ClusterEvent.scala这个源文件里:
akka.cluster.ClusterEvent.MemberEvent是ClusterDomainEvent的一个子集。
集群单例
GP的Master是单例,单例用于集群范围的一致决策、对外唯一入口或者worker模式,还可以用于中心式的命名、路由服务。以上场景GP项目基本全占。且GP Master单例采用了增加一个单例父监管actor的方式。
单例肯定是一个性能单点,所以别用来做有性能要求的事就好了,再者它某种程度上是一个失效单点,这个倒不要紧,因为它可以自动迁移。总之别用来做大数据量关键业务处理就行,ChanaMQ有用单例做集群节点机ID分配。
本地CSP(ClusterSingletonProxy)用于与单例通讯,该代理会追踪单例、维护一个单例的Ref:resolve the singleton’s ActorRef by explicitly sending the singleton’s actorSelection the akka.actor.Identify message and waiting for it to reply. 所谓维护就是基于单例的actorSelection、使用标准Identify消息定位、获取并维持其actorRef,这是一种自动化的远程actor定位套路。GP Master单例有父监管,所以在CSP的Settings设置了.withSingletonName(s"${MASTER_WATCHER}/$MASTER")指定‘real’单例的逻辑路径,CSP按这个逻辑path定位单例。
在CSM的Settings可以设置.withHandOverRetryInterval(),这个handOverRetryInterval(换手期重试间隔)参数注释: When a node is becoming oldest it sends hand-over request to previous oldest, that might be leaving the cluster. This is retried with this interval until the previous oldest confirms that the hand over has started or the previous oldest member is removed from the cluster.
在CSP的Settings可以设置:.withBufferSize(),也可以在配置文件中指定:akka.cluster.singleton-proxy.buffer-size = 1000,这个BufferSize即用于:当单子暂时找不到时(换手期)本代理可以缓存的消息数,当单子再次上线,这些消息将投递给单子,最多一万条。
有的场景可以考虑用单例替代分片集群,好处是有CSP替你定位单例、缓存并replay消息。
Usage章节的一些bla
加入种子节点
种子节点就是新加入节点的初始联络点,除了这一点种子节点没有任何特殊的,它们加入集群也和其他节点一样。种子节点需要在application.conf预先配置:
akka.cluster.seed-nodes = [
"akka.tcp://ClusterSystem@host1:2552", "akka.tcp://ClusterSystem@host2:2552"]
也可以定义为Java system properties:
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
种子节点可以以任意顺序启动,也不需要所有种子节点必须启动,但是在初始启动集群时seed-nodes配置的第一个节点必须启动,否则其他种子节点无法启动,这是为了避免在空集群启动情况下形成分区孤岛。一旦两个以上种子节点启动完毕,再关闭第一个种子就没问题了。
An actor system can only join a cluster once. 一个actor系统只能有一次机会加入一个集群,actor系统不能同时加入两个集群;如果actor系统重启了,它也只能重新加入集群(使用相同的host name 和 port)
Downing下线
程序执行下线代码:Cluster(system).down(address)——将本机某actor system下线;或者启用了Automatic Downing情况下发生了网络分区。
发生网络分区时,分区的两端都会认为对方unreachable了,并进而将对方下线,结果就是产生两个分离的集群 。可怕的是:This can also happen because of long GC pauses or system overload. 长GC或系统过载(无响应)也会造成此种情况。
我们不建议在生产环境使用AK Cluster的auto-down feature,如果你也使用Cluster Singleton 或 Cluster Sharding 可能会造成致命性影响的错误,特别是也用Akka Persistence.
说白了,你就别用Automatic Downing了,它会造成俗称的精神分裂。
Leaving离开
有两种方式从集群删除成员:
1、停止actor system或者JVM进程,随后FD会探测到不可达,进而剔除;
2、推荐优雅退出:val cluster = Cluster(system); cluster.leave(cluster.selfAddress); 一个system只能属于一个Cluster
订阅集群事件
你可以订阅集群成员的状态变化通知:cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember]),作为整个集群的状态快照,akka.cluster.ClusterEvent.CurrentClusterState 将会作为第一条发送给订阅者的消息。
云只是商业、云只是口号、分布式是根基、 Cluster Supreme 集群才是王道至尊
网友评论