美文网首页
vertx连接kafka

vertx连接kafka

作者: 淡看江湖等雨 | 来源:发表于2019-11-03 22:02 被阅读0次

    vertx的官方自我介绍

    Eclipse Vert.x is a tool-kit for building reactive applications on the JVM.

    也就是说,Vert.x 是一个用于在 JVM 上构建响应式应用的工具集。

    之所以选择 Vert.x,主要是考虑到 Spring Boot 那一套过于笨重,而且对响应式的支持也不够完善。

    引入依赖

            <!-- Dependencies used by the consumer/producer examples in this article. -->
            <dependencies>
                <dependency>
                    <groupId>io.vertx</groupId>
                    <artifactId>vertx-kafka-client</artifactId>
                    <version>3.8.3</version>
                    <exclusions>
                        <!-- Exclude the log4j artifacts; logback is used as the logging backend instead. -->
                        <exclusion>
                            <groupId>org.slf4j</groupId>
                            <artifactId>slf4j-log4j12</artifactId>
                        </exclusion>
                        <exclusion>
                            <groupId>log4j</groupId>
                            <artifactId>log4j</artifactId>
                        </exclusion>
                    </exclusions>
                </dependency>

                <!-- Lombok: compile-time boilerplate reduction (provides @Slf4j used below). -->
                <!-- NOTE(review): no <version> here or on slf4j/logback below; presumably managed
                     by a parent POM or dependencyManagement section. Confirm before copying. -->
                <dependency>
                    <groupId>org.projectlombok</groupId>
                    <artifactId>lombok</artifactId>
                    <scope>provided</scope>
                </dependency>

                <!-- Reflection utilities. -->
                <dependency>
                    <groupId>org.reflections</groupId>
                    <artifactId>reflections</artifactId>
                    <version>0.9.11</version>
                </dependency>

                <!-- SLF4J API plus the logging framework that backs it. -->
                <dependency>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </dependency>

                <dependency>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </dependency>

                <dependency>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-core</artifactId>
                </dependency>
            </dependencies>
    

    消费者

    import java.util.HashMap;
    import java.util.Map;
    
    import io.vertx.core.Vertx;
    import io.vertx.kafka.client.consumer.KafkaConsumer;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaListener {
    
        public static final KafkaListener instance = new KafkaListener();
    
        KafkaConsumer<String, String> consumer;
    
        private KafkaListener() {
            Map<String, String> config = new HashMap<>();
            config.put("bootstrap.servers", "localhost:9092");
            config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            config.put("group.id", "my_group");
            config.put("auto.offset.reset", "earliest");
            config.put("enable.auto.commit", "false");
    
            Vertx vertx = Vertx.vertx();
            // 创建一个Kafka Consumer
            consumer = KafkaConsumer.create(vertx, config);
        }
    
        public void listen(String topic) {
            consumer.subscribe(topic).handler(r -> {
                log.info("收到消息");
            });
        }
    }
    
    

    生产者

    
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import io.vertx.core.Vertx;
    import io.vertx.kafka.client.producer.KafkaProducer;
    import io.vertx.kafka.client.producer.KafkaProducerRecord;
    import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class KafkaSender {
    
        static KafkaProducer<String, String> producer;
    
        static {
            Vertx vertx = Vertx.vertx();
            Properties config = new Properties();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
            // 注意这里的第三和第四个参数
            producer = KafkaProducer.create(vertx, config, String.class, String.class);
        }
    
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void send(String topic, String message) {
            KafkaProducerRecord<String, String> record = new KafkaProducerRecordImpl(topic, message);
            producer.send(record, f -> {
                log.info("kafka主题{}消息{}发送{}", topic, message, f.succeeded() ? "成功" : "失败");
            });
        }
    }
    ···

    相关文章

      网友评论

          本文标题:vertx连接kafka

          本文链接:https://www.haomeiwen.com/subject/vgilbctx.html