在前一片文章中我们介绍了Hazelcast中的主题,但是由于没有数据的备份,主题中的事件可能丢失,为了提高数据可靠性Hazelcast提供了可靠主题。可靠主题也使用ITopic
接口,为了保证数据的可靠性,可靠主题使用Ringbuffer
数据结构备份主题的事件。和普通主题相比,可靠主题有以下优势:
- 主题中的事件不会丢失。
Ringbuffer
默认有一个同步备份。 - 每个可靠主题使用独立的
Ringbuffer
,各个主题之间互不影响。 - 在脑裂环境中可靠主题无法工作。
下面是使用可靠主题发布事件的代码样例:
import com.hazelcast.core.Hazelcast
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val instance = Hazelcast.newHazelcastInstance()
val topic = instance.getReliableTopic<Long>("reliable")
var messageId = 1L
while (true) {
topic.publish(messageId)
messageId++
kotlinx.coroutines.delay(200L)
}
}
一个可靠主题的订阅者:
import com.hazelcast.core.Hazelcast
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance();
val topic = instance.getReliableTopic<Long>("reliable")
topic.addMessageListener {
println(it.messageObject)
}
}
在创建一个可靠主题时,Hazelcast会为主题自动创建一个Ringbuffer。Ringbuffer的名字和可靠主题的名字相同,可以通过添加一个Ringbuffer配置对可靠主题对应的Ringbuffer进行配置。对于Ringbuffer可以配置容量、主题消息的TTL,设置可以配置持久存储。下面是一个对名字为“reliable”可靠主题使用的Ringbuffer配置的样例:
<hazelcast>
...
<ringbuffer name="reliable">
<capacity>1000</capacity>
<time-to-live-seconds>5</time-to-live-seconds>
</ringbuffer>
<reliable-topic name="reliable">
<topic-overload-policy>BLOCK</topic-overload-policy>
</reliable-topic>
...
</hazelcast>
默认,可靠主题使用一个共享的线城池,如果想得到更好的隔离,可以通过ReliableTopicConfig
配置。Ringbuffer读是非破坏性的,因此实现批量读更加简单,ITopic
每次最多读取10条数据。
缓慢的消费者
可靠主题提供了对消费速度慢的消费的控制和管理方法。因为不知道速度慢的消费者何时能赶上,因此将对应的事件无限期保存在内存中是不明智的。可以通过Ringbuffer的容量来对内存中的数量进行限制,当Ringbuffer存储的数据量超过容量限制,可以选择下面的四种策略进行处理:
-
DISCARD_OLDEST
: 丢弃老数据,即使设置了TTL。 -
DISCARD_NEWEST
: 丢弃新数据。 -
BLOCK
: 阻塞直到有数据过期。 -
ERROR
: 立即抛出TopicOverloadException
异常。
配置可靠主题
声明式配置:
<hazelcast>
...
<reliable-topic name="default">
<statistics-enabled>true</statistics-enabled>
<message-listeners>
<message-listener>
...
</message-listener>
</message-listeners>
<read-batch-size>10</read-batch-size>
<topic-overload-policy>BLOCK</topic-overload-policy>
</reliable-topic>
...
</hazelcast>
可靠主题的配置参数有以下四项:
-
statistics-enabled
: 默认值true
,是否开启统计。 -
message-listener
: 事件监听器。 -
read-batch-size
: 批量读大小,默认10. -
topic-overload-policy
: 事件超过容量限制时的处理策略。
网友评论