美文网首页
Kafka - 新消费者

Kafka - 新消费者

作者: 寒沧 | 来源:发表于2018-04-20 13:10 被阅读15次

    Kafka - 新消费者


    一、数据来源

    数据使用上一个博文所配置的 Flume,将文本数据写入到 Kafka中。不过这次有所改变,数据的监控目录 有所改变,写入的Kafka的主题名也变更为A25

    flume写入数据.png-36kBflume写入数据.png-36kB

    这里我们可以看到 Flume 对于新传上去的 A91 数据已经完成消费。

    二、消费者代码

    2.1 创建消费者

    创建消费者所使用的属性和生产者使用的属性差距不是很大:

    1. bootstrap.servers:指定了 Kafka 集群的连接字符串。
    2. key.deserializer 和 value.deserializer 与生产者的 serializer 定义也很类似,不过它们不是使用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。
    3. group.id:非必需,指定了 KafkaConsumer 属于哪一个消费者群组。

    创建消费者的代码如下:

    Properties props = new Properties();
    props.put("bootstrap.servers", "master:9092");
    props.put("group.id", "TestConsumer");
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    

    2.2 订阅主题

    consumer.subscribe(Collections.singletonList("A25"));
    

    因为我们使用Flume对数据进行 Sinks 消费的时候,指定的主题为A25,因此我们这里在对数据进行订阅的时候,也是A25。

    同时,可以对订阅的主题传递正则表达式进行匹配,一次订阅多个主题。

    2.3 轮询消费

    try {
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
    
            logger.info("records length = {}", records.count());
    
            for (ConsumerRecord record : records) {
                logger.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
        }
    } finally {
        consumer.close();
    }
    

    这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。

    消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。

    poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。

    在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

    运行结果如下:

    <center> kafka消费结果.png-30kBkafka消费结果.png-30kB

    </center>

    三、其他配置

    3.1 pom文件

     <properties>
        <java.version>1.8</java.version>
        <kafka.version>1.1.0</kafka.version>
    </properties>
    
    
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
    
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>
    

    3.2 log4j.properties

    log4j.rootLogger=INFO,console
    log4j.additivity.org.apache=true
    
    # 控制台(console)
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.Threshold=DEBUG
    log4j.appender.console.ImmediateFlush=true
    log4j.appender.console.Target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %c  -  %m%n
    

    相关文章

      网友评论

          本文标题:Kafka - 新消费者

          本文链接:https://www.haomeiwen.com/subject/cqbakftx.html