美文网首页个人学习
【Hazelcast系列十一】分布式事件

【Hazelcast系列十一】分布式事件

作者: 大哥你先走 | 来源:发表于2020-02-17 22:03 被阅读0次

分布式事件

如果想感知某些事件,可以向Hazelcast实例注册监听器。监听器在集群范围内有效,向集群内的一个成员注册监听器,实际上是向集群内所有的成员注册了监听器。新加入成员产生的事件也会被发送给监听器。Hazelcast只生成注册了监听器的事件。如果没有监听器,也就不会有事件产生,为什么要去做没人关心的事情呢?如果在注册监听器时提供了谓词,事件在发送给监听器时首先要通过谓词,只有通过谓词的事件才能最终发送给监听器。

根据经验,不宜在监听器内实现过重的处理逻辑(会阻塞线程影响事件的处理)。如若真的需要复杂的处理逻辑,可以使用ExecutorService 异步执行。

注意: 事件并不是高可用的,在故障场景有丢失的风险。但是有一些变通的手段,比如配置容量等可以用来降低事件丢失的可能性。

Hazelcast 提供以下事件监听器。

集群事件:

  • Membership Listener
  • Distributed Object Listener
  • Migration Listener
  • Partition Lost Listener
  • Lifecycle Listener
  • Client Listener

分布式对象事件:

  • Entry Listener
  • Item Listener
  • Message Listener

JCache :

  • Cache Entry Listener
  • ICache Partition Lost Listener

Hazelcast 客户端:

  • Lifecycle Listener
  • Membership Listener
  • Distributed Object Listener

8.1. 集群事件

8.1.1. 监听成员事件

下面的事件会触发成员关系监听器的接口方法被调用:

  • memberAdded
  • memberRemoved
  • memberAttributeChanged

实现一个成员关系监听器需要实现 MembershipListener 接口。

class ClusterMembershipListener : MembershipListener {
    override fun memberRemoved(event: MembershipEvent?) {
        println(event?.member.toString())
    }

    override fun memberAdded(event: MembershipEvent?) {
        println(event?.member.toString())
    }

    override fun memberAttributeChanged(event: MemberAttributeEvent?) {
        println(event?.member.toString())
    }

}
注册监听器

