概述
squbs通过zkcluster模块获得集群化服务。zkcluster是一个利用ZooKeeper管理akka集群和分区的Akka扩展。
它类似于Akka集群的领导和成员管理功能。但它更丰富, 因为它提供了分区支持, 并消除了输入节点的需要。
配置
在运行时目录下,我们需要一个squbsconfig/zkcluster.conf文件。它应该提供了如下属性:
- connectionString:一个用逗号分隔定义所有zookeeper节点
- namespace:一个字符串,它是znode的有效路径,它将是此后创建的所有znode的父节点
- segments:用于划分分区数量的分区段的数目
下面是一个zkcluster.conf文件内容的例子:
zkCluster {
connectionString = "zk-node-01.squbs.org:2181,zk-node-02.squbs.org:2181,zk-node-03.squbs.org:2181"
namespace = "clusteredservicedev"
segments = 128
}
用户指南
首先将扩展简单的注册为所有正常的akka 扩展。然后你可以访问并使用zkClusterActor,如下所示:
val zkClusterActor = ZkCluster(system).zkClusterActor
// Query the members in the cluster
zkClusterActor ! ZkQueryMembership
// Matching the response
case ZkMembership(members:Set[Address]) =>
// Query leader in the cluster
zkClusterActor ! ZkQueryLeadership
// Matching the response
case ZkLeadership(leader:Address) =>
// Query partition (expectedSize = None), create or resize (expectedSize = Some[Int])
zkClusterActor ! ZkQueryPartition(partitionKey:ByteString, notification:Option[Any] = None, expectedSize:Option[Int] = None, props:Array[Byte] = Array[Byte]())
// Matching the response
case ZkPartition(partitionKey:ByteString, members: Set[Address], zkPath:String, notification:Option[Any]) =>
case ZkPartitionNotFound(partitionKey: ByteString) =>
// Monitor or stop monitoring the partition change
zkClusterActor ! ZkMonitorPartition
zkClusterActor ! ZkStopMonitorPartition
// Matching the response
case ZkPartitionDiff(partitionKey: ByteString, onBoardMembers: Set[Address], dropOffMembers: Set[Address], props: Array[Byte] = Array.empty) =>
// Removing partition
zkClusterActor ! ZkRemovePartition(partitionKey:ByteString)
// Matching the response
case ZkPartitionRemoval(partitionKey:ByteString) =>
// List the partitions hosted by a certain member
zkClusterActor ! ZkListPartitions(address: Address)
// Matching the response
case ZkPartitions(partitionKeys:Seq[ByteString]) =>
// monitor the zookeeper connection state
val eventStream = context.system.eventStream
eventStream.subscribe(self, ZkConnected.getClass)
eventStream.subscribe(self, ZkReconnected.getClass)
eventStream.subscribe(self, ZkLost.getClass)
eventStream.subscribe(self, ZkSuspended.getClass)
// quit the cluster
zkCluster(system).zkClusterActor ! PoisonPill
// add listener when quitting the cluster
zkCluster(system).addShutdownListener(listener: () => Unit)
依赖
在你的build.sbt或Scala构建文件增加如下依赖:
"org.squbs" %% "squbs-zkcluster" % squbsVersion
设计
如果您正在进行zkcluster的更改,请阅读:
- 成员是基于zookeeper临时节点,关闭会话会通过ZkMembershipChanged改变领导者。
- 领导是基于curator框架的LeaderLatch,新的选举将广播ZkLeaderElected给所有的节点。
- 分区由领导者计算,并由领导节点中的ZkPartitionsManager写入znode。
- 分区修改只能由领导者完成,它要求其ZkPartitionsManager来强制执行修改。
- ZkPartitionsManager 的追随者节点将观察Zookeeper的 znode 变化。ZkPartitionsManager 的追随者节点将观察动物园管理员的 znode 变化。一旦领导者改变了 paritions 在重新平衡后, 在追随者的节点将得到ZkPartitionsManager 通知, 并更新他们的内存快照的分区信息。
无论谁需要因为分区改变ZkPartitionDiff得到通知,应当发送ZkMonitorPartition 到集群已经注册的actor。
ZkMembershipMonitor是处理成员和领导的actor类型。
ZkPartitionsManager是处理分区管理的actor。
ZkClusterActor是用户应该向其发送查询的接口。
网友评论