Accessing a Distributed Kafka Cluster via Public IPs


Author: 去个帅气的昵称咯 | Published: 2020-11-11 10:17

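    Background: three Broker VMs with private IPs 172.16.0.1, 172.16.0.2, and 172.16.0.3 each have a public IP attached: 124.71.32.XX1, 124.71.32.XX2, and 124.71.32.XX3. The goal is to produce to and consume from the Kafka cluster over the public network.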

    Edit the hosts file on the Broker VMs (required on every Broker)
    vim /etc/hosts
    172.16.0.1 node-str-coregzoT
    172.16.0.2 node-str-coredYEJ
    172.16.0.3 node-str-corebFuB
    
    124.71.32.XX1 node-str-coregzoT
    124.71.32.XX2 node-str-coredYEJ
    124.71.32.XX3 node-str-corebFuB
    
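    Edit server.properties (required on every Broker) so that advertised.listeners points to that Broker's own public IP. On the first Broker, for example: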
    advertised.listeners=PLAINTEXT://124.71.32.XX1:9092
    

    If listeners was already declared in the file, comment it out.
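Since the article only shows the line for the first Broker, here is a minimal sketch of the line each Broker would carry. It assumes the private-to-public IP pairing follows the order given in the background, and it reuses the PLAINTEXT protocol and port 9092 from the article. `AdvertisedListeners` and `lineFor` are hypothetical names for illustration, not part of Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka): prints the advertised.listeners
// line that each Broker's server.properties would contain.
class AdvertisedListeners {

    // Builds the property line for one Broker.
    static String lineFor(String publicIp, int port) {
        return "advertised.listeners=PLAINTEXT://" + publicIp + ":" + port;
    }

    public static void main(String[] args) {
        // Private IP -> public IP, using the masked addresses from the article.
        // The pairing order is an assumption.
        Map<String, String> brokers = new LinkedHashMap<>();
        brokers.put("172.16.0.1", "124.71.32.XX1");
        brokers.put("172.16.0.2", "124.71.32.XX2");
        brokers.put("172.16.0.3", "124.71.32.XX3");

        for (Map.Entry<String, String> broker : brokers.entrySet()) {
            System.out.println("# server.properties on " + broker.getKey());
            System.out.println(lineFor(broker.getValue(), 9092));
        }
    }
}
```

Each Broker advertises a different address because clients use the advertised address to connect back to that specific Broker after fetching cluster metadata.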

    Restart the Brokers and test
    # Replace the IP in the CLI command with the public IP, then consume
    kafka-console-consumer.sh --bootstrap-server 124.71.32.XX1:9092 --topic test --from-beginning
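The bootstrap address names only one Broker, but once the client has fetched metadata it connects to every Broker's advertised address, so port 9092 must be reachable on all three public IPs. The following is a rough sketch of a plain TCP check using only the JDK. `BrokerPortCheck` and `isReachable` are hypothetical names, and the 3-second timeout is an arbitrary choice. A successful connection only shows that the port is open, not that Kafka is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Refused, timed out, unresolvable host, or invalid port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the real public IPs as arguments; the article masks them as XX1..XX3.
        for (String host : args) {
            System.out.println(host + ":9092 reachable = " + isReachable(host, 9092, 3000));
        }
    }
}
```

If one address reports false, a cloud security group or firewall rule for port 9092 is a likely place to look before changing any Kafka settings.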
    

    Kafka producer example (Java)

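    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Named ProducerExample so the class does not shadow Kafka's own KafkaProducer.
    public class ProducerExample {

        public static void main(String[] args) {
            Properties prop = new Properties();
            // Bootstrap through a Broker's public IP.
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
            // A producer needs serializers, not deserializers.
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            prop.put(ProducerConfig.ACKS_CONFIG, "1");
            prop.put(ProducerConfig.RETRIES_CONFIG, 3);

            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

            for (int i = 0; i < 1000000; i++) {
                String key = "key-" + i;
                String message = "Message-" + i;
                producer.send(new ProducerRecord<>("test", key, message),
                        (recordMetadata, e) -> {
                            // The exception is non-null when the send failed.
                            if (e != null) {
                                System.out.println(e);
                            } else {
                                System.out.println(recordMetadata);
                            }
                        });
                System.out.println(key + "----" + message);
            }
            // close() waits for pending sends to complete.
            producer.close();
        }
    }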
    

    Kafka consumer example (Java)

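    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Named ConsumerExample so the class does not shadow Kafka's own KafkaConsumer.
    public class ConsumerExample {

        public static void main(String[] args) throws InterruptedException {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-example");
            // Bootstrap through a Broker's public IP.
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.71.32.XX1:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
            properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
            // Allows at most 1 s between poll() calls; raise it if a batch takes longer to process.
            properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);
            // Offsets are committed explicitly with commitSync() below, so auto-commit is turned off.
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            Consumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // Pass the Duration itself; a bare long would be read as milliseconds by the deprecated poll(long).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                Thread.sleep(5);
                // Commit the offsets of the batch just processed.
                consumer.commitSync();
            }
        }
    }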
    
