美文网首页数客联盟Kafka我爱编程
Kakfa沟通机制(APiKeys & Listener

Kakfa沟通机制(APiKeys & Listener

作者: WestC | 来源:发表于2018-02-27 10:36 被阅读86次

    Kafka不同进程“沟通机制”

    Kafka服务的定位是一种高吞吐量的分布式消息订阅系统。服务再运行过程中,不同的进程(broker和controller之间,客户端和集群)之间需要进行“沟通”来保证功能的可用性。

    Kafka主要通过两种方式进行“沟通”,以保证自身状态或需求被感知:

    通过nio方式完成“沟通”

    controller通过ControllerChannelManager发送消息到各broker,主要有ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA, ApiKeys.UPDATE_METADATA_KEY 三种消息。其中仅有ApiKeys.STOP_REPLICA有相关callback操作。broker通过KafkaRequestHandlerPool接收消息并调用kafkaApi进行消息相应。其他如producer发送消息,consumser消费消息,获取集群信息等操作,也是客户端通过nio的方式发送请求。Broker可处理的消息有如下:

    消息 备注
    ApiKeys.LEADER_AND_ISR Controller 发送给broker
    ApiKeys.STOP_REPLICA Controller 发送给broker
    ApiKeys.UPDATE_METADATA_KEY Controller 发送给broker
    ApiKeys.PRODUCE producer发送生产数据请求给broker
    ApiKeys.FETCH consumer发送消费数据请求给broker
    ApiKeys.LIST_OFFSETS 客户端/consumer 发送查询offset请求
    ApiKeys.METADATA 获取集群topic信息
    ApiKeys.CONTROLLED_SHUTDOWN_KEY brokershutdown时向controller发送消息
    ApiKeys.OFFSET_COMMIT consumer向coordinator发送的offset commit 请求
    ApiKeys.OFFSET_FETCH consumer发送的请求partitions的当前的offset
    ApiKeys.GROUP_COORDINATOR consumer向broker发送group的coordinator是哪个节点的请求
    ApiKeys.JOIN_GROUP consumer向groupcoordinator发送 join group的请求
    ApiKeys.HEARTBEAT consumer定期发送heartbeat请求到groupcoordinator
    ApiKeys.LEAVE_GROUP consumer停止消费时发送退出group请求到groupcoordinator
    ApiKeys.SYNC_GROUP consumer变为leader获取follower时发送的已同步请求到groupcoordinator
    ApiKeys.DESCRIBE_GROUPS 客户端工具kafka-consumer-groups.sh ConsumerGroupCommand中使用describe group信息所用
    ApiKeys.LIST_GROUPS 客户端工具kafka-consumer-groups.sh ConsumerGroupCommand中使用list group信息所用
    ApiKeys.SASL_HANDSHAKE nio的select发送消息/请求时,需要先发送sasl——handshake请求
    ApiKeys.API_VERSIONS 客户端发送的kafka版本请求

    通过zookeeper注册监听器完成“沟通”

    客户端(broker/client)如果需要将诉求(如topic创建,删除,reblance,partition reassign等)告知服务,所采取的方案是再ZooKeeper中创建目录,写入数据,服务端(此处指controller)通过listener监控到zk中的变化,并作出响应

    Zookeeper的watch机制与第三方封装

    • Zookeeper是分布式应用程序协调服务。不同的进程通过约定的目录的写入和监控操作完成消息的互相感知。开源大数据多数服务的高可用性实现都是依靠zookeeper的协调功能完成。zookeeper客户端通过watcher机制完成对zookeeper的集群的监控。

    • 由于zookeeper应用开发的复杂性,出现了第三方对zk的应用开发的封装。常用的有Curator FrameWork,I0Itec-zkclient。其中hive,hbase,spark等都是通过Curator FrameWork完成和zk的交互。Kafka则选用的是I0Itec-zkclient与zk交互。

    Kafka中实现listener的方法:

    • Kafka通过zkUtils中调用org.I0Itec.zkclient.zkCLient完成listener(listener中相关的handle方法有相关的处理逻辑)的注册。
    • zkClient继承Watcher,再监控的对象发生变化时,zkCLient中的process方法会被调用
    • 在该方法中会完成对不同的listener的handle方法的调用

    I0Itec-zkclient主要有如下三种listener,分别实现不同的监听方式。

    消息 handle的方法 备注
    IZkChildListener handleChildChange listening on zk child changes for a given path
    IZkDataListener handleDataChange listening on zk data changes for a given path
    IZkDataListener handleDataDeleted listening on zk data changes for a given path
    IZkStateListener handleStateChanged Called when the zookeeper connection state has changed.
    IZkStateListener handleNewSession Called after the zookeeper session has expired and a new session has been created
    IZkStateListener handleSessionEstablishmentError Called when a session cannot be re-established
    listener接口 listener listener 模块 监控对象 备注
    IZkChildListener DeleteTopicsListener PartitionStateMachine DeleteTopicsPath = "/admin/delete_topics"
    IZkChildListener IsrChangeNotificationListener KafkaController IsrChangeNotificationPath = "/isr_change_notification"
    IZkChildListener zkTopicEventListener ZookeeperTOpicEventWatcher BrokerTopicsPath = "/brokers/topics"
    IZkChildListener NodeChangeListener ZkNodeCHangeNotificaionListener in SimpleAclAuthorizer AclChangedZkPath = "/kafka-acl-changes" AclChangedPrefix = "acl_changes_"
    IZkChildListener NodeChangeListener ZkNodeCHangeNotificaionListener In DynamicConfigManager EntityConfigChangesPath = "/config/changes" EntityConfigChangeZnodePrefix = "config_change_"
    IZkChildListener ZKRebalancerListener ZookeeperConsumerConnector BrokerIdsPath = "/brokers/ids" 同时也会被由ZKTopicPartitionChangeListener触发,然后调用syncedRebalance进行rebalance
    IZkChildListener TopicChangeListener PartitionStateMachine BrokerTopicsPath = "/brokers/topics"
    IZkChildListener BrokerChangeListener ReplicaStateMachine BrokerIdsPath = "/brokers/ids"
    IZKDateListener PreferredReplicaElectionListener KafkaController PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
    IZKDateListener ZKTopicPartitionChangeListener ZookeeperConsumerConnector "/brokers/topics/${topic_name}" consumer监控的partition的变化并作出处理。 高级api
    IZKDateListener PartitionsReassignedListener KafkaController ReassignPartitionsPath = "/admin/reassign_partitions"
    IZKDateListener LeaderChangeListener ZookeeperLeaderElector ControllerPath = "/controller" Kafka的broker服务的升controller的竞选目录
    IZKDateListener ReassignedPartitionsIsrChangeListener KafkaController "/brokers/topics/${topic_name}/partitions/${partition_id}/state"
    IZKDateListener PartitionModificationsListener PartitionStateMachine "/brokers/topics/${topic_name}
    IZkStateListener SessionExpireListener KafkaHealthcheck kafka健康检查会检查进程和zk的session状态
    IZkStateListener ZKSessionExpireListener ZookeeperConsumerConnector consumser初始化时注册,在新建session时,也会调用到用syncedRebalance进行rebalance
    IZkStateListener SessionExpirationListener KafkaController kafkacontroller会监听与zk的链接状态,如果新建session,会调用onControllerResignation,并重新竞选leader
    IZkStateListener ZkStateChangeListener ZkNodeChangeNotificationListener In SimpleAclAuthorizer AclChangedZkPath = "/kafka-acl-changes" AclChangedPrefix = "acl_changes_" 在新建seesion时会通过注册的notificaionhandler处理所有notifications
    IZkStateListener ZkStateChangeListener ZkNodeChangeNotificationListener In DynamicConfigManager EntityConfigChangesPath = "/config/changes" EntityConfigChangeZnodePrefix = "config_change_" 在新建seesion时会通过注册的notificaionhandler处理所有notifications
    IZkStateListener ZkSessionExpireListener ZookeeperTopicEventWatcher 消费时监控和zk的链接状态,低级api有效

    Kafka内部有大量的listener机制完成不同的功能。

    listener接口 listener listener 模块 监控对象 备注
    IZkChildListener DeleteTopicsListener PartitionStateMachine DeleteTopicsPath = "/admin/delete_topics"
    IZkChildListener IsrChangeNotificationListener KafkaController IsrChangeNotificationPath = "/isr_change_notification"
    IZkChildListener zkTopicEventListener ZookeeperTOpicEventWatcher BrokerTopicsPath = "/brokers/topics"
    IZkChildListener NodeChangeListener ZkNodeCHangeNotificaionListener in SimpleAclAuthorizer AclChangedZkPath = "/kafka-acl-changes" AclChangedPrefix = "acl_changes_"
    IZkChildListener NodeChangeListener ZkNodeCHangeNotificaionListener In DynamicConfigManager EntityConfigChangesPath = "/config/changes" EntityConfigChangeZnodePrefix = "config_change_"
    IZkChildListener ZKRebalancerListener ZookeeperConsumerConnector BrokerIdsPath = "/brokers/ids" 同时也会被由ZKTopicPartitionChangeListener触发,然后调用syncedRebalance进行rebalance
    IZkChildListener TopicChangeListener PartitionStateMachine BrokerTopicsPath = "/brokers/topics"
    IZkChildListener BrokerChangeListener ReplicaStateMachine BrokerIdsPath = "/brokers/ids"
    IZKDateListener PreferredReplicaElectionListener KafkaController PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
    IZKDateListener ZKTopicPartitionChangeListener ZookeeperConsumerConnector "/brokers/topics/${topic_name}" consumer监控的partition的变化并作出处理。 高级api
    IZKDateListener PartitionsReassignedListener KafkaController ReassignPartitionsPath = "/admin/reassign_partitions"
    IZKDateListener LeaderChangeListener ZookeeperLeaderElector ControllerPath = "/controller" Kafka的broker服务的升controller的竞选目录
    IZKDateListener ReassignedPartitionsIsrChangeListener KafkaController "/brokers/topics/${topic_name}/partitions/${partition_id}/state"
    IZKDateListener PartitionModificationsListener PartitionStateMachine "/brokers/topics/${topic_name}
    IZkStateListener SessionExpireListener KafkaHealthcheck kafka健康检查会检查进程和zk的session状态
    IZkStateListener ZKSessionExpireListener ZookeeperConsumerConnector consumser初始化时注册,在新建session时,也会调用到用syncedRebalance进行rebalance
    IZkStateListener SessionExpirationListener KafkaController kafkacontroller会监听与zk的链接状态,如果新建session,会调用onControllerResignation,并重新竞选leader
    IZkStateListener ZkStateChangeListener ZkNodeChangeNotificationListener In SimpleAclAuthorizer AclChangedZkPath = "/kafka-acl-changes" AclChangedPrefix = "acl_changes_" 在新建seesion时会通过注册的notificaionhandler处理所有notifications
    IZkStateListener ZkStateChangeListener ZkNodeChangeNotificationListener In DynamicConfigManager EntityConfigChangesPath = "/config/changes" EntityConfigChangeZnodePrefix = "config_change_" 在新建seesion时会通过注册的notificaionhandler处理所有notifications
    IZkStateListener ZkSessionExpireListener ZookeeperTopicEventWatcher 消费时监控和zk的链接状态,低级api有效

    相关文章

      网友评论

        本文标题:Kakfa沟通机制(APiKeys & Listener

        本文链接:https://www.haomeiwen.com/subject/dsepxftx.html