实现监听器类后可以使用 addMembershipListener 方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance();
instance.cluster.addMembershipListener(ClusterMembershipListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            io.github.ctlove0523.hazelcast.data.ClusterMembershipListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="io.github.ctlove0523.hazelcast.data.ClusterMembershipListener"/>
    <hz:listener implementation="MembershipListener"/>
</hz:listeners>

8.1.2. 监听分布式对象事件

当集群内一个分布式对象创建和销毁时,分布式对象监听器的 distributedObjectCreateddistributedObjectDestroyed 方法会被调用。实现一个分布式对象监听器需要实现DistributedObjectListener接口。

class SimpleListener : DistributedObjectListener {
    override fun distributedObjectCreated(event: DistributedObjectEvent?) {
        println("created ${event?.distributedObject?.toString()}")
    }

    override fun distributedObjectDestroyed(event: DistributedObjectEvent?) {
        println("destroy ${event?.distributedObject?.toString()}")
    }

}
注册监听器

创建了监听器类后可以使用addDistributedObjectListener 方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
instance.addDistributedObjectListener(SimpleListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimpleListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xx.SimpleListener"/>
    <hz:listener implementation="DistributedObjectListener"/>
</hz:listeners>

8.1.3. 监听迁移事件

下列事件会触发迁移监听器接口方法:

  • migrationStarted
  • migrationFinished
  • replicaMigrationCompleted
  • replicaMigrationFailed

实现一个迁移监听器类需要实现 MigrationListener 接口:

class SimpleMigrationListener : MigrationListener {
    override fun migrationFailed(p0: MigrationEvent?) {
        println("migration failed ${p0?.status}")
    }

    override fun migrationStarted(p0: MigrationEvent?) {
        println("migration started ${p0?.status}")
    }

    override fun migrationCompleted(p0: MigrationEvent?) {
        println("migration completed ${p0?.status}")
    }

}
注册监听器

可以使用 addMigrationListener方法向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addMigrationListener(SimpleMigrationListener())

使用上面的方法配置监听器有一点不足:创建实例和注册监听之间的事件可能丢失。为了解决这个问题,Hazelcast支持在配置中配置监听器。

代码配置:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimpleMigrationListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimpleMigrationListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xx.SimpleMigrationListener"/>
    <hz:listener implementation="MigrationListener"/>
</hz:listeners>

8.1.4. 监听分区丢失事件

Hazelcast通过数据多副本提供容错能力。每个分区都有唯一的拥有者,根据配置每个分区会有不同数量的副本拥有者。但是,如果一些成员同时宕机可能导致数据丢失。分区丢失侦听器通过分区丢失了多少个副本信息来通知可能发生的数据丢失。每个分区丢失都会创建分区丢失事件。在检测到一个成员崩溃并将其从群集中移除之后,将执行分区丢失检测。 请注意,可能会因网络分区错误而触发错误的PartitionLostEvent事件。

实现分区监听器

分区监听器需要实现PartitionLostListener 接口:

class SimplePartitionLostListener: PartitionLostListener {
    override fun partitionLost(p0: PartitionLostEvent?) {
        println("partition lost ${p0?.toString()}")
    }
}

集群产生PartitionLostEvent 事件时,分区丢失监听器将会输出分区ID,丢失的副本索引以及检测到分区丢失的集群成员。

partition lost com.hazelcast.partition.PartitionLostEvent{partitionId=202, lostBackupCount=0, eventSource=[192.168.2.105]:5701}
注册监听器

代码配置1:

val config = Config()
config.addListenerConfig(ListenerConfig("xx.SimplePartitionLostListener"))
val instance = Hazelcast.newHazelcastInstance(config)

代码配置2:

val instance = Hazelcast.newHazelcastInstance()
val partitionService = instance.partitionService
partitionService.addPartitionLostListener(SimplePartitionLostListener())

和上面代码配置等价的声明式配置;

<hazelcast>
    ...
    <listeners>
        <listener>
            xx.SimplePartitionLostListener
        </listener>
    </listeners>
    ...
</hazelcast>

8.1.5. 监听生命周期事件

生命周期监听器会收到以下事件的通知:

  • STARTING
  • STARTED
  • SHUTTING_DOWN
  • SHUTDOWN
  • MERGING
  • MERGED
  • CLIENT_CONNECTED
  • CLIENT_DISCONNECTED

实现生命周期监听器需要实现LifecycleListener 接口:

class SimpleLifecycleListener : LifecycleListener {
    override fun stateChanged(p0: LifecycleEvent?) {
        println("sate changed ${p0?.toString()}")
    }
}

生命周期监听器是本地的,只能通知应用所在成员上发生的事件。

注册监听器

使用 addLifecycleListener注册监听器:

val instance = Hazelcast.newHazelcastInstance()
instance.lifecycleService.addLifecycleListener(SimpleLifecycleListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置1:

val config = Config()
config.addListenerConfig(ListenerConfig("xxx.SimpleLifecycleListener"))
val instance = Hazelcast.newHazelcastInstance(config)

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xxx.SimpleLifecycleListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xxx.SimpleLifecycleListener"/>
    <hz:listener implementation="LifecycleListener"/>
</hz:listeners>

8.1.6. 监听客户端

集群成员可以使用客户端监听器来感知客户端接入集群和离开集群。客户端的连接和断连只会触发客户连接的成员收到事件,集群的其他成员无法感知。

一个简单的客户端监听器:

class SimpleClientListener : ClientListener {
    override fun clientDisconnected(p0: Client?) {
        println("disconnected ${p0?.toString()}")
    }

    override fun clientConnected(p0: Client?) {
        println("connected ${p0?.toString()}")
    }
}
注册监听器

代码配置1:

val instance = Hazelcast.newHazelcastInstance()
instance.clientService.addClientListener(SimpleClientListener())

声明式配置:

<hazelcast>
    ...
    <listeners>
        <listener>
            xxx.SimpleClientListener
        </listener>
    </listeners>
    ...
</hazelcast>

Spring配置:

<hz:listeners>
    <hz:listener class-name="xxx.SimpleClientListener"/>
    <hz:listener implementation="com.yourpackage.ExampleClientListener"/>
</hz:listeners>

8.2. 分布式对象事件

8.2.1. 监听Map事件

使用 MapListener 及其子接口可以监听map操作触发的事件。

捕获Map事件

如果一个类想捕获Map事件,需要实现目标事件对应的接口,比如entryAddedListenerMapClearedListener

class SimpleMapListener : EntryAddedListener<String, String> {
    override fun entryAdded(p0: EntryEvent<String, String>?) {
        println("entry added key = ${p0?.key},value = ${p0?.value}")
    }
}

fun main() = runBlocking<Unit> {
    val instance = Hazelcast.newHazelcastInstance()
    val map = instance.getMap<String, String>("map")
    map.addEntryListener(SimpleMapListener(), true)
}

创建另一个Hazelcast实例,并向map中添加几个数据严重监听器是否能够收到通知:

fun main() = runBlocking<Unit> {
    val instance = Hazelcast.newHazelcastInstance()
    val map = instance.getMap<String, String>("map")
    for (i in 1..3) {
        map["key$i"] = "value$i"
    }
}

监听器的输出:

entry added key = key1,value = value1
entry added key = key2,value = value2
entry added key = key3,value = value3

8.2.2. 监听Map 分区丢失

向Hazelcast注册一个MapPartitionLostListener监听器可以监听map分区丢失事件。

class SimpleMapPartitionLostListener : MapPartitionLostListener {
    override fun partitionLost(p0: MapPartitionLostEvent?) {
        println("partition lost ${p0?.toString()}")
    }
}
注册监听器
val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMap<String, String>("map")
map.addPartitionLostListener(SimpleMapPartitionLostListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
config.getMapConfig("map").addMapPartitionLostListenerConfig(MapPartitionLostListenerConfig("xxx.SimpleMapPartitionLostListener"))

声明式配置:

<hazelcast>
    ...
    <map name="map">
        <entry-listeners>
            <entry-listener include-value="false" local="false">
                xxx.SimpleMapPartitionLostListener
            </entry-listener>
        </entry-listeners>
    </map>
    ...
</hazelcast>

Spring配置:

<hz:map name="map">
    <hz:entry-listeners>
        <hz:entry-listener include-value="true"
            class-name="xxx.SimpleMapPartitionLostListener"/>
        <hz:entry-listener implementation="dummyEntryListener" local="true"/>
    </hz:entry-listeners>
</hz:map>
Map 监听器属性
  • include-value ,事件是否包含value,默认true

  • local ,是否只监听本地事件,默认值 false

8.2.3. 监听 MultiMap 事件

如果要监听MultiMap 产生的事件,需要实现EntryListener接口。

class SimpleEntryListener : EntryListener<String, String> {
    override fun entryEvicted(p0: EntryEvent<String, String>?) {
        println("evicted ${p0?.toString()}")
    }

    override fun entryUpdated(p0: EntryEvent<String, String>?) {
        println("updated ${p0?.toString()}")
    }

    override fun mapCleared(p0: MapEvent?) {
        println("cleared ${p0?.toString()}")
    }

    override fun entryAdded(p0: EntryEvent<String, String>?) {
        println("added ${p0?.toString()}")
    }

    override fun entryRemoved(p0: EntryEvent<String, String>?) {
        println("removed ${p0?.toString()}")
    }

    override fun mapEvicted(p0: MapEvent?) {
        println("evicted ${p0?.toString()}")
    }

}
注册监听器

使用addEntryListener 方法注册:

val instance = Hazelcast.newHazelcastInstance()
val map = instance.getMultiMap<String, String>("map")
map.addEntryListener(SimpleEntryListener(), true)

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
val multiMapConfig = config.getMultiMapConfig("map")
multiMapConfig.addEntryListenerConfig(EntryListenerConfig("xxx.SimpleEntryListener", false, false))
val instance = Hazelcast.newHazelcastInstance(config)

声名式配置:

<hazelcast>
    ...
    <multimap name="map">
        <value-collection-type>SET</value-collection-type>
        <entry-listeners>
            <entry-listener include-value="false" local="false">
                xxx.SimpleEntryListener
            </entry-listener>
        </entry-listeners>
    </multimap>
    ...
</hazelcast>

Spring 配置:

<hz:multimap name="map" value-collection-type="SET">
    <hz:entry-listeners>
        <hz:entry-listener include-value="false"
            class-name="xxx.SimpleEntryListener"/>
        <hz:entry-listener implementation="EntryListener" local="false"/>
    </hz:entry-listeners>
</hz:multimap>
MultiMap 监听器属性

和Map监听器属性一致。

8.2.4. 监听元素事件

IQueue, ISetIList 接口使用元素监听器。实现一个元素监听器需要实现ItemListener ,添加和删除元素会触发事件。

下面是一个简单的元素监听器

class SimpleItemListener : ItemListener<String> {
    override fun itemRemoved(p0: ItemEvent<String>?) {
        println("item removed ${p0?.toString()}")
    }

    override fun itemAdded(p0: ItemEvent<String>?) {
        println("item added ${p0?.toString()}")
    }
}
注册监听器

创建完监听器后,可以使用addItemListener 注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val set = instance.getSet<String>("set")
set.addItemListener(SimpleItemListener(), true)

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

setConfig.addItemListenerConfig(
new ItemListenerConfig( "xxx.SimpleItemListener", true ) );

声明式配置:

<hazelcast>
    ...
    <set>
        <item-listeners>
            <item-listener include-value="true">
                xxx.SimpleItemListener
            </item-listener>
        </item-listeners>
    </set>
    ...
</hazelcast>

Spring配置:

<hz:set name="default" >
    <hz:item-listeners>
        <hz:item-listener include-value="true"
            class-name="xxx.SimpleItemListener"/>
    </hz:item-listeners>
</hz:set>
元素监听器属性
  • include-value:事件是否包含value,默认为true
  • local:是否只监听本地成员事件,默认值为false

8.2.5. 监听主题消息

ITopic 接口使用消息监听器用来在收到主题对应的消息时通知注册的监听器。实现一个消息监听器需要实现MessageListener

class SimpleMessageListener : MessageListener<String> {
    override fun onMessage(p0: Message<String>?) {
        println("get message ${p0?.toString()}")
    }
}
注册监听器

创建完监听器后,可以使用addMessageListener 向Hazelcast注册监听器:

val instance = Hazelcast.newHazelcastInstance()
val topic = instance.getTopic<String>("topic")
topic.addMessageListener(SimpleMessageListener())

使用上面的方法注册监听器可能会丢失Hazelcast实例创建和注册监听器之间的事件,更好的方式是使用配置的方式进行注册监听器。

代码配置:

val config = Config()
config.getTopicConfig("topic").addMessageListenerConfig(ListenerConfig("xxx.SimpleMessageListener"))

声明式配置:

<hazelcast>
    ...
    <topic name="default">
        <message-listeners>
            <message-listener>
                xxx.SimpleMessageListener
            </message-listener>
        </message-listeners>
    </topic>
    ...
</hazelcast>

Spring配置:

<hz:topic name="default">
    <hz:message-listeners>
        <hz:message-listener
            class-name="xxx.SimpleMessageListener"/>
    </hz:message-listeners>
</hz:topic>

8.3. 全局事件配置

  • hazelcast.event.queue.capacity: 默认值1000000
  • hazelcast.event.queue.timeout.millis: 默认值 250
  • hazelcast.event.thread.count: 默认值 5

集群成员中的executor控制并调度接收到的事件,同时也负责保证事件的有序性。对于Hazelcast中的所有事件,对于给定的key,可以保证事件生成的顺序和事件发布的顺序一致。对于map和multimap来说,对于同一个key的操作顺序可以保证。对于list,set,topic和queue,事件的顺序和操作的顺序一致。

如果事件队列达到容量限制(hazelcast.event.queue.capacity) ,最后一个事件无法在hazelcast.event.queue.timeout.millis内插入事件队列,这些事件将会被丢弃并发出一个警告信息“EventQueue overload”。

为了实现事件的有序性,StripedExecutor中的每一个线程负责处理事件的一部分。如果监听器执行的计算非常耗时,这有可能导致事件队列达到容量限制并丢失事件。对于map和multimap,可以将hazelcast.event.thread.count配置为更高的值,以减少键冲突,因此,工作线程在StripedExecutor中不会相互阻塞。对于list,set,topic和queue,必须把负载重的工作提交到其他线程中处理。为了保证事件的顺序,在其他线程中应该实现StripedExecutor中同样的逻辑。

相关文章

网友评论

    本文标题:【Hazelcast系列十一】分布式事件

    本文链接:https://www.haomeiwen.com/subject/ossvgftx.html