美文网首页dbkafka
java Kafka 简单应用实例

java Kafka 简单应用实例

作者: 狂奔的蜗牛_zxf | 来源:发表于2018-05-06 10:49 被阅读0次

    1、安装zookeeper

    下载zookeeper-3.4.9.tar.gz;

    解压:tar -zxvf zookeeper-3.4.9.tar.gz;

    进入zookeeper-3.4.9/conf目录创建zoo.cfg文件,内容如下:

    tickTime=2000

    dataDir=/usr/myenv/zookeeper-3.4.9/data(填写自己的data目录)

    dataLogDir=/usr/myenv/zookeeper-3.4.9/logs

    clientPort=2181

    启动zookeeper:

    ./yourZookeeperDir/bin/zkServer.sh start

    2、安装kafka

    下载kafka:http://kafka.apache.org/downloads

    解压kafka:tar -zxvf kafka_2.10-0.8.2.1.tgz

    修改config/server.properties配置文件中zookeeper的host配置;由于zookeeper是在本地启动,所以不需要修改:


    server.properties配置

    启动kafka
    ./yourKafkaDir/bin/kafka-server-start.sh ./yourKafkaDir/config/server.properties

    3、kafka java 应用demo

    kafka Producer

    package kafkaTest;
    
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import java.util.Properties;
    
    
    /**
     * Minimal demo producer built on the legacy (0.8.x) Kafka producer API.
     *
     * <p>Publishes a fixed series of keyed string messages to {@link #TOPIC}
     * and echoes each payload to standard output.
     */
    public class KafkaProducer {
        /** Topic the demo messages are published to (also read by the consumer demo). */
        public final static String TOPIC = "TEST-TOPIC";

        /** First message number sent by {@link #produce()} (inclusive). */
        private static final int FIRST_MESSAGE_NO = 1000;

        /** Upper bound of the message numbers sent by {@link #produce()} (exclusive). */
        private static final int MESSAGE_NO_LIMIT = 10000;

        private final Producer<String, String> producer;

        /**
         * Creates a producer that talks to the broker configured below and
         * encodes both keys and values as strings.
         */
        public KafkaProducer() {
            Properties props = new Properties();
            // Broker(s) used to bootstrap topic metadata; adjust to your own broker address.
            props.put("metadata.broker.list", "192.168.1.103:9092");
            // Encoders for the message value and the message key respectively.
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer.class", "kafka.serializer.StringEncoder");
            // Acknowledgement level requested from the broker; see the Kafka
            // producer configuration docs for the meaning of "-1".
            props.put("request.required.acks", "-1");
            producer = new Producer<String, String>(new ProducerConfig(props));
        }

        /**
         * Sends one message per number in [1000, 10000) to {@link #TOPIC}.
         * The number (as a string) is the message key and is appended to the
         * payload; every payload is also printed to standard output.
         */
        public void produce() {
            for (int messageNo = FIRST_MESSAGE_NO; messageNo < MESSAGE_NO_LIMIT; messageNo++) {
                String key = String.valueOf(messageNo);
                String data = "@@@@@hello kafka message" + key;
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
                System.out.println(data);
            }
        }

        /**
         * Closes the underlying Kafka producer. Call this once the instance
         * is no longer needed; the instance must not be used afterwards.
         */
        public void close() {
            producer.close();
        }

        /** Runs the demo: sends all messages, then always closes the producer. */
        public static void main(String[] args) {
            KafkaProducer demo = new KafkaProducer();
            try {
                demo.produce();
            } finally {
                // Close even when produce() throws so the producer is not leaked.
                demo.close();
            }
        }

    }
    
    

    kafka consumer

    package kafkaTest;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.serializer.StringDecoder;
    import kafka.utils.VerifiableProperties;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    
    /**
     * Minimal demo consumer built on the legacy (0.8.x) ZooKeeper-based
     * high-level consumer API.
     *
     * <p>Reads string messages from {@link KafkaProducer#TOPIC} through a
     * single stream and prints each message payload to standard output.
     */
    public class KafkaConsumer {
        private final ConsumerConnector consumer;

        /**
         * Creates a consumer connector for group {@code test-group} using the
         * ZooKeeper instance configured below.
         */
        public KafkaConsumer() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "127.0.0.1:2181");
            props.put("group.id", "test-group");
            props.put("zookeeper.session.timeout.ms", "4000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            // No "serializer.class" here: that is a producer-side encoder setting.
            // Decoding is done by the StringDecoder instances passed to
            // createMessageStreams() in consume().
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        /**
         * Opens one stream for {@link KafkaProducer#TOPIC} and prints the
         * payload of every message the stream yields. Returns only when the
         * stream's iterator reports no further messages.
         */
        public void consume() {
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(KafkaProducer.TOPIC, Integer.valueOf(1));

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            // Only one stream was requested above, so index 0 is the only entry.
            KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);

            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println(it.next().message());
            }
        }

        /**
         * Shuts down the underlying consumer connector. Call this once the
         * instance is no longer needed; the instance must not be used afterwards.
         */
        public void shutdown() {
            consumer.shutdown();
        }

        /** Runs the demo: consumes messages, then always shuts the connector down. */
        public static void main(String[] args) {
            KafkaConsumer demo = new KafkaConsumer();
            try {
                demo.consume();
            } finally {
                // Shut down even when consume() throws so the connector is not leaked.
                demo.shutdown();
            }
        }
    }
    
    

    相关文章

      网友评论

        本文标题:java Kafka 简单应用实例

        本文链接:https://www.haomeiwen.com/subject/vinyrftx.html