Kafka in Action - Quick Start


Author: FX_SKY | Published 2017-04-14 11:11

    Kafka is a distributed messaging system developed by LinkedIn and written in Scala. It is widely used for its horizontal scalability and high throughput.


    Quick Start

    Maven dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    
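    The consumer example below parses message payloads with Gson, which the POM above does not declare. If you reproduce the sample, a Gson dependency also has to be on the classpath; the version here is illustrative, since the article does not state which one it uses:

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.6.2</version>
    </dependency>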

    Producer

    package com.bytebeats.mq.kafka;
    
    import java.io.IOException;
    import java.util.Properties;
    
    import com.bytebeats.mq.kafka.util.PropertyUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KafkaProducerDemo {
    
        private int total = 1000000;
        
        public void send(){
            
            long start = System.currentTimeMillis();
            System.out.println("Kafka Producer send msg start,total msgs:"+total);
            
            // set up the producer
            Producer<String, String> producer = null;
            try {
                Properties props = PropertyUtils.load("producer_config.properties");
                producer = new KafkaProducer<>(props);
                
                for (int i = 0; i < total; i++){
                    producer.send(new ProducerRecord<String, String>("hello",
                            String.valueOf(i), String.format("{\"type\":\"test\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));
                    
                    // every so often send to a different topic
                    if (i % 1000 == 0) {
                        producer.send(new ProducerRecord<String, String>("test", String.format("{\"type\":\"marker\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));
                        producer.send(new ProducerRecord<String, String>("hello", String.format("{\"type\":\"marker\", \"t\":%d, \"k\":%d}", System.currentTimeMillis(), i)));
                        
                        producer.flush();
                        System.out.println("Sent msg number " + i);
                    }
                    
                }
                
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (producer != null) {
                    producer.close();
                }
            }
            
            System.out.println("Kafka Producer send msg over,cost time:"+(System.currentTimeMillis()-start)+"ms");
        }
    }
    
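    The article does not show how the demo is launched; a minimal entry point (a sketch, not part of the original sample) could be:

    package com.bytebeats.mq.kafka;
    
    // Hypothetical launcher class; it simply runs the producer demo above.
    public class KafkaProducerMain {
    
        public static void main(String[] args) {
            new KafkaProducerDemo().send();
        }
    }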

    producer_config.properties

    bootstrap.servers=172.18.19.206:9092,172.18.19.207:9092,172.18.19.208:9092
    acks=all
    retries=0
    batch.size=16384
    linger.ms=1
    buffer.memory=33554432
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    
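    With acks=all the broker acknowledges a write only after all in-sync replicas have it, but the demo loop above never checks the result of send(), so broker-side errors only surface through the ignored Future. If a failed write should fail fast, one option is to block on that Future. A minimal sketch, assuming the same PropertyUtils helper and "hello" topic:

    package com.bytebeats.mq.kafka;
    
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import com.bytebeats.mq.kafka.util.PropertyUtils;
    
    public class KafkaSyncSendDemo {
    
        public static void main(String[] args) throws Exception {
            Properties props = PropertyUtils.load("producer_config.properties");
            Producer<String, String> producer = new KafkaProducer<>(props);
            try {
                // Blocking on the Future makes the send synchronous:
                // a broker-side error is thrown here instead of being lost.
                RecordMetadata meta = producer.send(
                        new ProducerRecord<>("hello", "key-1", "{\"type\":\"test\"}")).get();
                System.out.println("written to partition " + meta.partition()
                        + " at offset " + meta.offset());
            } finally {
                producer.close();
            }
        }
    }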

    Consumer

    package com.bytebeats.mq.kafka;
    
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import com.bytebeats.mq.kafka.util.PropertyUtils;
    
    public class KafkaConsumerDemo {
    
        public void consume() {
            
            JsonParser jsonParser = new JsonParser();  
            
            // and the consumer
            KafkaConsumer<String, String> consumer = null;
            try {
                Properties props = PropertyUtils.load("consumer_config.properties");
                consumer = new KafkaConsumer<>(props);
                
                //subscribe topics
                consumer.subscribe(Arrays.asList("hello", "test"));
                
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records){
                        
    //                  System.out.printf("offset -> %d, key -> %s, value -> %s",
    //                          record.offset(), record.key(), record.value());
                        
                        switch (record.topic()) {
                            case "hello":
                                JsonObject jObj = (JsonObject) jsonParser.parse(record.value());
                                switch (jObj.get("type").getAsString()) {
                                    case "test":
                                        long latency = System.currentTimeMillis() - jObj.get("t").getAsLong();
                                        System.out.println(latency);
                                        break;
                                    case "marker":
                                        break;
                                    default:
                                        break;
                                }
                                break;
                            case "test":
                                break;
                            default:
                                throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally{
                if(consumer!=null){
                    consumer.close();
                }
            }
        }
    }
    
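    The consumer above relies on enable.auto.commit=true (see the config below), so offsets are committed in the background every second, which can re-deliver or skip records around a crash. If offsets should only be committed after records are processed, auto-commit can be disabled and commitSync() called explicitly. A sketch, reusing the same config file and topics:

    package com.bytebeats.mq.kafka;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import com.bytebeats.mq.kafka.util.PropertyUtils;
    
    public class ManualCommitConsumerDemo {
    
        public static void main(String[] args) throws Exception {
            Properties props = PropertyUtils.load("consumer_config.properties");
            // Override the sample config: commit only after records are handled.
            props.put("enable.auto.commit", "false");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            try {
                consumer.subscribe(Arrays.asList("hello", "test"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d, key=%s, value=%s%n",
                                record.offset(), record.key(), record.value());
                    }
                    // Commit the offsets of everything returned by this poll.
                    consumer.commitSync();
                }
            } finally {
                consumer.close();
            }
        }
    }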

    consumer_config.properties

    bootstrap.servers=172.18.19.206:9092,172.18.19.207:9092,172.18.19.208:9092
    group.id=test
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    session.timeout.ms=30000
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    PropertyUtils

    package com.bytebeats.mq.kafka.util;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    public class PropertyUtils {
    
        private PropertyUtils(){
            
        }
        
        public static Properties load(File file) throws IOException{
            
            InputStream in = null;
            try {
                in = new FileInputStream(file);
                Properties props = new Properties();
                props.load(in);
                
                return props;
                
            }finally{
                IoUtils.closeQuietly(in);
            }
        }
        
        public static Properties load(String path) throws IOException {
            
            InputStream in = null;
            try {
                in = PropertyUtils.class.getClassLoader().getResourceAsStream(path);
                if (in == null) {
                    throw new IOException("properties file not found on classpath: " + path);
                }
                Properties props = new Properties();
                props.load(in);
                
                return props;
                
            } finally {
                IoUtils.closeQuietly(in);
            }
        }
    }
    
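    PropertyUtils calls an IoUtils helper that the article does not list. Assuming all it needs is a null-safe closeQuietly, a minimal version could look like this:

    package com.bytebeats.mq.kafka.util;
    
    import java.io.Closeable;
    import java.io.IOException;
    
    public class IoUtils {
    
        private IoUtils() {
        }
    
        // Close the stream if it is non-null, swallowing any IOException so that
        // cleanup in a finally block never masks the original exception.
        public static void closeQuietly(Closeable closeable) {
            if (closeable != null) {
                try {
                    closeable.close();
                } catch (IOException ignored) {
                    // intentionally ignored
                }
            }
        }
    }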

    Source code download

    https://github.com/TiFG/mq-in-action/tree/master/kafka-sample

    References

    Kafka Introduction
    Kafka 0.9.0 Documentation
