美文网首页
Kafka核心API——AdminClient API

Kafka核心API——AdminClient API

作者: 端碗吹水 | 来源:发表于2020-05-16 18:52 被阅读0次

    五类Kafka客户端作用和区别

    上文中介绍了如何搭建一个Kafka服务,那么在开发中我们要如何去访问、集成Kafka呢?这就需要使用到本文将要介绍的Kafka客户端API。下图是官方文档中的一个图,形象表示了能与Kafka集成的客户端类型:

    image.png

    这些客户端通过API与Kafka进行集成,Kafka的五类客户端API类型如下:

    • AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似
    • Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API
    • Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API
    • Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景
    • Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB

    创建工程

    在接下来的篇章中将会演示AdminClient API的具体使用,其余的API则会在后续的文章中进行介绍。首先,我们在IDEA中创建一个Spring Boot工程,该工程的pom.xml文件内容如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.zj.study</groupId>
        <artifactId>kafka-study</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-study</name>
        <description>Kafka study project for Spring Boot</description>
    
        <properties>
            <java.version>11</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Kafka 客户端依赖 -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.5.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    AdminClient客户端的建立

    常用的AdminClient API对象如下:


    image.png

    显然,操作AdminClient API的前提是需要创建一个AdminClient实例。代码示例:

    /**
     * 配置并创建AdminClient
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        // 配置Kafka服务的访问地址及端口号
        properties.setProperty(AdminClientConfig.
                BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    
        // 创建AdminClient实例
        return AdminClient.create(properties);
    }
    

    创建了AdminClient的实例对象后,我们就可以通过它提供的方法操作Kafka,常用的方法如下:

    方法名称 作用
    createTopics 创建一个或多个Topic
    listTopics 查询Topic列表
    deleteTopics 删除一个或多个Topic
    describeTopics 查询Topic的描述信息
    describeConfigs 查询Topic、Broker等的所有配置项信息
    alterConfigs 用于修改Topic、Broker等的配置项信息(该方法在新版本中被标记为已过期)
    incrementalAlterConfigs 同样也是用于修改Topic、Broker等的配置项信息,但功能更多、更灵活,用于代替alterConfigs
    createPartitions 用于调整Topic的Partition数量,只能增加不能减少或删除,也就是说新设置的Partition数量必须大于等于之前的Partition数量

    Tips:

    • describeTopicsdescribeConfigs的意义主要是在监控上,很多用于监控Kafka的组件都会使用到这两个API,因为通过这两个API可以获取到Topic自身和周边的详细信息

    创建Topic

    使用createTopics方法可以创建Topic,传入的参数也与kafka-topics.sh命令脚本的参数一样。代码示例:

    /**
     * 创建topic
     */
    public static void createTopic() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        // topic的名称
        String name = "MyTopic3";
        // partition数量
        int numPartitions = 1;
        // 副本数量
        short replicationFactor = 1;
        NewTopic topic = new NewTopic(name, numPartitions, replicationFactor);
        CreateTopicsResult result = adminClient.createTopics(List.of(topic));
        // 避免客户端连接太快断开而导致Topic没有创建成功
        Thread.sleep(500);
        // 获取topic设置的partition数量
        System.out.println(result.numPartitions(name).get());
    }
    

    查看Topic列表

    listTopics方法用于查询Topic列表,通过传入ListTopicsOptions参数可以设置一些可选项。代码示例:

    /**
     * 查询Topic列表
     */
    public static void topicLists() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        ListTopicsResult result1 = adminClient.listTopics();
        // 打印Topic的名称
        System.out.println(result1.names().get());
        // 打印Topic的信息
        System.out.println(result1.listings().get());
    
        ListTopicsOptions options = new ListTopicsOptions();
        // 是否列出内部使用的Topic
        options.listInternal(true);
        ListTopicsResult result2 = adminClient.listTopics(options);
        System.out.println(result2.names().get());
    }
    

    关于listInternal选项:

    listInternal选项在Kafka 0.x的版本里是没有的,因为在0.x版本中Kafka是将consumer的offset信息存储在Zookeeper里,但由于Zookeeper同步consumer的offset信息比较慢,于是在1.x后就迁移到Kafka的Topic中进行存储了,这也是为了提高吞吐量和性能


    删除Topic

    deleteTopics方法可以删除一个或多个Topic,代码示例:

    /**
     * 删除Topic
     */
    public static void delTopics() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        DeleteTopicsResult result = adminClient.deleteTopics(List.of("MyTopic1"));
        System.out.println(result.all().get());
    }
    

    Topic的描述信息查看

    一个Topic会有自身的描述信息,例如:partition的数量,副本集的数量,是否为internal等等。AdminClient中提供了describeTopics方法来查询这些描述信息。代码示例:

    /**
     * 查询Topic的描述信息
     */
    public static void describeTopics() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        DescribeTopicsResult result = adminClient.describeTopics(List.of("MyTopic"));
        Map<String, TopicDescription> descriptionMap = result.all().get();
        descriptionMap.forEach((key, value) ->
                System.out.println("name: " + key + ", desc: " + value));
    }
    

    输出的内容如下:

    name: MyTopic, desc: (name=MyTopic, internal=false, partitions=(partition=0, leader=127.0.0.1:9092 (id: 0 rack: null), replicas=127.0.0.1:9092 (id: 0 rack: null), isr=127.0.0.1:9092 (id: 0 rack: null)), authorizedOperations=null)
    

    Topic配置信息查看

    除了Kafka自身的配置项外,其内部的Topic也会有非常多的配置项,我们可以通过describeConfigs方法来获取某个Topic中的配置项信息。代码示例:

    /**
     * 查询Topic的配置信息
     */
    public static void describeConfig() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        ConfigResource configResource = new ConfigResource(
                ConfigResource.Type.TOPIC, "MyTopic"
        );
        DescribeConfigsResult result = adminClient.describeConfigs(List.of(configResource));
        Map<ConfigResource, Config> map = result.all().get();
        map.forEach((key, value) ->
                System.out.println("name: " + key.name() + ", desc: " + value));
    }
    

    输出的内容如下,会输出所有的配置信息,内容比较多:

    name: ConfigResource(type=TOPIC, name='MyTopic'), desc: Config(entries=[ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.5-IV0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
    

    Topic配置信息修改

    除了可以查看Topic的配置项信息外,AdminClient还提供了相关方法来修改Topic配置项的值。在早期版本中,使用alterConfigs方法来修改配置项。代码示例:

    /**
     * 修改Topic的配置信息
     */
    public static void alterConfig() throws Exception {
        // 指定ConfigResource的类型及名称
        ConfigResource configResource = new ConfigResource(
                ConfigResource.Type.TOPIC, "MyTopic"
        );
        // 配置项以ConfigEntry形式存在
        Config config = new Config(List.of(
                new ConfigEntry("preallocate", "true")
        ));
    
        AdminClient adminClient = adminClient();
        Map<ConfigResource, Config> configMaps = new HashMap<>();
        configMaps.put(configResource, config);
        AlterConfigsResult result = adminClient.alterConfigs(configMaps);
        System.out.println(result.all().get());
    }
    
     public static void main(String[] args) throws Exception {
        alterConfig();
        describeConfig();
    }
    

    执行以上代码,控制台输出如下,可以看到成功将preallocate配置项的值改为了true

    image.png

    在新版本中则是使用incrementalAlterConfigs方法来修改Topic的配置项,该方法使用起来相对于alterConfigs要略微复杂一些,但因此功能更多、更灵活。代码示例:

    /**
     * 修改Topic的配置信息
     */
    public static void incrementalAlterConfig() throws Exception {
        // 指定ConfigResource的类型及名称
        ConfigResource configResource = new ConfigResource(
                ConfigResource.Type.TOPIC, "MyTopic"
        );
        // 配置项同样以ConfigEntry形式存在,只不过增加了操作类型
        // 以及能够支持操作多个配置项,相对来说功能更多、更灵活
        Collection<AlterConfigOp> configs = List.of(
                new AlterConfigOp(
                        new ConfigEntry("preallocate", "false"),
                        AlterConfigOp.OpType.SET
                )
        );
    
        AdminClient adminClient = adminClient();
        Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
        configMaps.put(configResource, configs);
        AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);
        System.out.println(result.all().get());
    }
    
    public static void main(String[] args) throws Exception {
        incrementalAlterConfig();
        describeConfig();
    }   
    
    • Tips:在某些版本中,incrementalAlterConfigs方法可能会存在些问题,对单实例的Kafka支持得不是很好,会出现无法成功修改配置项的情况,此时就可以使用alterConfigs方法来代替,这也是为什么这里要介绍两种方法的使用方式

    执行以上代码,控制台输出如下,可以看到成功将preallocate配置项的值改为了false

    image.png

    调整Topic的Partition数量

    在创建Topic时我们需要设定Partition的数量,但如果觉得初始设置的Partition数量太少了,那么就可以使用createPartitions方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:

    /**
     * 增加Partition数量,目前Kafka不支持删除或减少Partition
     */
    public static void incrPartitions() throws ExecutionException, InterruptedException {
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> newPartitions = new HashMap<>();
        // 将MyTopic的Partition数量调整为2
        newPartitions.put("MyTopic", NewPartitions.increaseTo(2));
        CreatePartitionsResult result = adminClient.createPartitions(newPartitions);
        System.out.println(result.all().get());
    }
    
    public static void main(String[] args) throws Exception {
        incrPartitions();
        describeTopics();
    }  
    

    执行以上代码,控制台输出如下,可以看到成功为该Topic增加了一个Partition:


    image.png
    • Tips:Partition的索引从0开始,所以第一个partition=0,第二个partition=1

    相关文章

      网友评论

          本文标题:Kafka核心API——AdminClient API

          本文链接:https://www.haomeiwen.com/subject/nwmrohtx.html