美文网首页
Kafka入门教程(2)

Kafka入门教程(2)

作者: 文歆云 | 来源:发表于2018-11-20 09:57 被阅读0次

Apache Kafka发布了Java版本的Producer,如果读者想要使用其他语言向Kafka生产数据,可以去Kafka官网对应的库,但要注意的是除了Java版本的意外,其他版本都是非官方支持的,笔者这里就使用Java来做演示了。

Java想要引入kafka客户端,maven你懂得

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

gradle如下:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'

引入的过程中读者或许还需要引入slf4j,应当注意一下。

当我们成功引入kafka-client后,就可以撸代码了,下面先演示一个最简单的消费者:

package cn.gaowenhao;
/*
-----------------------------------------------------
    Author : 高文豪
    Github : https://github.com/gaowenhao
    Blog   : https://gaowenhao.cn
-----------------------------------------------------
*/
 
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
 
public class FirstConsumer {
    public static void main(String[] args) {
        // 创建配置文件
        Properties props = new Properties();
 
        // 指定broker
        props.put("bootstrap.servers", "localhost:9092");
 
        // 指定消费者组
        props.put("group.id", "mygroup");
 
        // 指定序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        // 创建消费者对象
        KafkaConsumer consumer = new KafkaConsumer<String, String>(props);
 
        // 订阅topic,这个方法需要的参数是Collection对象
        consumer.subscribe(Collections.singletonList("mytopic"));
 
        // 开启无限循环
        while (true) {
            // poll数据, 指定一个超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
            // 遍历拿到的数据集合
            for (var record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

当我们运行起上面这段代码,那么我们的consumer就挂起运行了,这时候consumer会从topic中不断地取出最早进入的数据然后打印,下面我们撸一个producer,给他产生一个数据

package cn.gaowenhao;
/*
-----------------------------------------------------
    Author : 高文豪
    Github : https://github.com/gaowenhao
    Blog   : https://gaowenhao.cn
-----------------------------------------------------
*/
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class FirstProducer {
    public static void main(String[] args) {
        // 设置配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
 
        // 指定序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        // 创建producer
        KafkaProducer producer = new KafkaProducer<String, String>(props);
 
        // 给mytopic,发送一条消息
        producer.send(new ProducerRecord("mytopic", "Hey --By Java"));
 
        // 关闭producer, 读者应该知道几乎所有的close里面都有个flush
        producer.close();
    }
}

如此我们可以运行上面的代码,这样读者consumer端就会收到类似这样一条消息:

offset = 2, key = null, value = Hey --By Java

读者的offset可能和笔者的不一样,但这里的offset是有序的,如果读者的offset也是2再发一条就是3。
key这个值是可以在producer端指定的,在我们ProducerRecord的时候可以加上一个key值,如果没加consumer这边将会显示null。
value则是ProducerRecord的第三个参数。

更多内容欢迎来笔者的个人博客 https://gaowenhao.cn

相关文章

网友评论

      本文标题:Kafka入门教程(2)

      本文链接:https://www.haomeiwen.com/subject/btrtqqtx.html