Kafka Introduction Series, Part 6: Common Kafka Commands and the Java API

Author: ted005 | Published 2018-12-06 16:26, 41 reads

    Common Commands

    • Start ZooKeeper
    ./zkServer.sh start-foreground
    

    Available options:

    ./zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
    
    • Start ZooInspector to inspect the Kafka brokers and topics registered in ZooKeeper:
    java -jar zookeeper-dev-ZooInspector.jar
    
    • Start Kafka
    ./kafka-server-start.sh ../config/server.properties
    
    • Create a topic with 3 partitions and a replication factor of 1
    ./kafka-topics.sh --create --topic testtopic --partitions 3 --replication-factor 1 --zookeeper localhost:2181
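    To verify the layout of a newly created topic, the describe sub-command prints the partition count, replication factor, and per-partition leader/replica/ISR assignments (a sketch; note that recent Kafka releases replace --zookeeper with --bootstrap-server for this tool):

```shell
# Show partition count, replication factor, and per-partition
# leader/replica/ISR assignments for the topic created above
./kafka-topics.sh --describe --topic testtopic --zookeeper localhost:2181
```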
    
    • List all topics
    ./kafka-topics.sh --list --zookeeper localhost:2181
    
    • Delete a topic
    ./kafka-topics.sh --delete --topic testtopic --zookeeper localhost:2181
    
    • Use the console producer
    ./kafka-console-producer.sh --topic testtopic --broker-list localhost:9092
    
    • Use the console consumer

    Note: --from-beginning makes the consumer start from the earliest available offset; without it, consumption starts from the current offset.

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic --from-beginning
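    Once consumers have committed offsets under a group id, the kafka-consumer-groups.sh tool shows where each group stands. A sketch (the group name testgroup is an assumption; use whatever group id your consumer registered):

```shell
# List all consumer groups known to the cluster
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Per-partition committed offset, log-end offset, and lag for one group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup
```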
    

    Using the Java API

    • Producer API
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SampleProducer {

        private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducer.class);

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("client.id", "DemoProducer");
            // Serializers for message keys and values
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            ProducerRecord<String, String> record = new ProducerRecord<>("testtopic", "hello world");

            try {
                // send() is asynchronous; get() blocks until the broker acknowledges
                RecordMetadata recordMetadata = producer.send(record).get();
                System.out.println(recordMetadata);
            } catch (InterruptedException | ExecutionException e) {
                LOGGER.error("Failed to send record", e);
            } finally {
                producer.close();
            }
        }
    }
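    Blocking on get() after every send() serializes the producer and defeats its batching. A non-blocking sketch using the callback overload of send(), with the same broker address and topic as above (the class name CallbackProducer is hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("testtopic", "message-" + i);
                // The callback runs on the producer's I/O thread once the broker
                // responds; exactly one of metadata / exception is non-null.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("sent to %s-%d@%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            }
            // close() flushes any outstanding records before shutting down
        }
    }
}
```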
    
    • Consumer API
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SampleConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // Consumer group id
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
            // Disable automatic offset commits
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            // Deserializers for message keys and values
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            try {
                // Subscribe to the topic
                consumer.subscribe(Arrays.asList("testtopic"));

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                records.forEach(record -> System.out.println(record.value()));

                // Commit offsets manually
                consumer.commitAsync();
            } finally {
                consumer.close();
            }
        }
    }
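    The consumer above polls once and exits. In practice consumption runs in a poll loop with periodic commits; a sketch using commitSync() under the same configuration as above (the class name LoopingConsumer is hypothetical):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LoopingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testtopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
                // Synchronous commit: blocks and retries until it succeeds or
                // throws, so offsets are not silently lost between batches
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
            }
        }
    }
}
```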
    
    


        Original article: https://www.haomeiwen.com/subject/znpocqtx.html