Accessing a Distributed Kafka Cluster via Public IPs

Author: 去个帅气的昵称咯 | Published: 2020-11-11 10:17

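Background: three Broker virtual machines have the internal IPs 172.16.0.1, 172.16.0.2, and 172.16.0.3, and a public IP is attached to each of them: 124.71.32.XX1, 124.71.32.XX2, and 124.71.32.XX3. The goal is to produce to and consume from the Kafka cluster over the public network.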

Edit the hosts file on the Broker virtual machines (required on every Broker)
vim /etc/hosts
172.16.0.1 node-str-coregzoT
172.16.0.2 node-str-coredYEJ
172.16.0.3 node-str-corebFuB

124.71.32.XX1 node-str-coregzoT
124.71.32.XX2 node-str-coredYEJ
124.71.32.XX3 node-str-corebFuB
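
The hosts entries above list each Broker hostname twice, once with its internal IP and once with its public IP. As a rough aid for reviewing such a file, the sketch below parses hosts-style text and reports every hostname that maps to more than one address. The class name HostsFileCheck, the hostnames node-a and node-b, and the address 203.0.113.10 are placeholders invented for illustration. Which address a resolver actually returns for a duplicated name depends on the system configuration, so this only lists the duplicates and makes no claim about resolution order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HostsFileCheck {

    // Parses hosts-file text into hostname -> list of IPs, preserving file order.
    static Map<String, List<String>> parse(String hostsText) {
        Map<String, List<String>> ipsByName = new LinkedHashMap<>();
        for (String line : hostsText.split("\\R")) {
            // Drop trailing comments and surrounding whitespace.
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (entry.isEmpty()) {
                continue;
            }
            // First field is the IP; every following field is a name for it.
            String[] fields = entry.split("\\s+");
            for (int i = 1; i < fields.length; i++) {
                ipsByName.computeIfAbsent(fields[i], k -> new ArrayList<>()).add(fields[0]);
            }
        }
        return ipsByName;
    }

    public static void main(String[] args) {
        // Placeholder content; replace with the text of the real /etc/hosts.
        String sample = String.join("\n",
                "172.16.0.1 node-a",
                "203.0.113.10 node-a",
                "172.16.0.2 node-b");
        parse(sample).forEach((name, ips) -> {
            if (ips.size() > 1) {
                System.out.println(name + " maps to several addresses: " + ips);
            }
        });
    }
}
```

Running it against the real file shows at a glance which names are declared for both the internal and the public network.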
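Edit the server.properties file (required on every Broker; each Broker advertises its own public IP, and the line below is the one for the first Broker)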
advertised.listeners=PLAINTEXT://124.71.32.XX1:9092

If listeners was already declared in the file, comment it out.
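
Because the setting differs per Broker, it can help to generate the three lines rather than type them by hand. The following is a minimal sketch that prints the advertised.listeners line for each Broker. The class name AdvertisedListeners and the helper lineFor are hypothetical, and the pairing of internal to public IPs (172.16.0.1 with 124.71.32.XX1, and so on) is assumed from the order in which the addresses are listed in the background section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AdvertisedListeners {

    // Builds the advertised.listeners line for one Broker.
    static String lineFor(String publicIp, int port) {
        return "advertised.listeners=PLAINTEXT://" + publicIp + ":" + port;
    }

    public static void main(String[] args) {
        // Internal IP -> public IP; the pairing is an assumption based on listing order.
        Map<String, String> publicIpByInternalIp = new LinkedHashMap<>();
        publicIpByInternalIp.put("172.16.0.1", "124.71.32.XX1");
        publicIpByInternalIp.put("172.16.0.2", "124.71.32.XX2");
        publicIpByInternalIp.put("172.16.0.3", "124.71.32.XX3");

        publicIpByInternalIp.forEach((internalIp, publicIp) -> {
            System.out.println("# server.properties on Broker " + internalIp);
            System.out.println(lineFor(publicIp, 9092));
        });
    }
}
```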

Restart the Brokers and test
# Replace the IP in the CLI command with a public IP and consume
kafka-console-consumer.sh --bootstrap-server 124.71.32.XX1:9092 --topic test --from-beginning
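
If the console consumer hangs, a quick first check is whether port 9092 is reachable on every public IP at all. The sketch below is one possible way to do that with a plain TCP connect. The class name PortCheck and the 3000 ms timeout are arbitrary choices for illustration, and the addresses are passed as command-line arguments because the real public IPs are not given here.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or the host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortCheck.java <public-ip-1> <public-ip-2> <public-ip-3>
        for (String host : args) {
            System.out.println(host + ":9092 reachable = " + isReachable(host, 9092, 3000));
        }
    }
}
```

A successful connect only shows that the port is open. A Kafka client still has to reach the addresses the Brokers advertise, so all three public IPs need to pass this check.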

Kafka producer example (Java)

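import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// The class name differs from the client library's KafkaProducer to avoid a name clash.
public class KafkaProducerExample {

    public static void main(String[] args) {
        Properties prop = new Properties();
        // Bootstrap through one of the public IPs.
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
        // Keys and values are Strings, so both use StringSerializer.
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.ACKS_CONFIG, "1");
        prop.put(ProducerConfig.RETRIES_CONFIG, 3);

        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

        for (int i = 0; i < 1000000; i++) {
            String key = "key-" + i;
            String message = "Message-" + i;
            producer.send(new ProducerRecord<>("test", key, message),
                    (recordMetadata, e) -> {
                        // Print the error if the send failed, otherwise the record metadata.
                        if (e != null) {
                            System.out.println(e);
                        } else {
                            System.out.println(recordMetadata);
                        }
                    });
            System.out.println(key + "----" + message);
        }
        // close() waits for pending sends to complete before returning.
        producer.close();
    }
}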

Kafka consumer example (Java)

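import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// The class name differs from the client library's KafkaConsumer to avoid a name clash.
public class KafkaConsumerExample {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-example");
        // Bootstrap through one of the public IPs.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
        // Auto-commit is enabled; commitSync() below additionally commits after each batch.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            // Wait up to 10 seconds for new records.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(5);
            consumer.commitSync();
        }
    }
}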
