持久化
当我们在集群系统中,一台机器向另一台机器发送一段数据,负责接收的机器在接收数据前突然宕机,就会造成数据丢失无法恢复。Akka实现了对actor 持久化的方法来恢复数据。
Akka持久化使有状态的actor能保留它的内部状态,因此我们不会因为JVM崩溃、监管者引起或集群中迁移导致数据丢失无法恢复而尴尬,Akka持久化可以帮助我们恢复actor。
持久化的什么?
持久化的是actor内部状态的变化,并且这些变化只是附加到原有的存储上。
actor是如何进行恢复的呢?
我们可以通过将保存的变化进行重放,从而使它们可以重建其内部状态。当重放的内容庞大时会需要很多时间,因此Akka提供了快照功能将重放记录分解从而减少恢复时间。
另外一个重点是Akka持久化也提供了“至少一次消息传递语义”的点对点通信来保证消息不丢失。
依赖
使用Akka持久化,要在你的项目中添加以下依赖:
libraryDependencies += "com.typesafe.akka" %% "akka-persistence" % "2.5.11"
Akka持久化还提供了一些内置的持久化插件,包括基于内存堆的日志、基于本地文件系统的快照存储以及基于LevelDB的日志。
基于LevelDB的插件需要以下额外的依赖:
libraryDependencies += "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
体系结构
1. PersistentActor(持久化actor):是一种特殊的带有内部状态的Actor,既可以执行命令又能以事件来源模式来进行内部状态持久化的。若因系统崩溃、人为终结等,系统在重启后Actor通过之前持久化的信息可以恢复之前的状态。
2. AtLeastOnceDelivery(至少一次传递):消息传递的机制,意味着每条应用了这种机制的消息潜在的存在多次投递尝试并保证至少会成功一次。就是说这条消息可能会重复但是不会丢失。
3. AsyncWriteJournal(异步存储日志): 将发送给持久化Actor的消息序列异步存储到日志中。日志为每条消息维护一个不断增加的序列号。日志存储的底层实现是可插拔的。Akka的持久化扩展自带一个叫做"leveldb",向本地文件系统写入的日志插件。Akka社区里还有更多日志存储插件提供。
4. Snapshot store(快照存储):对持久化Actor或持久化视图的内部状态的快照进行持久化。快照用于优化回复的时间。快照存储的底层是可插拔的。Akka持久化扩展自带一个向本地文件系统写入的“本地”快照存储插件。Akka社区里还有更多快照存储插件提供。
5. Event sourcing(事件来源):把使状态产生变化的事件按发生时间顺序持久化,而不是把当前整个状态存储起来。在恢复状态时把日志中这些事件按原来的时间顺序重演一遍回到原来的状态。
下面我们根据持久化的体系结构来详细地介绍
事件来源Event sourcing
事件来源背后的基本思想其实很简单,当一个持久化actor接收到一个(非持久化)命令,首先它要验证(这个命令)是否可以运用到当前状态。
如果命令验证成功,根据这个命令产生一个事件。在事件成功持久化之后,可以用来改变actor的状态。
当持久化actor需要恢复时,因为之前已经验证过可以运用到当前状态,我们可以直接将持久化的事件进行重放。
文章推荐:Events As First-Class Citizens
Akka持久化通过PersistentActor支持事件来源。actor可以使用persist方法持久化和处理事件。通过实现receiveRecover和receiveCommand定义PersistentActor的行为。下面的示例演示了这一点。
本例子中定义了Cmd和Evt两种数据类型,Cmd代表命令,Evt代表事件
receiveRecover方法是在恢复过程中处理事件和快照消息。
receiveCommand方法用来处理普通Actor的消息。在上面的示例中,如果actor收到的是命令的话会调用persist方法
persist方法是异步的方式持久化事件。它有两个参数 ,一个是事件 ,另一个是事件处理程序(event handler)。
事件处理程序是将之前持久化过的事件进行处理,该事件在内部作为独立消息发送回持久化actor 来使事件处理程序执行,来改变或关闭持久化actor的状态。持久化事件的发送者也是相应命令的发送者,因此当命令的发送者没显出时事件处理程序也可以回复。
当使用persist方法来持久化事件时在调用和执行相关事件处理程序的过程中,要保证持久化actor不会收到下一步的命令,否则会受到影响。当在某个命令的上下文中多次调用persist方法时。这个过程中收到的消息一直被暂存直到presist方法运行结束。
如果实例化事件失败,onPersistFailure方法将被调用(默认记录为error),并且actor将无条件地被停止。如果持久化事件在存储之前被拒绝,比如事件发生连续错误,onPersistRejected将被调用(默认记录为warning)并且actor继续下一条消息
运行该示例最简单的方法是下载Typesafe Activator,并打开Akka Persistence Samples with Scala这个教程。它包含如何运行PersistentActorExample的说明。这个例子的源代码可以在 Akka Samples Repository 中找到。
标识符
一个持久化actor必须有个标识符 这个标识符必须用persistenceId 方法来定义。
override def persistenceId = "my-stable-persistence-id"
恢复
PersistentActor在启动和重启时通过重放之前持久化的日志消息来实现自动恢复,如果在恢复过程中收到新的消息,会将新消息先存储起来等恢复完成后,在收到新的消息。
可以限制同一时间并发的恢复的数量,来限制系统和后端的数据存储不超载,如果超过限制actor将等待到其他恢复都完成后才开始。配置方式:
akka.persistence.max-concurrent-recoveries = 50
自定义恢复
在应用程序中有时也需要依照客户具体要求来恢复,通过返回recovery 方法中的自定义Recovery对象来执行自定义恢复。recovery 是PersistentActor的一个方法。
你可以使用SnapshotSelectionCriteria.None. 来跳过加载快照和重放所有事件。它用于将快照序列化格式变成互不相容的方式时。不适宜用于事件被删除的情况下。
override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None)
另一个可能的自定义恢复是设置重放的上界,对debug很有帮助,使得actor仅在过去的某个点重放。
override def recovery = Recovery(toSequenceNr = 457L)
在PersistentActor的recovery 方法中返回Recovery.none()可以使恢复失效。
override def recovery = Recovery.none
恢复状态
持久化actor可以通过以下方法查询它自己的恢复状态
def recoveryRunning: Boolean
def recoveryFinished: Boolean
持久化actor在回复完成后会收到一个特殊的RecoveryCompleted 消息。然后再执行下一步操作
如果actor从日志中的恢复状态有问题,onRecoveryFailure 会被调用(记录为error)并且actor将被停止。
内部暂存(stash)
持久化actor有一个私有的暂存用来缓存整个恢复过程中进来的消息或者暂存persist\persistAll方法持久化的事件。内部暂存通过挂钩到unstashAll 与普通暂存协作
你应该控制消息的产出不要超过持久化actor的处理能力,否则暂存消息的数量将无限增长。所以我们要在mailbox配置中定义暂存的容量来保护暂存并防止发生OutOfMemoryError
akka.actor.default-mailbox.stash-capacity=10000
注意,如果你有很多持久化actor,要定义一个小的暂存容量,防止占用过多的内存
持久化actor定义了三个策略来处理内部暂存容量超出的故障。默认的溢出策略是ThrowOverflowExceptionStrategy,具体内容是丢弃当前的信息,抛出StashOverflowException异常,造成actor重启。
你可以覆盖internalStashOverflowStrategy 方法为了“独特的”持久化actor来返回DiscardToDeadLetterStrategy 或者ReplyToStrategy 或者通过提供FQCN(Fully Qualified Class Name完全限定类名)来给所有的持久化actor来定义“默认值”。
在persistence 的配置中:
akka.persistence.internal-stash-overflow-strategy="akka.persistence.ThrowExceptionConfigurator"
DiscardToDeadLetterStrategy 策略也有一个打包好的配akka.persistence.DiscardConfigurator.
你也可以查询默认策略:
Persistence(context.system).defaultInternalStashOverflowStrategy
放宽的局部一致性要求和高吞吐量的用例
如果面临放宽的局部一致性要求和高吞吐量,有时PersistentActor及其persist在处理大量涌入的命令时可能会不够,有时你可能会放宽一致性要求——例如你会想要尽可能快速地处理命令,假设事件最终会持久化并在后台恰当处理,并在需要时追溯性地回应持久性故障。
persistAsync方法提供了一个工具,用于实现高吞吐量的持久化actor。当日志仍在致力于持久化和执行用户事件回调代码时,它不会暂存传入的命令。
推迟操作,直到持久化处理程序已经执行
PersistentActor 提供了一个实用的方法deferAsync(延迟异步),它工作起来类似于persistAsync但是不持久化传递过的事件,它将保留在内存中,并在调用处理程序时使用。建议将其用于读取操作,以及在domain模型中没有相应事件的操作。
请注意,sender()在处理程序回调中是安全的,将指向该命令的原始发送方,该命令将调用这个deferAsync处理程序。
持久化嵌套调用
可以在各自的回调块中调用persistAsync 和persist,它们将适当地保留线程安全性(包括sender的正确值)和存储保证。
在下面的示例中,将发出两个持久调用,每个调用都在其回调中发出另一个持久化调用
向PersistentActor发送a和b命令,执行顺序:
首先“outer层”的持久化调用先被发出并且它们的回调被应用。一旦这个事件在日志(journal)中被确认持久化,inner的回调将被调用。只有这些所有的处理程序成功调用,下一个命令才会被传递给持久化actor。换句话说,初始调用在外部层中的persist()所保证的传入命令的存储(stash)被扩展,直到所有嵌套的持久化回调都被处理。
故障
如果事件的持久化失败,将会调用onPersistFailure(在默认情况下记录错误),并且将无条件地停止actor。
因为日志是不可用的,所以当持久化失败重启是很困难的,最好是停止该角色,在退出超时之后重新启动它。akka.pattern.BackoffSupervisoractor支持这重启
如果事件的持久化在存储之前被拒绝,将会调用onpersistreject(在默认情况下记录一个警告),并且actor 将继续使用下一个消息
如果在启动actor时从日志中恢复actor 的状态有问题,就会调用onRecoveryFailure(在默认情况下记录错误),并停止该actor。
Atomic 写入
通过使用persistAll或persistAllAsync方法来原子地存储多个事件。这意味着传递给该方法的所有事件都被存储,如果出现错误,则不会存储它们。
使用persistAll来只持久化事件的子集,因此一个持久化actor的恢复将永远不会被部分地完成。
一些日志在各自的事件中不支持原子写入而且它们反对使用persistAll命令。也就是说onPersistRejected是调用一个异常(类似 UnsupportedOperationException).
Batch 写入
为了在使用persistAsync时优化吞吐量,在将事件写入日志之前,一个持久的参与者内部批量事件将被存储在高负载下。批量大小是由日志往返期间发出的事件数量动态决定的。在将一个批次发送到日志后,在确认收到之前的批次之前,不能再发送批次。批处理写入从来都不是基于时间的,这使得延迟至少是最小的。
消息删除
可以将所有消息(由单个持久actor记录的日志)删除到指定的序列号。持久化actor可以调用deleteMessages方法
deletemessage请求的结果向持久化actor发出信号,如果删除成功,则发出DeleteMessagesSuccess 如果发送失败则发出DeleteMessagesFailure
即使所有的消息在deleteMessages调用后被删除,消息删除也不会影响日志的最高序列号
持久化状态处理
各个方法成功后的状态在PersistentActor中重写的故障处理程序的回调是显式。这些处理程序的默认实现发出一个日志消息(persist的error/恢复故障/其他warning)并记录失败的原因和导致失败的消息。
对于决定性的故障(例如恢复或持久化事件失败),在故障处理程序调用之后持久化actor将被停止。这是因为如果底层的日志发送持久化故障信号,它很可能要么完全失败,要么超负荷,重新启动,并试图再次坚持这个事件,这不会帮助日志恢复,而会引起 Thundering herd problem, 因为许多持久化actor将重新启动并尝试再次持久化他们的事件。
使用BackoffSupervisor (在 Failures中描述)它实现了一个指数回退策略,该策略允许日志在持久化actor的重新启动之间恢复更多的喘息空间。
安全地关闭持久化actor
对于普通的actor来说,可以接受使用特殊的PoisonPill 消息来向一个actor发出信号,当它接收到这个信息时,它应该停止自己的动作,且actor自己无法阻止。
注意:在使用PersistentActor时,(在调用持久化处理程序之前)actor在它处理其他需要放入暂存的消息之前,可能收到或处理PoisonPill ,造成actor提前关闭。
重放过滤器
当多个写入者(即多个持久化actor实例)用相同的序列号来记录不同的消息,可能会有事件流被破坏。在这种情况下,您可以配置如何在恢复时过滤来自多个写入者的重播消息。
在你的配置中akka.persistence.journal.xxx下。replay-filter部分(xxx是您的日志插件id),您可以从以下值中选择回放过滤器模式:
repair-by-discard-old
fail
warn
off
快照
当你使用actor时,你可能要注意有些actor可能会积累非常长的事件日志,并经历较长的恢复时间。正确的方法是分裂成一组较短的actor来大幅度减少恢复时间。
持久actor可以通过调用saveSnapshot方法来保存内部状态的快照。如果保存快照成功,持久化actor将接收到SaveSnapshotSuccess消息,否则是SaveSnapshotFailure消息
如果没有指定,他们默认为SnapshotSelectionCriteria.Latest (最新的快照)。若要禁用基于快照的恢复,应用程序应使用SnapshotSelectionCriteria.None。如果已保存的快照没有匹配指定的SnapshotSelectionCriteria,恢复时将重播所有日志消息。
快照删除
持久化actor可以通过调用deleteSnapshot方法来删除单个快照,该方法使用快照的时间戳。
如果大量删除一个范围内的与SnapshotSelectionCriteria匹配的快照,持久化actor可以使用deleteSnapshots
快照状态处理
保存和删除快照也可以有成功或失败,此信息通过如下表所示的状态消息反馈给持久actor
如果失败的消息actor没有处理,每个传入的失败消息将记录一个默认的警告日志消息。在成功消息上没有执行默认操作,但是您可以自由地处理它们,例如,为了删除在内存中快照,或者在失败的情况下再次尝试保存快照。
至少一次传递
要发送至少一次发送语义到目的地的消息,您可以在发送端对您的PersistentActor混合AtLeastOnceDelivery特性。当它们在可配置超时内未被确认时,它负责重新发送消息。
发送actor的状态包括未被接收方确认的消息被发送的状态必须是持久的,因此它可以在方送的actor或jvm崩溃时幸存下来。AtLeastOnceDelivery 特性坚持传递消息的意图,并接收到确认信息。
deliver方法用于将消息发送到目的地。当目的地已回复一条确认消息,调用confirmDelivery方法。
传递和确认传递之间的关系
要将消息发送到目标路径,请在持久化发送消息的意图之后使用deliver 方法。
目的地的actor必须发送回一个确认消息,当发送方的actor收到确认消息 应该将消息传送成功这个事实持久化,然后调用confirmDelivery方法
如果持久化actor当前没有恢复,则deliver 方法将向目标参与者发送消息。当恢复时,消息将被缓冲,直到它们被确认使用confirmDelivery.。一旦恢复完成,在整个恢复过程中如果有未解决的消息没有被确认,在发送任何其他消息之前,持久参与者将重新发送这些消息。
传递要求一个deliveryIdToMessage 函数通过在消息中提供deliveryId ,因此deliver 和confirmDelivery 创建关系是可能的。deliveryId 必须往返于传递。在接收到消息后,目标actor将把相同的deliveryId包裹在确认消息中,并返回给发送方。发送方随后将使用它调用confirmDelivery方法来完成传递程序。
deliveryId是无间隙严格单调递增序列号。所有目标actor将使用相同的序列,即当发送到多个目标时会看到序列中的空白。所以不可能使用自定义的deliveryId。然而你可以在消息中发送自定义关联标识符到目的地。然后,您必须保留内部deliveryId(传递到deliveryIdToMessage函数)和自定义关联id之间的映射(传递到消息中)。您可以通过在Map(correlationId -> deliveryId)中存储这样的映射来实现这一点,在您的消息的接收方以您的自定义相关id进行回复之后,您可以从该映射中检索到要传递的deliveryId,并将其传递到confirmDelivery 方法中。
AtLeastOnceDelivery 特性有由未经证实的消息和一个序列号组成的一个状态,它并不存储这个状态本身。你必须持久化从你的PersistentActor调用deliver和confirmDelivery所对应的事件,从而可以在PersistentActor的恢复阶段调用相同的方法恢复状态。有时,这些事件可以从其他业务级别的事件中派生出来,有时您必须创建单独的事件。在恢复过程中deliver的调用不会发出消息,但如果没有匹配的confirmDelivery执行,它将稍后发送。
getDeliverySnapshot和setDeliverySnapshot提供支持快照功能。AtLeastOnceDeliverySnapshot包含完整的投递状态,包括未经确认的消息。如果你需要一个自定义的快照保存actor其他部分的状态,你还必须包括AtLeastOnceDeliverySnapshot,它使用protobuf序列化,即利用Akka的通用序列化机制。最简单的方法是将AtLeastOnceDeliverySnapshot中的字节作为blob包含在你自定义的快照中。
重新传递尝试之间的间隔是由redeliverInterval方法定义的,默认值是由akka.persistence.at-least-once-delivery.redeliver-interval来配置,可以在实现类中重写该方法来返回非默认值。
在每次重新发送爆发时将发送的消息的最大数量由redeliveryBurstLimit方法定义(爆发的频率是重发间隔的一半)。如果有很多未确认的消息(例如,如果目的地很长一段时间都没有),这有助于防止大量的消息同时发送。默认值可以被akka.persistence.at-least-once-delivery.redelivery-burst-limit来配置,可以在实现类中重写该方法来返回非默认值。
在进行n次尝试之后,有一个AtLeastOnceDelivery.UnconfirmedWarning消息发送到self,重新发送将仍然继续,但是你可以选择调用confirmDelivery 来取消重新发送。传递的数量尝试之前发出的警告是由warnAfterNumberOfUnconfirmedAttempts定义方法。默认值可以被akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts可以在实现类中重写该方法来返回非默认值
AtLeastOnceDelivery 特性将消息保存在内存中,直到确认其成功交付。actor能保留在内存中的未经确认的消息的最大数目限制是由maxUnconfirmedMessages方法定义的。如果超过了此限制deliver方法将不会接受更多的消息,它将抛出AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException异常。。可以用akka.persistence.at-least-once-delivery.max-unconfirmed-messages配置其默认值。可以用实现类重写该方法来返回非默认值。
事件适配器
在长期运行的项目中,使用事件源有时需要将数据模型完全从domain 模型中分离出来。
事件适配器在以下情况提供帮助:
Version Migrations版本迁移:
存在事件存储在版本1应该被“上抛”到版本2,这样的过程需要实际代码来实现,不只是序列化层的变化。在这些场景下,toJournal 函数通常是特性函数,然而v1.Event=>v2.Event被formJournal实现,在fromJournal 方法中执行必要的映射。这种技术有时被称为其他CQRS库中的“向上转换”。
Separating Domain and Data models 分离Domain 和Data模型
由于事件适配器,可以将domain模型与用于在日志中持久存储数据的模型完全分离,例如可能希望在domain模型中使用case类,但是,将它们的协议缓冲区(或任何其他二进制序列化格式)保留到日志中。一个简单的toJournal:MyModel=>MyDataModel 和 fromJournal:MyDataModel=>MyModel适配器可以用来实现
Journal Specialized Data Types 日志专门的数据类型:
公开底层日志中已知的数据类型,例如数据用JSON来存储可以写一个事件适配器toJournal:Any=>JSON。日志可以直接存储json,而不是将对象序列化为二进制表示。
可以将多个适配器绑定到一个类以进行恢复,在这种情况下,所有绑定适配器的fromJournal方法将应用于给定的匹配事件(按照配置中的定义顺序)。由于每个适配器可能从0返回到n个适应事件(称为EventSeq)每个适配器都可以对事件进行调查,如果它确实需要调整,它将返回相应的事件(s)。在此修改过程中没有任何可贡献的其他适配器只返回EventSeq.empty。在重放过程中,改编事件按次序传递给PersistentActor 。
注意
对于更高级的模式演化技术参考Persistence - Schema Evolution文档
持久化有限状态机(FSM)
PersistentFSM 以FSM的方式处理传入消息。它的内部状态是作为一系列变化被持久化的,后来被称为domain事件。传入消息、FSM状态和转换之间的关系,domain事件的持久性是由DSL定义的。
一个简单的例子
为了演示PersistentFSM特性的特性,请考虑一个代表Web store客户的actor。我们的“WebStoreCustomerFSMActor”的合同是
AddItem 在客户向购物车中添加商品时发送。
Buy 当客户完成购买时。
Leave当顾客离开商店时没有购买任何东西。
GetCurrentCart 允许查询客户购物车的当前状态。
顾客的几种状态:
LookingAround客户正在浏览网站,但没有添加任何东西到购物车
Shopping 客户最近将商品添加到购物车中。
Inactive 顾客购物车里有商品,但最近没有添加任何东西。
Paid 顾客已经购买了商品。
客户的操作被“记录”为连续的“domain事件”序列。为了恢复最新客户的状态,这些事件在一个actor的开始时重播。
使用snapshot-after进行定期快照
如果你把reference.conf中的下列flag改编,你能够周期性的调用PersistentFSM 中的saveStateSnapshot()
akka.persistence.fsm.snapshot-after = 1000
这意味着在序列号达到1000的倍数后调用saveStateSnapshot()。
存储插件
在Akka持久性扩展中,用于日志和快照存储的存储备份是可插入的。在Akka社区项目页面上有一个持久性日志和快照存储插件的目录,请参阅社区插件。
当一个持久化actor定义自己的一组插件时,插件可以被“默认”选择为所有持久化actor,或者“单独”
当一个持久的actor不重写journalPluginId和snapshotPluginId方法时,持久性扩展将使用在reference.conf中配置的“默认”日志和快照存储插件。
但是,这些条目是空的””,并且需要在用户application.conf中通过覆盖来显式的用户配置。一个日志插件的例子,把消息写入LevelDB ,详情请查看看 Local LevelDB journal.对于快照存储插件的示例,该插件将快照写入到本地文件系统的单个文件中,详情请查看看 Local snapshot store.
应用程序可以通过实现插件API并通过配置激活它们来提供自己的插件。插件开发需要以下导入:
急切地初始化持久化插件
默认情况下,持久化插件根据自己的用处灵活的启动。急切地启动某个插件可能是有益的。为了做到这些,你应该先添加akka.persistence.Persistence在akka.extensions关键字下。在akka.persistence.journal.auto-start-journals和 akka.persistence.snapshot-store.auto-start-snapshot-stores下指定您希望自动启动的插件的id。例如,如果您希望对leveldb日志插件和本地快照存储插件进行急切的初始化,您的配置应该如下所示:
日志插件API
日志插件继承AsyncWriteJournal
AsyncWriteJournal 是一个actor,实现方法
如果存储后端API仅支持同步,阻塞写入,方法实现如下,
日志插件还必须实现在AsyncRecovery中定义的用于重放和序列号恢复的方法,实现方法
日志插件实例是一个actor,因此与persist actor的请求相对应的方法将按顺序执行。
它可以委托给异步库,或者委托给其他参与者来实现并行。
日志插件类必须有一个带有这些签名的构造函数。
· 一个com.typesafe.config.Config参数和一个配置路径的String参数
· 一个com.typesafe.config.Config参数
· 没有参数的构造函数
actor系统配置的插件部分将在配置构造函数参数中传递。插件配置路径在String参数中传递。
plugin-dispatcher是插件actor的分配器,如果没有指定,则默认为akka.persistence.dispatchers.default-plugin-dispatcher.
不要在系统默认调度器上运行日志任务,因为这可能会导致其他任务的匮乏。
快照存储插件API
一个快照存储插件必须扩展SnapshotStore actor并实现以下方法:一个快照存储插件必须扩展SnapshotStore actor并实现以下方法:实现方法
快照存储实例是一个actor,因此与persist actor的请求相对应的方法将按顺序执行。
它可以委托给异步库,或者委托给其他参与者来实现并行。
快照存储插件类须有一个带有这些签名的构造函数。
1.一个com.typesafe.config.Config参数和一个配置路径的String参数
2.一个com.typesafe.config.Config参数
3.没有参数的构造函数
actor系统配置的插件部分将在配置构造函数参数中传递。插件配置路径在String参数中传递。
plugin-dispatcher是插件actor的分配器,如果没有指定,则默认为akka.persistence.dispatchers.default-plugin-dispatcher.
不要在系统默认调度器上运行快照存储任务,因为这可能会导致其他任务的匮乏。
TCK插件
为了帮助开发人员构建正确和高质量的存储插件。我们提供了Technology Compatibility Kit(技术兼容性工具包)
TCK可以从Java和Scala项目中使用。要测试您的实现(独立于语言),您需要包括akka-persistence-tck依赖:
"com.typesafe.akka" %% "akka-persistence-tck" % "2.5.11" % "test"
要在测试套件中包含TCK测试,只需扩展所提供的JournalSpe
请注意,有些测试是可选的,并且通过覆盖support…方法给TCK提供运行测试所需的信息。你可以使用布尔值或提供的CapabilityFlag.on / CapabilityFlag.off 值来实现这些方法
我们也提供了一个简单的基准类JournalPerfSpec它包含了JournalSpec所拥有的所有测试,在打印性能统计数据时,还可以在日志上执行一些较长的操作。虽然它不是为了提供一个合适的基准测试环境,但它可以用来对您的日志在最典型的情况下的性能有一个粗略的感觉。
为了将SnapshotStore TCK测试包含在测试套件中,只需扩展SnapshotStoreSpec:
如果你的插件需要一些设置情况下(启动一个模拟数据库,删除临时文件等等),你可以重写beforeAll和afterAll方法,并在测试的生命周期中加入:
我们强烈建议在你的测试套件包括这些规格,因为它们涵盖了您可能已经忘记在从头编写插件时要测试的范围广泛的案例。
预先包装好的插件
本地levelDB日志
levelDB日志插件的配置入口:akka.persistence.journal.leveldb它将消息写入一个本地的LevelDB实例。通过定义配置属性启用此插件:
基于LevelDB的插件还需要以下附加的依赖声明:
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
LevelDB文件的默认位置是当前工作目录中一个名为journal的目录。此位置可以由配置中指定的相对或绝对的路径更改:
akka.persistence.journal.leveldb.dir = "target/journal"
用这个插件,每个actor系统可运行其自己私有的LevelDB实例。
LevelDB的一个特性是,删除操作不会移除日志中的消息,而是为每个删除的消息添加一个“墓碑”。在大量使用日志的情况下,特别是频繁的删除,这可能是一个问题,因为用户可能发现自己在处理不断增加的日志大小。为此,LevelDB提供了一个特殊的journal compaction函数,它通过以下配置公开。
共享LevelDB日志
一个LevelDB实例还可以由多个actor系统(在相同或不同节点上)共享。它,例如,允许持久化actor进行故障转移到备份节点,并从备份节点继续使用共享的日志实例。
通过实例化SharedLeveldbStore actor可以启动一个共享的LevelDB实例。
默认情况下,共享的实例将日志消息写入到当前的工作目录中一个名为journal的本地目录。可以通过配置更改存储位置:
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
使用共享的LevelDB存储的actor系统必须激活akka.persistence.journal.leveldb-shared插件。
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
这个插件必须通过注入(远程)SharedLeveldbStore actor引用来初始化。注入是通过以actor引用作为参数调用SharedLeveldbJournal.setStore方法完成的。
内部日志命令(由持久化actor发送的)会被缓冲直到注入完成。注入是幂等的,即只有第一次的注入被使用。
本地快照存储区
本地快照存储插件配置条目是akka.persistence.snapshot-store.local 它将快照文件写入到本地文件系统中。通过定义配置属性来启用这个插件。
默认的存储位置是当前工作目录中一个名为snapshots 的目录。这可以通过配置中指定的相对或绝对的路径来更改
akka.persistence.snapshot-store.local.dir = "target/snapshots"
默认的存储位置是当前工作目录中一个名为snapshots 的目录。这可以通过配置中指定的相对或绝对的路径来更改
持久性插件代理
持久性插件代理允许共享日志和快照存储在多个actor系统(在相同或不同的节点)。
例如,这允许持久化actor将故障转移到备份节点,并继续使用备份节点上的共享日志实例。代理的工作方式是将所有日志/快照存储消息转发到一个单独的、共享的持久性插件实例,因此支持由代理插件支持的任何用例。
日志和快照存储代理分别通过akka.persistence.journal.proxy和akka.persistence.snapshot-store.proxy配置来控制。
通过target-journal-plugin或target-snapshot-store-plugin来设置你想优先使用的插件(例如akka.persistence.journal.leveldb)。start-target-journal和 start-target-snapshot-store关键字应该在一个actor系统中设置成on——这是一个将实例化共享持久性插件的系统。
接下来,需要告诉代理如何找到共享插件。这个可以通过设置 target-journal-address 和 target-snapshot-store-address 配置关键字来完成或者以编程方式调用PersistencePluginProxy.setTargetLocation方法
自定义序列化
快照序列化和Persistent消息的有效载荷是可以通过Akka序列化基础架构配置的。例如,如果应用程序想要序列化
·有效载荷的MyPayload类型与自定义的MyPayloadSerializer
·快照的类型MySnapshot与自定义的MySnapshotSerializer
在应用程序配置中。如果未指定,则使用默认的序列化程序。
对于更高级的模式演化技术,请参考 Persistence - Schema Evolution文档
测试
运行测试时使用sbt的LevelDB默认设置,请确保在你的sbt项目中设置fork := true,否则你将看到一个UnsatisfiedLinkError。或者,你可以切换到一个LevelDB Java 端口,通过这样的设置
akka.persistence.journal.leveldb.native = off
或
akka.persistence.journal.leveldb-shared.store.native = off
在Akka配置中。LevelDB 的Java端口仅用于测试目的。
还要注意,对于LevelDB Java端口,您需要以下依赖项:
"org.iq80.leveldb" % "leveldb" % "0.9" % "test"
配置
持久化模块有几个配置属性,请参考 reference configuration.
多个持久性插件配置
默认情况下,持久化actor将使用“默认”的日志和快照存储插件,这些插件配置在reference.conf配置资源中,
当持久参与者重写了journalPluginId和snapshotPluginId方法时,该actor将由这些特定的持久性插件而不是默认值服务:
请注意,journalPluginId和snapshotPluginId必须引用正确配置的引用。带有标准类属性的conf插件条目以及特定于这些插件的设置,也就是:
网友评论