美文网首页
kafka开发笔记

kafka开发笔记

作者: 中科院_白乔 | 来源:发表于2017-09-05 20:19 被阅读0次

最近又要用上kafka,发现原来趟过的坑又趟过了一次
在这里做一下笔记,提醒以后注意:

  1. 如果连不上brokers,consumer.poll()会阻塞
  2. 同一个topic、同一个group的consumer,会彼此影响,哪怕前面的Test跑完了,后面创建的是新的consumer(这个至关重要)
  3. poll的正确使用方式是在死循环里面一直调用它
  4. 不同partition取到的records是乱序的

最后贴一段代码:

    val LINES2 = Array[String]("hello", "world", "bye", "world");
    val ROWS2 = LINES2.map(Row(_));

    class ConsumerThread(topic: String, groupId: String, buffer: ArrayBuffer[String]) extends Thread {
        //consumer
        val props = new Properties();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", "vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        val consumer = new KafkaConsumer[String, String](props);
        consumer.subscribe(Arrays.asList(topic));

        override def run {
            while (true) {
                val records = consumer.poll(100);
                records.foreach(record ⇒
                    println("key:" + record.key() + " value=" + record.value() + " partition:" + record.partition() + " offset=" + record.offset()));
                buffer ++= records.map(_.value()).toSeq;
                Thread.sleep(100);
            }
        }
    }

    @Test
    def testKafka() {
        val propsProducer = new Properties();

        propsProducer.put("bootstrap.servers", "vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092");
        propsProducer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        propsProducer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        val producer = new KafkaProducer[String, String](propsProducer);

        val topic = "kafka-topic2";
        var index = -1;
        for (row ← ROWS2) {
            index += 1;
            val key = "" + index;
            //TODO: send an array instead of a string value?
            val value = row(0).toString();
            val record = new ProducerRecord[String, String](topic, key, value);
            producer.send(record, new Callback() {
                def onCompletion(metadata: RecordMetadata, e: Exception) = {
                    if (metadata != null) {
                        val offset = metadata.offset();
                        val partition = metadata.partition();
                        val topic = metadata.topic();
                        println(s"record is sent to kafka: topic=$topic, key=$key, value=$value, partition=$partition, offset=$offset");
                    }
                }
            });
        }

        Thread.sleep(1000);

        val buffer = ArrayBuffer[String]();
        val thread = new ConsumerThread("kafka-topic2", "g1", buffer);
        thread.start();
        Thread.sleep(10000);

        val records = buffer.toArray;
        thread.stop();
        println(records.toSeq);
        Assert.assertArrayEquals(LINES2.sorted.asInstanceOf[Array[Object]], records.sorted.asInstanceOf[Array[Object]]);
    }

相关文章

  • kafka开发笔记

    最近又要用上kafka,发现原来趟过的坑又趟过了一次在这里做一下笔记,提醒以后注意: 如果连不上brokers,c...

  • kafka 入门详解

    Kafka Kafka 核心概念 什么是 Kafka Kafka是由Apache软件基金会开发的一个开源流处理平台...

  • Kafka学习笔记

    kafka笔记 0. Kafka 安装 下载 wget http://mirrors.shu.edu.cn/apa...

  • 玩转大数据计算之Kafka

    Kafka版本:我们使用目前最新的版本:0.10.2 Kafka架构:Kafka是由LinkedIn开发的一个分布...

  • Apache Kafka

    Apache Kafka 关于Kafka Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala...

  • Kafka(什么是Kafka?Kafka的设计与实现!顺便教你如

    一、Kafka 简介 1.Kafka 创建背景 Kafka 是一个消息系统,原本开发自 LinkedIn,用作 L...

  • 这可能是最详细的Kafka应用了

    Kafka kafka是什么?kafka仅仅是属于消息 中间件吗? kafka在设计之初的时候 开发人员们在除了消...

  • 入门 Kafka必看

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区...

  • 真的,Kafka 入门一篇文章就够了

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区...

  • kafka

    kafka的概念 什么是kafka kafka是一个消息系统,由linkedin于2011年设计开发,用作link...

网友评论

      本文标题:kafka开发笔记

      本文链接:https://www.haomeiwen.com/subject/pvspjxtx.html