美文网首页
[Hazelcast系列九] 主题(Topic)

[Hazelcast系列九] 主题(Topic)

作者: 大哥你先走 | 来源:发表于2020-02-12 00:39 被阅读0次

    Hazelcast支持常用的publish/subscribe消息模型。发布和订阅都是集群级操作,一个member订阅主题,其实是订阅了集群内所有member发布到主题的消息,即使member是在订阅之后加入集群。

    发布/订阅主题

    发布消息:

    fun main() = runBlocking {
        val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
        val topic = hz.getTopic<String>("topic")
        topic.publish("hello")
    }
    

    订阅消息:

    fun main() = runBlocking<Unit> {
        val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
        val topic = hz.getTopic<String>("topic")
        topic.addMessageListener(MessageListener {
            println(it.messageObject)
        })
    }
    

    统计信息

    主题两个统计量:发布的消息数和接收的消息数(member启动之后),只反映本节点的统计信息,非全局统计信息。

    fun main() = runBlocking {
        val hz: HazelcastInstance = Hazelcast.newHazelcastInstance()
        val topic = hz.getTopic<String>("topic")
        topic.publish("hello hazelcast")
        topic.addMessageListener(MessageListener {
            println(it.messageObject)
        })
        println(topic.localTopicStats.publishOperationCount)
        println(topic.localTopicStats.receiveOperationCount)
    }
    

    注意:统计数据没有备份,如果member下线,统计数据会丢失。

    理解主题的几个行为

    集群的所有成员都有集群内订阅的列表。当一个新成员订阅一个主题时,该成员会向集群所有的成员发送该订阅消息,如果一个新成员加入集群,新成员也会收到目前集群内所有的订阅信息。主题的行为受参数 globalOrderEnabled的影响。

    消息消费的顺序和发布顺序一致

    如果 globalOrderEnabled 设置为false,消息不会被排序,监听器将会按照消息发布的顺序处理消息。下面举一个简单的例子进行说明。假设集群有三个成员分别是member1,member2和member3,其中member1和member2订阅了主题message,member1依次向主题message发布消息a1,a2,member3依次向主题message发布消息 c1,c2。member1和member2接收到的消息顺序可能是下面的顺序:

    member1c1, a1, a2, c2

    member2c1, c2, a1, a2

    只能保证每个member发布的消息按顺序处理,不能保证不同member消息之间的处理顺序。

    所有成员以相同的顺序处理消息

    如果globalOrderEnabled 设置为true,监听同一个主题的所有member得到的消息的顺序是一致。上面的例子member1和member2接收消息的顺序将是相同的。

    member1a1, c1, a2, c2

    member2a1, c1, a2, c2

    保证生产和发布消息的顺序一致

    在两种场景下,StripeExecutor 负责消息的分发和接收。集群所有的事件的生成顺序和发布顺序的一致都是由StripeExecutor保证。StripeExecutor中线程的数量由参数hazelcast.event.thread.count配置,默认线程数为5。事件源和线程的映射关系为:hash(source's name) % 5。多个事件源可能共享一个线程,因此为了不阻塞其他消息,监听器中不能有太重的处理逻辑,如果可以提交到其他线程异步处理。

    配置主题

    对于一个主题可以配置:名字,是否收集统计信息,全局有序性和消息监听器。其中有两项参数拥有默认值:

    • global-ordering = false,不保证全局有序。
    • statistics =true, 收集主题统计信息。

    下面是使用xml对主题进行配置的样例。

    <hazelcast>
        ...
        <topic name="topic">
            <global-ordering-enabled>true</global-ordering-enabled>
            <statistics-enabled>true</statistics-enabled>
            <message-listeners>
                <message-listener>MessageListenerImpl</message-listener>
            </message-listeners>
        </topic>
        ...
    </hazelcast>
    

    主题配置支持一下元素:

    • statistics-enabled: 是否收集统计信息,默认为true。
    • global-ordering-enabled: 是否保证全局有序,默认值false。
    • message-listeners: 主题监听器。

    除去上面的配置下面的系统熟悉也和主题相关,但是不是主题所特有的配置:

    • hazelcast.event.queue.capacity 默认值1,000,000
    • hazelcast.event.queue.timeout.millis 默认值 250
    • hazelcast.event.thread.count 默认值 5

    相关文章

      网友评论

          本文标题:[Hazelcast系列九] 主题(Topic)

          本文链接:https://www.haomeiwen.com/subject/datqfhtx.html