Kafka API in Practice

Author: ac619467fef3 | Published 2018-08-17 00:19

    Systematic learning takes three steps: understand the principles, set up the system, practice with the API.
    Where do you find the API? The documentation and the Git repository.
    For example, Kafka lives on GitHub at github.com/apache/kafka; look for the examples directory.
    A small tip: github.com/apache/xxx is the Git repository of Apache project xxx.

    The Kafka documentation is even easier to find: it is at kafka.apache.org.
    Don't search Baidu and jump through an extra redirect; just remember that xxx.apache.org is the home page of Apache project xxx.

    Producer and Consumer

    A Kafka system involves three roles: (1) producers, (2) the Kafka cluster acting as the message queue, and (3) consumers.

    The previous article covered installing Kafka; by starting the Kafka servers we brought up the Kafka cluster. Producers and consumers, in turn, write to and read from the message queue through the API.

    I. pom.xml: add the dependency

    The Kafka API lives in the kafka-clients package; add the dependency to pom.xml.

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.1</version>
            </dependency>
    

    II. Writing the Producer

    1. Producer configuration

    Properties props = new Properties();
    props.put("bootstrap.servers", "hbase:9092,datanode2:9092,datanode3:9092");
    props.put("acks", "all");
    props.put("retries", 3);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    • bootstrap.servers: the addresses of the Kafka brokers.
    • acks: the partition leader handles reads and writes for its partition and replicates each record to the follower replicas. With acks=all, the leader returns an acknowledgement only after all in-sync replicas have confirmed the write.
    • retries: how many times to retry a failed write. When a leader fails, one of the replicas takes over as the new leader, and writes during the transition may fail. With retries=0 the producer does not resend; with retries>0 it resends once the replica has fully taken over as leader, so the message is not lost.
    • batch.size: the producer accumulates records and sends them in batches of up to this many bytes per partition.
    • buffer.memory: the total memory the producer may use to buffer records that have not yet been sent.
    • linger.ms: with batching, messages are not sent immediately. Does a batch that never reaches batch.size simply never get sent? No: once a record has waited longer than linger.ms, the batch is sent anyway.
    • key.serializer / value.serializer: the serializer classes for keys and values.

    2. KafkaProducer

    • KafkaProducer
    import org.apache.kafka.clients.producer.KafkaProducer;
    
    Properties props = getConfig();
    Producer<String, String> producer =
                            new KafkaProducer<String, String>(props);
    
    • Producer is an interface; it declares two important send methods: one without a callback (which can be made synchronous by blocking on the returned Future, as sketched after this list) and one that takes a Callback for asynchronous completion handling.
        public Future<RecordMetadata> send(ProducerRecord<K, V> record);
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
    
    • ProducerRecord is the message entity class; each message is wrapped as a (topic, key, value, timestamp) tuple. The key may be null, and if no timestamp is given the current time is used by default.
    ProducerRecord<String, String> record = new ProducerRecord<String, String>
            ("exam2", Integer.toString(i), Integer.toString(i)); // "exam2" is the topic
    producer.send(record);
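
    A plain send(record) returns immediately with a Future; to make the send synchronous you block on that Future. A minimal sketch, reusing the "exam2" topic from above (the key and value literals are only illustrative):

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.RecordMetadata;

    try {
        // block until the broker acknowledges the record
        RecordMetadata metadata = producer.send(
                new ProducerRecord<String, String>("exam2", "key-1", "value-1")).get();
        System.out.println("written to partition " + metadata.partition()
                + ", offset " + metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        // the send failed after any configured retries, or the wait was interrupted
        e.printStackTrace();
    }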
    

    Asynchronous send

    long startTime = System.currentTimeMillis();
    producer.send(new ProducerRecord<>(topic,messagekey,messageValue), 
            new DemoCallBack(startTime, messagekey, messageValue));
    

    DemoCallBack implements the asynchronous Callback interface; the two methods of interest are its constructor and onCompletion.
    The RecordMetadata passed to onCompletion carries two pieces of information: the partition and the offset of the record.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    class DemoCallBack implements Callback {
    
        private final long startTime;
        private final String key;
        private final String message;
    
        public DemoCallBack(long startTime, String key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }
        /**
         * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
         *                  occurred.
         * @param exception The exception thrown during processing of this record. Null if no error occurred.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else {
                exception.printStackTrace();
            }
        }
    }
    

    From the console output you can see that send does not block: the main loop keeps printing i:n while the callbacks complete asynchronously.

    i:0
    i:1
    message(0, 0) sent to partition(6), offset(303) in 680 ms
    i:2
    message(1, 1) sent to partition(9), offset(295) in 126 ms
    message(2, 2) sent to partition(8), offset(343) in 53 ms
    i:3
    message(3, 3) sent to partition(3), offset(331) in 18 ms
    i:4
    message(4, 4) sent to partition(3), offset(332) in 8 ms
    i:5
    message(5, 5) sent to partition(0), offset(310) in 22 ms
    i:6
    message(6, 6) sent to partition(8), offset(344) in 8 ms
    i:7
    message(7, 7) sent to partition(9), offset(296) in 19 ms
    i:8
    i:9
    message(9, 9) sent to partition(3), offset(333) in 23 ms
    message(8, 8) sent to partition(7), offset(287) in 136 ms
    i:10
    message(10, 10) sent to partition(6), offset(304) in 21 ms

    III. Writing the Consumer

    1. Consumer configuration

    Properties props = new Properties();
    props.put("bootstrap.servers", "hbase:9092,datanode2:9092,datanode3:9092");
    props.put("group.id", "testGroup");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    • group.id: testGroup. Within one consumer group, a message is delivered to only one consumer (each partition is read by a single group member); group.id sets the group name.
    • enable.auto.commit: true. Offsets are committed automatically; two related settings are sketched below.
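
    When auto-commit is on, two companion settings matter; the values here are only illustrative and are not part of the original example:

    // how often the consumer commits offsets automatically
    props.put("auto.commit.interval.ms", "1000");
    // where to start when the group has no committed offset yet:
    // "earliest" reads from the beginning of the partition, "latest" only new messages
    props.put("auto.offset.reset", "earliest");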

    2. KafkaConsumer

    KafkaConsumer

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    Properties props = getConfig();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    

    The Consumer interface declares two important methods, subscribe and poll; KafkaConsumer implements the Consumer interface.

    public void subscribe(Collection<String> topics);
    public ConsumerRecords<K, V> poll(long timeout);
    

    You can create multiple consumer threads to pull messages concurrently. Since KafkaConsumer is not thread-safe, the right approach is for each thread to create and own its own consumer instance.

    The custom KafkaConsumerRunner below is a Runnable that owns one KafkaConsumer.

    // Thread to consume kafka data
    public static class KafkaConsumerRunner implements Runnable
    {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;
        private final String topic;
    
        public KafkaConsumerRunner(String topic)
        {
            Properties props = getConfig();
            consumer = new KafkaConsumer<String, String>(props);
            this.topic = topic;
        }
    
        public void handleRecord(ConsumerRecord<String, String> record)
        {
            System.out.println("name: " + Thread.currentThread().getName()
                    + " ; topic: " + record.topic() + "; partition:"+record.partition()+
                    " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
        }
    
        public void run()
        {
            try {
                // subscribe
                consumer.subscribe(Arrays.asList(topic));
                while (!closed.get()) {
                    //read data
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // Handle new records
                    for (ConsumerRecord<String, String> record : records) {
                        handleRecord(record);
                    }
                }
            }
            catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) {
                    throw e;
                }
            }
            finally {
                consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown()
        {
            closed.set(true);
            consumer.wakeup();
        }
    }
    

    A thread pool starts several consumer threads:

    int numConsumers = 3;
    final String topic = "exam2";
    final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
    final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
    for (int i = 0; i < numConsumers; i++) {
        KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
        consumers.add(consumer);
        executor.submit(consumer);
    }
    
    Output:

    name: pool-1-thread-3 ; topic: exam2; partition:9 ; offset445 ; key: 1 ; value: 1
    name: pool-1-thread-2 ; topic: exam2; partition:6 ; offset448 ; key: 0 ; value: 0
    name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset508 ; key: 2 ; value: 2
    name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset495 ; key: 3 ; value: 3
    name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset496 ; key: 4 ; value: 4
    name: pool-1-thread-1 ; topic: exam2; partition:0 ; offset461 ; key: 5 ; value: 5
    name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset509 ; key: 6 ; value: 6
    name: pool-1-thread-3 ; topic: exam2; partition:9 ; offset446 ; key: 7 ; value: 7
    name: pool-1-thread-3 ; topic: exam2; partition:7 ; offset428 ; key: 8 ; value: 8
    name: pool-1-thread-1 ; topic: exam2; partition:3 ; offset497 ; key: 9 ; value: 9
    name: pool-1-thread-2 ; topic: exam2; partition:6 ; offset449 ; key: 10 ; value: 10
    name: pool-1-thread-3 ; topic: exam2; partition:8 ; offset510 ; key: 11 ; value: 11

    Observations:
    1. Each consumer thread consumes a distinct set of partitions.
    2. Ordering is not guaranteed across partitions; for example, key 1 was printed before key 0.
    3. Ordering is guaranteed within a partition: within one partition the offsets are processed in order (see the keyed-producer sketch below).
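
    A practical consequence of observation 3: with the default partitioner, records that share a key are hashed to the same partition, so ordering is preserved per key. A minimal sketch, reusing the "exam2" topic and the producer getConfig() helper from above (the key "user-42" is only illustrative):

    Producer<String, String> orderedProducer = new KafkaProducer<String, String>(getConfig());
    // all three records share one key, so the default partitioner sends them to the
    // same partition and a consumer will see them in the order they were sent
    for (int step = 0; step < 3; step++) {
        orderedProducer.send(new ProducerRecord<String, String>("exam2", "user-42", "step-" + step));
    }
    orderedProducer.close();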

    Shutting down the worker threads gracefully

    In main, register a JVM shutdown hook. The hook thread runs when the process is shutting down; it calls each runner's shutdown method, which sets the exit condition of the while (!closed.get()) loop and wakes the consumer out of poll.

    Runtime.getRuntime().addShutdownHook(new Thread()
    {
        @Override
        public void run()
        {
            for (KafkaConsumerRunner consumer : consumers) {
                consumer.shutdown();
            }
            executor.shutdown();
            try {
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("process quit");
        }
    });
    

    Controlling offsets manually

    // let the application commit offsets explicitly instead of auto-committing
    props.put("enable.auto.commit", "false");
    
    for (ConsumerRecord<String, String> record : records) {
        handleRecord(record);
    }
    consumer.commitAsync();
    
    Observed behaviour:
    1. Within a single run, poll still returns records in order and does not fetch offsets it has already returned.
    2. After restarting the process, any records whose offsets were not committed are processed again, starting from the last committed position.
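
    commitAsync does not retry on failure, so a common pattern (a sketch, not part of the original example) is to commit asynchronously after each batch and make one final blocking commit before closing the consumer:

    try {
        while (!closed.get()) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                handleRecord(record);
            }
            // fast, non-blocking commit after each handled batch
            consumer.commitAsync();
        }
    } catch (WakeupException e) {
        // ignore, we are shutting down
    } finally {
        try {
            // one final blocking commit so handled offsets are not lost on shutdown
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }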

    IV. Summary

    This article exercised the API that Kafka provides.
    In real applications Kafka is often paired with Spark Streaming, processing the data in Kafka as a stream.
