Background: a Kafka cluster runs on three Broker VMs with the internal IPs 172.16.0.1, 172.16.0.2, and 172.16.0.3, and each VM has a public IP attached: 124.71.32.XX1, 124.71.32.XX2, and 124.71.32.XX3. The goal is to produce to and consume from the cluster over the public network.
Edit the hosts file on the Broker VMs (required on every Broker)
vim /etc/hosts
172.16.0.1 node-str-coregzoT
172.16.0.2 node-str-coredYEJ
172.16.0.3 node-str-corebFuB
124.71.32.XX1 node-str-coregzoT
124.71.32.XX2 node-str-coredYEJ
124.71.32.XX3 node-str-corebFuB
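To confirm the mappings resolve as expected, a minimal Java check can be run on a Broker VM (the class name is illustrative only):

import java.net.InetAddress;

public class ResolveCheck {
    public static void main(String[] args) throws Exception {
        // These hostnames come from the /etc/hosts entries above
        String[] hosts = {"node-str-coregzoT", "node-str-coredYEJ", "node-str-corebFuB"};
        for (String host : hosts) {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        }
    }
}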
Modify the server.properties file (required on every Broker)
advertised.listeners=PLAINTEXT://124.71.32.XX1:9092
Use each broker's own public IP here (XX2 on the second broker, XX3 on the third). If a listeners entry is already declared, comment it out.
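Concretely, each broker ends up with its own public IP in server.properties (a sketch based on the IP plan above):

# On 172.16.0.1
advertised.listeners=PLAINTEXT://124.71.32.XX1:9092
# On 172.16.0.2
advertised.listeners=PLAINTEXT://124.71.32.XX2:9092
# On 172.16.0.3
advertised.listeners=PLAINTEXT://124.71.32.XX3:9092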
Restart the Brokers and test
# Consume through the CLI, replacing the internal IP with the public one
kafka-console-consumer.sh --bootstrap-server 124.71.32.XX1:9092 --topic test --from-beginning
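The produce side can be tested from the public network the same way (assuming a Kafka CLI new enough to accept --bootstrap-server; older releases use --broker-list):

kafka-console-producer.sh --bootstrap-server 124.71.32.XX1:9092 --topic test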
Kafka producer sample (Java)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Named to avoid shadowing org.apache.kafka.clients.producer.KafkaProducer
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
        // A producer needs serializers, not deserializers
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.ACKS_CONFIG, "1"); // leader ack only
        prop.put(ProducerConfig.RETRIES_CONFIG, 3);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            for (int i = 0; i < 1000000; i++) {
                String key = "key-" + i;
                String message = "Message-" + i;
                producer.send(new ProducerRecord<>("test", key, message),
                        (recordMetadata, e) -> {
                            // On failure the exception is non-null; only print metadata on success
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println(recordMetadata.toString());
                            }
                        });
                System.out.println(key + "----" + message);
            }
        }
    }
}
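Both samples need only the kafka-clients dependency; assuming Maven, something along these lines (the version shown is an example, match it to your brokers):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.1</version>
</dependency>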
Kafka consumer sample (Java)
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Named to avoid shadowing org.apache.kafka.clients.consumer.KafkaConsumer
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-example");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        // 1000 ms would trigger rebalances on any non-trivial batch; keep the 5-minute default
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // Offsets are committed manually below, so disable auto-commit
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            // poll takes a Duration; passing getSeconds() would hit the deprecated 10 ms overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
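The consumer loop above runs forever. A common shutdown pattern, sketched here as an assumption rather than part of the original setup, is to call wakeup() from a JVM shutdown hook so that poll() throws WakeupException and the consumer closes cleanly:

// Replaces the bare while (true) loop in main(); consumer and subscription as above
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // wakeup() is the only thread-safe Consumer method
    try { mainThread.join(); } catch (InterruptedException ignored) { }
}));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown; fall through to close()
} finally {
    consumer.close();
}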