AK 2.5.11 Cluster Sharding

作者: Queen〇fLaponia | 来源:发表于2018-03-22 10:57 被阅读390次

      分片是当下大数据标准名词,对于分布式存储意味如何决定数据存到哪台节点机去、对于分布式计算意味如何决定计算分布到哪台节点机上进行。Akka中的分片兼而有之,用于将实体(带状态actor,既有数据属性也有做计算的微服务属性)分布到多个节点机以实现分布式微服务,所有实体仍然属于一个统一集群。分布基于一个确定的分片规则,分片规则是一个算法/纯函数:输入是实体ID、输出一般是这个实体所在数据分区的编号即shard分片ID(分片ID和节点机之间有映射)。重点在于因为它是纯函数,随时随地可以调用无需依赖外界:不管在什么地方、什么时候调用一定得到一个唯一的确定结果:在实体初始分片时调用、在查找定位实体物理位置时调用,唯一需要保证的是实体ID是唯一且稳定的。简单说分片规则算法就是实体ID的哈希绝对值对最大分片数取模得到的余数就是分片ID:A simple sharding algorithm that works fine in most cases is to take the absolute value of the hashCode of the entity identifier modulo number of shards.

      Remote的actorSelection可以按逻辑路径查询actor,但需要指定搜索哪一台节点机:

        val selection = context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")

      这没多大屁用,但是加上一个简单的分片规则,使得actor的分布和定位自动化,实用价值立马爆棚。因此分片集群虽然是以Remote为基础,但是Remote基本没什么单独使用的必要。

      Sharding具备一个分布式分片管理层,说到分片集群不得不说的是草原老师在InfoQ的演讲,当时的lightBend测试博客。其他参考:Akka分片集群之旅。分片上下级结构是:每节点机唯一一个常驻的ShardRegion片区actor -> 下辖0~n个Shards分片actors -> 下辖0~n个Entity实体actors.  大致层次:

        集群-多台节点机  ->  每台节点机-1个片区  ->  每个片区-多个分片  ->  每个分片-多个实体

      ShardRegion是在集群每台参与分片的成员节点机上部署的分片管理actor,它是一种Stub驻点、longRunning的服务,集群所有片区actors构成了一个分布式分片管理层。携带实体ID的消息直接发给本机ShardRegion,ShardRegion分片管理层负责路由,根据消息的实体ID路由到相应节点机上的实体。因此,Sharding要求你必须提供从消息抽取分片和实体ID的函数、在这里分片ID抽取函数之所以也要提供,因为分片ID是从实体ID根据分片规则运算得到、而这个分片规则是可插拔的,由你定制,所以在用到的地方也要由你提供。实体ID是utf-8编码字符串。

    北京奥森

      可以有状态的actor即实体:It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. Here we call these actors “entities”. These actors typically have persistent (durable) state.

      Akka文档把实体类比为DDD聚合根:针对数据变化,把一组相关的对象划分为一个单元来考虑,即为聚合。聚合使用边界将内部和外部的对象划分开,每个聚合有一个根,这个根是一个实体并且,它是外部可以访问的唯一对象。根可以保持对任意聚合对象的引用,聚合内的对象也可以互相持有,但外部对象只能持有根对象的引用,显而易见,外部想要变更聚合内的对象必须/只能通过聚合根,根拥有全局标识符,并且有责任管理不变量。透过啰嗦的陈述,可以看到Akka想告诉我们的是actor可以创建、持有大量其他的对象,当然,这些对象都必须和该actor的业务定位相关,都是为这个actor所用的;然后,这些对象不能“逸出”、不能抛头露面。最后,由于在Akka世界中是一切皆actor模式,所以事实上根actor持有的是一些其他的actorRef,这就好理解了,说它是职责分派assign、worker替身模式都可以。

      实体必须有自己的唯一ID、分片也一样,分片ID用于一致性哈希分片,一致性哈希是诸如shard分片、region分区以及负载均衡神器。实体代码示例 实体类似于集群单例Cluster Singleton,如果你只用得到少量的分片actor,一台机器的资源也可以承载,那么为了简单起见用Cluster Singleton就够了,在此也可以看出实体和单例的一个共同点是:集群保证在一个时刻,一个实体/单例只会运行在一台节点机上,可以说ShardRegion片区actor是ClusterSingletonManager(都是本地Stub)与ClusterSingletonProxy(都是分布式通讯代理)的合体。

      所以、片区actor也是分片集群的消息路由器,消息的发送者不需要知道目的actor的实际位置,只要发给本机ShardRegion就好,因此,每节点一个的片区actor实际上构成了一个分布式路由层。每台节点机的actorSystem都应在启动时创建自己的唯一一个ShardRegion,而且:凡是发送给本节点实体的消息必然经过本地ShardRegion路由。你必须提供两个关键函数:

          1.extractEntityId:从消息当中提取entity实体ID的方法;

          2.extractShardId:从消息当中提取shard分片ID的方法,事实上是从实体ID通过分片规则运算得到,默认实现就够用,参见:ShardRegion.HashCodeMessageExtractor;

      ShardRegion会使用以上函数从接收的消息抽取ID以确定路由目标。初始化完成后、ShardRegion接收到第一条消息时会向ShardCoordinator 中央协调者询问shard位置,协调者决策由哪一个片区actor来持有特定shard分片,随后该ShardRegion会创建Shard分片子actor、Shard再创建独立实体,后面再来的消息不再查询中央协调者直接路由:ShardRegion —> Shard —> Entity. 

      片区actor的一个重要功能是可以远程路由,也就是将消息路由到别的集群节点机片区actor上,所以说分片集群每台节点机都有完整一致的路由能力,消息可以发送给任意一台节点机。第一次远程路由需要询问协调者,随后缓存路由信息之后就不需要再询问了。

    孩子与海

      ShardRegion片区actor管理下属的一组shard分片actor(shard继续往下就到了实体actor:A shard is a group of entities that will be managed together.),类似HBase的RegionServer管理下属的region数据分区(region往下就是数据元素了);ShardRegion可以看做“Shard Home”

      同样类似RegionServer,系统启动时也需要启动ShardRegion,并且还需要做一件事:注册支持的实体类型,启动+注册通过一个方法调用一块做:ClusterSharding.start,该方法会创建本地ShardRegion并且会返回给你actorRef、这个actorRef可以随时通过ClusterSharding.shardRegion找回,如果当前节点机角色与ClusterShardingSettings中指定的不符,则shardRegion以proxy only mode模式启动:可以做消息代理但不参与分片,start方法完整签名:

    val counterRegion: ActorRef = ClusterSharding(system).start(

      typeName = "Counter",

      entityProps = Props[Counter],

      settings = ClusterShardingSettings(system),

      extractEntityId = extractEntityId,

      extractShardId = extractShardId

    )

      对extractShardId 函数的唯一硬性要求是实体ID和分片ID之间的关系必须满足:对于一个特定的实体ID,得到的分片ID必须是定值:For a specific entity identifier the shard identifier must always be the same. 所以可以使用实体ID与numberOfShards(shards:最大分片数)求余得到分片ID(absolute value of the hashCode of the entity identifier modulo number of shards.),求余这种最简单的运算,可以将无限大的数字(实体ID hash)对分片数求余之后映射到一个小于该分片数的定值整数,对递增的实体ID求余得到的数字在{0, numberOfShards}这个区间振荡. i.e. 对于一个实体ID不管你在什么时候、什么地方去计算得到的都是同一个合法的分片shardID,这是个小学二年级学到的规律:余数小于除数。注意numberOfShards必须固定不变,所以写在代码里也是无妨,numberOfShards和HBase的region分区数量的不同在于,我们可以自定义numberOfShards的值,只要不是太大或者太小都无妨,比如就设100,那么从2台到100台的集群都适用。

      shards最大分片数:numberOfShards是你按照规划的集群规模所定义的你打算分多少片,最佳实践是分片数应该是规划的最多节点机数量的十倍大. i.e. 每台机10个shard,比如10台机则分100片。分片数少于节点机台数会造成一些节点机分不到shard. 过多的分片数会造成不必要的分片管理负担,e.g.  造成再平衡分片负担、还会增加延迟,因为路由给shard第一条消息时会进行各种协作协调,这里隐含每台节点机超过一个数量级(10个)分片可能会开始带来一丢丢负担,节点机<=10个分片情况,没有明显的管理负担,大致估计每节点机10~100个分片都不成问题。

      集群节点机随着伸缩可以变,但是numberOfShards不能变:For a specific entity identifier the shard identifier must always be the same 对确定的一个实体ID必须生成确定的一个分片ID,必须满足严格的一对一关系. Otherwise the entity actor might accidentally be started in several places at the same time. 分片算法的目的是:Try to produce a uniform distribution 产生一个统一分布,分片算法在集群所有节点都必须是一致的,它也是典型的不可滚动升级的,也就是说要改变numberOfShards,必须停掉集群所有节点机。 As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number of cluster nodes.  举例来说:10台机100个分片、宕掉9台、那么剩一台机也得自己跑100个分片,100个分片的集群最多扩容到100台机,101台就会有一台分不到分片运行,100分片就可以满足1~100台机的伸缩。

      综上所述,上代码:

    val extractEntityId: ShardRegion.ExtractEntityId = {

      case EntityEnvelope(id, payload) ⇒ (id.toString, payload)

      case msg @ Get(id) ⇒ (id.toString, msg)

    }

    val numberOfShards = 100

    val extractShardId: ShardRegion.ExtractShardId = {

      case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString

      case Get(id)              ⇒ (id % numberOfShards).toString

      case ShardRegion.StartEntity(id) ⇒  (id.toLong % numberOfShards).toString

    }

      ShardRegion.ExtractEntityId是 “标记类型”Marker type of entity identifier (`String`). 在ShardRegion中酱紫定义的:

      type EntityId = String


    How it works

      ShardRegion片区类似于hadoop里的NodeManager、ShardCoordinator 中央协调者类似于ResourceManager.  分布式系统中要确保实体单例,最重要的就是要做到所有节点具有一致的shards分布统一视图,也就是某个shard位于哪一台节点机,大家的看法都是一致的,因此,初始的shard位置分派以及后期的调整,是由集中式的中央协调者来决定。协调者以集群单例cluster singleton形式存在的, i.e.  在长老节点上的唯一实例。

      如果集群有新加入成员则中央协调者就得做shards的再平衡,i.e. 也即从一个节点迁移实体到另一个节点—服务主动迁移—实质是负载均衡。再平衡过程中,协调者首先通知所有片区一个shard切换/handoff换手 即将开始、所有片区则会将发给该shard的消息本地缓存、协调者对位置未定的处于再平衡阶段shard的请求不予答复,直到换手完成。拥有shard的原片区会有序停止该shard下的所有实体:给他们喂handOffStopMessage(default PoisonPill) 毒丸,该shard下的所有实体死光以后,协调者开始向新位置路由消息,所有片区缓存的消息都会路由到新位置,负责接手再平衡shard的片区会创建启动新的实体,注意这又是一次按需创建。但是这些新实体的状态如果需要恢复则需要使用Persistence. 综上所述,因为分片集群往往涉及服务迁移,而服务迁移又需要恢复服务状态,所以分片集群和persistence好基友是在Akka2.3一块发布的:For this reason Cluster Sharding and Akka Persistence are such a popular combination.

      shard的分配以及再平衡都由可插拔组件决定:ShardCoordinator.LeastShardAllocationStrategy最少分片优先分配策略,该策略会从当前拥有最多分片的片区上选取去做再平衡的分片,把它handoff转手给当前拥有最少分片的片区,一般也就是集群新成员,可以配置一个阈值,该阈值指定最大达到多大的差距(最多分片和最少分片的差)就必须开始再平衡。

      各个shard的位置信息保存在中央协调者中,这些信息就相当于中央协调者自身的状态了,为避免单点,中央协调者的状态默认采用Distributed Data 做容错,当中央协调者crashed,新的协调者将会接任并恢复状态,在此失效期间,各个片区缓存的shards保持可用、发给一些节点还不知道的shard的消息也会得到缓存、直到新的中央协调者恢复完毕、之后就全部走正常流程。shard位置信息是典型的运行时数据,它只在系统运行起来以后产生、并且也只在系统运行期间有价值,一旦整个集群系统都停掉了,这些信息也不再有价值了。相比之下,实体记忆则是持久化的:The state of Remembering Entities is durable, i.e. it is stored to disk. The stored entities are started also after a complete cluster restart.

      只要经过同一个片区actor向同一个实体路由消息,则消息的顺序可以保障As long as a sender uses the same ShardRegion actor to deliver messages to an entity actor the order of the messages is preserved. 这等价于Akka的另一个保障:两个确定的actor之间的消息保证送达顺序,也就是消息顺序得以保障的上下文是the same sender–receiver pair.  关于消息投递可靠性保障,要做到at-least-once需要基于AtLeastOnceDelivery in Persistence.

    分布式数据 vs Persistence持久化

      节点机不管是伸还是缩(包括有计划地减机器和意外Crash),都会涉及到实体的迁移,那么,实体的状态怎么恢复?两种方式:ddata(Distributed Data)或persistence.

      二者的功用一样,没有优劣之分,都能实现集群容错,协调者和片区的状态保持默认依靠分布式数据。如果除了做容错你的actor没有其他地方用到持久化的话,为了方便起见可以只用分布式数据,这样你就不用再安装、维护和操作第三方外部存储了。ddata有一个闪亮亮的特性:All data entries are spread to all nodes !发挥一下想象力:有了介个咱就可以在产品特性里写上一句:高可用集群支持n-(n-1)超级容错。

      持久化特性从一开始设计就应该固定下来,因为这个特性在整个集群必须统一,i.e. 不可能滚动升级该特性。我们在做实时系统时,第一直觉还是分布式数据更好,不过实际上持久化做存储的时候使用的是异步actor,所以也没太差。另外在持久化Actor中还有另外两个关键的的概念就是JournalSnapshot,前者用于保存日志流水,后者用于持久化快照,两者在Actor survive failures的时候都起到了至关重要的作用。Akka社区有HBase持久化插件也有redis的,HBase插件可以保存journal和snapshot,基于openTSDB的底层组件asynchbase,这玩意是openTSDB公司基于自己的异步hbase操作库开发的,不过一者是我们要去做异步持久化实际上可以直接基于actor的worker模式去做、二者HBase官方客户端也看到原生HBase只有同步客户端不太好、也在慢慢加入异步feature.

      分布式数据feature是默认开启的:akka.cluster.sharding.state-store-mode = ddata/persistence则为持久化,协调者的状态基于它保持,是一种WriteMajority/ReadMajority多数读写一致性。协调者的状态并没有默认保存到磁盘,当整个集群所有节点机宕机或停掉,状态会永久丢失,事实上也不再需要了。

      Remembering Entities实体记忆是持久化到磁盘的,它会持久化每个shard所拥有的实体名单,即使是整个集群完全重启,依然可以恢复重启之前每个shard的实体。设置rememberEntitiesflag = true可以开启,该设置在调用 ClusterSharding.start时、在ClusterShardingSettings上设置。同时确保你写的shardIdExtractor分片ID提取器具备代码Shard.StartEntity(EntityId). 该代码直接从一个EntityId映射到一个 ShardId,示例:

        val extractShardId: ShardRegion.ExtractShardId = {

          case EntityEnvelope(id, _)                 ⇒ (id % numberOfShards).toString

          case Get(id)                                          ⇒ (id % numberOfShards).toString

          case ShardRegion.StartEntity(id) ⇒  // StartEntity is used by remembering entities feature

                (id.toLong % numberOfShards).toString //该代码在集群重启时从持久状态恢复自动调用

    }

      配置为实体记忆后,Shard再平衡到另一个节点上时、或者从Crash恢复时,总会重新创建之前所拥有的所有实体,这是它默认的、全自动的行为,要完全停掉实体,需要发送Passivate消息给实体actor的父actor,否则实体总会被重新创建,配置为rememberEntitiesis=false的话,Shard在再平衡或Crash恢复后不会自动重建实体,实体只会在第一条属于它的消息到达Shard时被创建一次,也就是依然是按需创建creating on demand.

      分片使用自己的分布式复制子Replicator,在每个节点上都有,以这种方式,你可以指定对某些实体类型分配到某些节点上、另外的实体类型则分配到另外一些节点上. Each such replicator has a name that contains the node role and therefore the role configuration must be the same on all nodes in the cluster, i.e. you can’t change the roles when performing a rolling upgrade.

      ddata配置项:akka.cluster.sharding.distributed-data.

      实体记忆的性能代价还是有点高的,shard再平衡时,性能消耗随着实体数量增长而增长,当前版本的AK,如果每个shard的实体数量超过1w个的话,我们不推荐使用该特性。

    Startup after minimum number of members

      使用akka.cluster.min-nr-of-members 或 akka.cluster.role..min-nr-of-members. 可以指定集群开始分片的最少启动成员节点机数量,系统直到达到该数量的ShardRegion启动上线才会开始shard分配,这可以避免过多shards在启动阶段分配到第一个ShardRegion、然后在后续节点机陆续启动后又再次再平衡rebalance.

    Proxy Only Mode

      ShardRegion片区可以是纯代理模式,此时它不会创建任何实体, i.e. 只做消息的分布式路由,这时它更像一个纯路由器、属于分布式路由层但不参与分片、像是分片集群的旁观者。用ClusterSharding.startProxy专用方法可以创建纯代理Shard。比如说,做流数据处理的时候,有一些前置接收数据的节点机、还有一批后端组成分片集群的节点机,就可以这么干。再者,还可以通过角色划分达到同样目的,在调用ClusterSharding.start启动ShardRegion actor时,如果本机角色和ClusterShardingSettings指定的角色不符,则本机启动的ShardRegion就处于纯代理模式。这个feature感觉和集群客户端、分布式发布订阅有功能重叠,还要更方便一点,因为消息的发布只要发给本机ShardRegion就可以了,归纳一下各自特点可能是:

      1、纯代理ShardRegion只会把消息发给一个特定实体(先发给该实体所在节点机的ShardRegion再由他转发);

      2、集群客户端则是允许一个集群向另一个集群发送消息、或者是因为某些原因不能加入集群的机器向集群发送消息;

      3、分布式发布订阅则是集群内的、标准的消息订阅和发布机制,作为一个功能模块,使用更灵活功能更丰富,首先允许订阅者动态变化,其次支持针对主题的publish、针对path的send和SendToAll;

      综上,如果不采用热备方式,也就是说一个实体就只有一个,可以采用1、2;如果采用热备可以考虑3,同一份数据消息发给不止一个一模一样的功能实体。

    Passivation

      钝化,如果实体较长时间内不再用了,可以停掉他们减少内存占用。还可以定义消息接收超时receive timeout (context.setReceiveTimeout). 可以自动钝化。但是送达钝化actor的消息会被删除,要不丢消息,也就是优雅/有序钝化,实体可以先通知自己的父替自己代收消息、发送ShardRegion.Passivate消息给它的父也就是Shard actor.:我要钝化掉了、归隐磁盘,我的消息您帮我留着。那么Shard就会缓存它的消息 between reception of Passivate and termination of the entity. 实体是实例,它钝化实际上就是死亡,等到有它的消息来了,需要复活它来处理,算是克隆吧,但确实不是一个实例了,所以缓存消息最终会由该实体的一个新的incarnation化身也就是新实例来处理。这种用法适合移动互联网:用户离开/关闭app时actor钝化掉。

      使用ddata的时候,实体ID会被存储到ddata的Durable Storage. You may want to change the configuration of the akka.cluster.sharding.distributed-data.durable.lmdb.dir, since the default directory contains the remote port of the actor system. If using a dynamically assigned port (0) it will be different each time and the previously stored data will not be loaded——没看懂

    监管体系

      如果你想对实体应用自定义监管策略supervisorStrategy来代替默认的重启策略,你得写一个所有实体的直接父actor,在里面定义自定义策略:Escort是父、counter是子

    class Escort extends Actor {

        val counter = context.actorOf(Props[Counter], "theCounter")

        override val supervisorStrategy = OneForOneStrategy() { //默认监管策略

            case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume

            case _: ActorInitializationException ⇒ SupervisorStrategy.Stop

            case _: DeathPactException ⇒ SupervisorStrategy.Stop

            case _: Exception ⇒ SupervisorStrategy.Restart

        }

        def receive = { case msg ⇒ counter forward msg  }

    }

      那么,我们可以让这个Escort负责所有实体的创建,如果实体之间还有上下级关系,那么就在创建下级实体之后将其actorRef以消息形式发给上级实体,上级实体保持持有所有下级实体actorRef即可。这样,只有Escort作为所有实体的监管者,而实体间的上下级关系也具备,分离监管层次和业务上下级层次,完美。Escort就像普通实体一样创建启动即可:

    ClusterSharding(system).start(

        typeName = "Escort",

        entityProps = Props[Escort],

        settings = ClusterShardingSettings(system),

        extractEntityId = extractEntityId,  extractShardId = extractShardId

    )

    注意停止的实体,当有它的消息来到时会自动重新启动——克隆新化身。

    优雅Shutdown

      给ShardRegion发送ShardRegion.GracefulShutdown消息来优雅地手动停止它,它的shard将会迁移,期间属于它的消息缓存和再平衡过程一样。

    警告:不要把Cluster Sharding和Automatic Downing联合使用。自动下线配置允许集群分裂为两个独立小集群,这会导致多个分片和实体同时运行,这会破坏Akka集群元数据,可能导致集群无法重启。如果还用了persistence那么还可能会破坏你的业务数据。

    如果已经造成Akka集群元数据破坏导致集群无法重启,使用如下Main命令执行程序清除损坏的Akka集群元数据:

    java  -classpath  jarFiles

        akka.cluster.sharding.RemoveInternalClusterShardingData

        -2.3 entityType1 entityType2 entityType3

      该程序包含在akka-cluster-sharding jar包文件,最好使用一致的classpath 和配置,可以使用sbt或Maven.  entity type实体类型和ClusterSharding.start中的一致。

      -2.3指定了删除Cluster Sharding in Akka 2.3.x存储的数据,因为这个版本使用了不同的persistenceId.

    所有配置自己看吧:Configuration

      突然想到一个actor模型的缺陷,那就是似乎做不到精确的定时任务,因为和actor对话的方式只有一种那就是发消息给它,但是消息必须先进消息队列,进队列就可能会延迟,做不到在一个精确时间点上做某一件事,比如保存一个时刻的大量actor的快照断面,延伸想到actor是否可以允许多个邮箱,按照优先级分类比如加急邮箱。难得挑出actor模型的毛病,它几乎就是OO的理想国:面向对象系统是由对象及其相互之间的消息构成。

      不那么精确的定时任务AK有个内置支持scheduler,使用步骤:

      1、从ActorSystem可以获取scheduler:在一个任务actor中可以用:context.system.scheduler来得到一个定时器,同时需要一个隐式传递的ExecutionContext,我们知道,它基本就是一个线程池,scheduler需要它提供的线程来执行 timer task 定时任务,一般情况下只要:import context.dispatcher. 也就是直接使用当前actor自己的dispatcher.

      2、scheduler.scheduleOnce(time, actorRef, message)方法可以把message消息调度给一个future 、以time为定时、消息会被发送一次、给actorRef也可以是self.  所以说即使是自己的事也得用消息通知,类似记事帖,要是到了时间自己很忙呢,就可能滞后。scheduleOnce方法返回值是一个Cancellable,如果一次性的任务按时完成了,可以用它取消timer.

      其它参考:替身模式

      任务actor代码示例

    相关文章

      网友评论

        本文标题:AK 2.5.11 Cluster Sharding

        本文链接:https://www.haomeiwen.com/subject/gbntfftx.html