kafka简介

作者: 利伊奥克儿 | 来源:发表于2019-07-17 21:07 被阅读0次

    Apache Kafka是一个分布式流媒体平台

    • 流媒体平台有三个关键功能:
    1. 发布和订阅记录流,类似于消息队列或企业消息传递系统。
    2. 以容错的持久方式存储记录流。
    3. 记录发生时处理流。
    • Kafka通常用于两大类应用
    1. 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
    2. 构建转换或响应数据流的实时流应用程序
    • 几个概念
    1. Kafka作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
    2. Kafka集群以 topics存储记录流。
    3. 每条记录由一个键,一个值和一个时间戳组成。
    • 四个核心API
    1. Producer API允许应用程序发布的记录流至一个或多个 kafka 的 topics。
    2. Consumer API允许应用程序订阅一个或多个 topics,并处理所产生的数据流。
    3. Streams API允许应用程序充当流处理器,消费一个或多个topics 并输出一个或多个 topics,有效地进行输入流、输出流转换。
    4. Connector API允许构建和运行连接kafka topics到现有的应用程序或数据系统中可重用的生产者或消费者。例如,关系数据库的连接器可能捕获对表的每个更改。

    囊括几个主要API的案例

    创建带回调函数、自定义分区、拦截器的生产者

    //自定义分区
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/17-18:01
     */
    public class PartitionerNewProducer implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            return 0;//返回分区
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    
    //其实这两个拦截器可以写在一起的,只是为了验证先后顺序所以写了两个
    //时间拦截器
    package com.aaa.test;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/17-18:11
     */
    public class TimeInterceptor implements ProducerInterceptor {
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
    //  该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。
    //  Producer 确保在消息被序列化以及计算分区前调用该方法。
    //  用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
            // 创建一个新的record,把时间戳写入消息体的最前部
            return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                    System.currentTimeMillis() + "," + record.value().toString());
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    //  该方法会在消息被应答或消息发送失败时调用,并且通常都是在 producer 回调逻辑触发之前。
    //  onAcknowledgement 运行在 producer 的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
        }
    
        @Override
        public void close() {
    //  关闭 interceptor,主要用于执行一些资源清理工作。
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    //  获取配置信息和初始化数据时调用。
    
        }
    }
    
    
    //计数拦截器
    package com.aaa.test;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * @author lillcol
     * 2019/7/17-18:16
     */
    public class CounterInterceptor implements ProducerInterceptor {
        private int errorCounter = 0;
        private int successCounter = 0;
    
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // 统计成功和失败的次数
            if (exception == null) {
                successCounter++;
            } else {
                errorCounter++;
            }
        }
    
        @Override
        public void close() {
            // 保存结果
            System.out.println("Successful sent: " + successCounter);
            System.out.println("Failed sent: " + errorCounter);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    
    package com.aaa.test;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @author lillcol
     * 2019/7/17-17:56
     */
    public class CallBackNewProducer {
        public static void main(String[] args) throws InterruptedException {
    
            Properties props;
            props = new Properties();
            props.put("bootstrap.servers", "192.168.101.201:9092,192.168.101.203:9092");// Kafka服务端的主机名和端口号
            props.put("acks", "all"); // 等待所有副本节点的应答(应答级别)all等价于-1
            // props.put(ProducerConfig.ACKS_CONFIG, "all"); // 二者等价
            props.put("retries", 0);  // 消息发送最大尝试次数
            props.put("batch.size", 16384); // 一批消息处理大小
            props.put("linger.ms", 1);// 请求延时
            props.put("buffer.memory", 33554432);  // 发送缓存区内存大小(32M)
            props.put("partitioner.class", "com.aaa.test.PartitionerNewProducer");// 关联自定义分区
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key序列化
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化
    
    
            List<String> interceptors = new ArrayList<String>(); //  构建拦截链
            interceptors.add("com.aaa.test.TimeInterceptor");
            interceptors.add("com.aaa.test.CounterInterceptor");
    
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); //配置拦截器
    
            // 创建生产者对象
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            // 测试循环发送数据
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000);
                String value = "count" + i;
                String key =   i+"";
                producer.send(new ProducerRecord<String, String>("ProductorTest", key,value), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (metadata != null) {
                            System.err.println(metadata.partition() + ":" + metadata.offset());
                        }
                    }
                });
            }
            // 关闭资源
            producer.close();
        }
    }
    //生产结果
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    0:915
    0:916
    0:917
    0:918
    0:919
    0:920
    0:921
    0:922
    0:923
    0:924
    Successful sent: 10 //计数拦截器生效
    Failed sent: 0
    
    //消费结果  通过value可知 添加时间拦截器生效
    log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    offset = 915, key = 0, value = 1563359880298,count0
    offset = 916, key = 1, value = 1563359881453,count1
    offset = 917, key = 2, value = 1563359882453,count2
    offset = 918, key = 3, value = 1563359883453,count3
    offset = 919, key = 4, value = 1563359884453,count4
    offset = 920, key = 5, value = 1563359885453,count5
    offset = 921, key = 6, value = 1563359886453,count6
    offset = 922, key = 7, value = 1563359887453,count7
    offset = 923, key = 8, value = 1563359888453,count8
    offset = 924, key = 9, value = 1563359889453,count9
    
    
    

    消费者API

    package com.aaa.test;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @author lillcol
     * 2019/6/18-14:50
     */
    public class ConsumerTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.101.201:9092");
            props.put("group.id", "ProductorTest_groupId");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("ProductorTest"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    
    
    

    Kafka Stream

    关于Kafka Stream
    个人觉得这东东 如果你都已经用了SparkStreaming、Storm或flink的话,这个就很鸡肋了。
    如果没用的话,这个上手也不难。
    网上各种对比 优略的也就是强行找些理由。
    在没有大数据框架的情况下就用,
    有的话就没必要用 Kafka Stream。
    仅仅是个人观点,也许我还没发现这东东的强大之处。


    疑问

    Q:kafka、flume都可以做流式数据采集,看起来功能差不多,那么要如何选择?

    A: 两者还是有些区别的:
    flume:
    适合多个生产者;(一个生产者对应一个 Agent 任务)
    适合下游数据消费者不多的情况;(多 channel 多 sink 会耗费很多内存)
    适合数据安全性要求不高的操作;(实际中更多使用 Memory Channel)
    适合与 Hadoop 生态圈对接的操作。

    kafka:
    适合数据下游消费者众多的情况;(开启更多的消费者任务即可,与 Kafka 集群无关)
    适合数据安全性要求较高的操作,支持replication。(数据放在磁盘里)

    一般生产常用的模型为:
    线上数据 --> flume(采集) --> kafka(离线/实时) --> flume(根据情景增删该流程) --> HDFS

    线上数据 --> flume(采集) --> kafka(离线/实时) --> 流出里框架(storm、spark、flink) --> HDFS(. . .)
    . . .

    Q: followers 什么时候同步logs?

    A:根据ACK应答机制,生产者可以设定该参数request.required.acks:
    0:表示不需要leader确认,生产者只管发,不管leader是否收到
    1:表示只需要leader确认,不管followers时候备份完成
    -1(all):表示leader和followers都需要确认。


    Q:如何保证生产者不丢失数据?

    A:生产者将ack参数设置为all即可


    Q:kafka 写入数据流程 ack=-1(all)

    A:producer 先从 zookeeper 的

    1. "/brokers/…/state"节点找到该 partition 的 leader
    2. producer 将消息发送给该 leader
    3. leader 将消息写入本地 log
    4. followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
    5. leader 收到所有ISR中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的offset)并向 producer 发送 ACK

    Q:如何删除旧数据?

    A:有两个策略可以删除旧数据

    1. 基于时间:log.retention.hours(默认168小时即7天)
    2. 基于大小:log.retention.bytes

    Q:consumer为什么采用 pull(拉)模式从 broker 中读取数据?

    A:因为如果采用push(推),那么消息推送速率有broker决定。
    推送的数据量可能和consumer自身能力不匹配,推少了顶多浪费资源,推多了consumer 来不及处理消息就可能出现拒绝服务以及网络拥塞等问题。
    而采用 pull consumer就可以根据自己的胃口来消费数据。

    Q:consumer为什么采用 pull(拉)模式从 broker 中读取数据有缺点吗?

    A:如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。
    为了避免这种情况,我们在我们的 pull请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待给定的字节数,以确保传输大小)。


    相关文章

      网友评论

        本文标题:kafka简介

        本文链接:https://www.haomeiwen.com/subject/mgthlctx.html