美文网首页
Kafka demo

Kafka demo

作者: 小玉1991 | 来源:发表于2021-05-08 20:59 被阅读0次

    工作中用到了 Kafka,需要有生产者提供数据,于是自己做了一个 demo,可以正常发送和接收数据。数据可以通过可视化工具 Offset Explorer 2 查看。
    先上代码:

    pom.xml

    <!-- Build properties: Java language level and the kafka-clients version
         referenced by the Kafka dependency below. -->
    <properties>
            <java.version>1.8</java.version>
            <kafka-clients.version>0.9.0.1</kafka-clients.version>
        </properties>
        <!-- NOTE(review): the Spring Boot starters declare no <version>; presumably it is
             managed by a parent POM / dependencyManagement not shown in this fragment. -->
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <!-- Test-scoped only. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <!-- kafka begin -->
            <!-- Plain Kafka client library; version comes from the kafka-clients.version property. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka-clients.version}</version>
            </dependency>
            <!-- kafka end -->
        </dependencies>
    

    Producer 注意:这里要用 send().get() 的方式同步发送消息。不调用 get() 时是异步发送,而 main 程序很快就结束了,消息还没来得及发出去。

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer demo: builds a String/String {@link KafkaProducer} and
     * synchronously sends one hard-coded record to {@link #TOPIC}.
     */
    public class Producer {

        // Alternative topic: "op_move_error_event"
        /** Topic the demo record is sent to. */
        public final static String TOPIC = "op_move_event_dev";

        private final KafkaProducer<String, String> producer;

        /** Configures and creates the underlying Kafka producer. */
        private Producer() {
            Properties props = new Properties();
            // Comma-separated broker list; must not carry stray whitespace/newlines.
            props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092");
            props.put("acks", "all");
            props.put("retries", 3);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            producer = new KafkaProducer<>(props);
        }

        /**
         * Sends a single demo record and blocks until the send has completed.
         * Prints "ok" only when the send succeeded; on failure the error is printed instead.
         */
        void produce() {
            try {
                // get() blocks on the returned Future, so the record is sent before main exits.
                producer.send(new ProducerRecord<>(TOPIC, "1111", "{move_type:123}")).get();
                System.out.println("ok");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                System.out.println("error--->" + e);
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("error--->" + e);
            }
        }

        /** Entry point: sends one record, then releases the producer's resources. */
        public static void main(String[] args) {
            Producer demo = new Producer();
            try {
                demo.produce();
            } finally {
                demo.producer.close();
            }
        }
    }
    

    Consumer:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer demo: subscribes to {@code Producer.TOPIC} and prints
     * every record it receives, forever.
     */
    public class Consumer {

        /** Max time in milliseconds a single poll blocks waiting for records; avoids a hot spin loop. */
        private static final long POLL_TIMEOUT_MS = 1000L;

        private final KafkaConsumer<String, String> consumer;

        /** Configures the Kafka consumer and subscribes it to the demo topic. */
        private Consumer() {
            Properties props = new Properties();
            // Comma-separated broker list; must not carry stray whitespace/newlines.
            props.put("bootstrap.servers", "kafka01.dev.in.songguo7.com:9092,kafka02.dev.in.songguo7.com:9092,kafka03.dev.in.songguo7.com:9092");
            // group.id identifies the consumer group this consumer belongs to
            props.put("group.id", "7_bike_geo_new");

            props.put("enable.auto.commit", true);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(Producer.TOPIC));
        }

        /**
         * Polls in an endless loop and prints topic, partition, offset and value of each record.
         * The consumer is closed if the loop is left through an exception.
         */
        void consume() {
            try {
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_TIMEOUT_MS);
                    // Iterating an empty batch is a no-op, so no explicit emptiness check is needed.
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +
                                consumerRecord.partition() + " Offset:" + consumerRecord.offset() +
                                " Msg:" + consumerRecord.value());
                        // business logic for each record goes here
                    }
                }
            } finally {
                consumer.close();
            }
        }

        /** Entry point: starts the endless consume loop. */
        public static void main(String[] args) {
            new Consumer().consume();
        }
    }
    

    结果:


    可视化工具
    consumer

    相关文章

      网友评论

          本文标题:Kafka demo

          本文链接:https://www.haomeiwen.com/subject/wqrpdltx.html