美文网首页
Kakfa入门到使用

Kakfa入门到使用

作者: 依弗布德甘 | 来源:发表于2021-03-18 17:38 被阅读0次

    Kafka简介

    • Kafka是linkedin公司使用Scala语言编写的一款具有高水平扩展和高吞吐量的分布式消息系统
    • Kafka强依赖zookeeper,无论是Kafka集群,生产者(producer)和消费者(consumer)都依赖于zookeeper(zk)来保证系统可用性
    • 支持数据存储、集群与zk动态扩容,但仅支持仿AMQP协议

    kafka架构

    相关概念

    • Broker
      一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic;

    • Producer
      消息生产者,就是向kafka broker发消息的客户端

    • Consumer
      消息消费者,向kafka broker取消息的客户端

    • Topic(消息队列)
      是数据主题,是kafka中用来代表一个数据流的抽象。发布数据时,可以用topic对数据进行分类,也作为订阅数据时的主题。一个topic同时可有多个producer、consumer

    • Consumer Group (CG)
      kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段,一个topic可以有多个CG

    • Partition(数据分片)
      每个partition是一个顺序的、不可变的record序列,partition中的record被分配一个自增长id(offset)

    • Offset
      Partition中自增长id序列

    分片:将一份完整的数据拆分成多个,分开存储;提高并发度,负载均衡;降低丢失数据的风险;

    副本:将数据复制一份出来存储,提高读取并发度(降低写入性能),提高数据可用性,主要用来解决服务器故障导致的数据丢失问题
    Leader:主副本,数据读写
    Follower:从副本,从主副本同步过来的数据

    • Record (记录)
      每条记录都有key、value、timestamp、offset 信息
      key时用来处理数据分发,分发规则类似 hash(key) % partition_count ;key为null则轮询partition,key固定一个则只会将数据发送到一个partition中

    Kafka 主要参数

    公共参数
    • bootstrap.servers
      kafka服务器连接地址

    • key.serializer
      key 序列化接口配置;如:org.apache.kafka.common.serialization.Serializer

    • value.serialize
      value序列化接口配置;可以和key.serializer相同,也可以不同,只要消费端保持一致


    Producer API
    • buffer.memory
      用于设置缓存消息的缓冲区大小,单位是字节,默认值是 33554432,即 32MB

    • batch.size
      发往同一分区的多条消息封装的批次大小,默认16k,控制发送数据的吞吐量

    • max.request.size
      发送单条最大消息大小,默认的 1048576 字节(1MB)

    • linger.ms
      缓冲区中的数据在达到batch.size前,需要等待的时间,默认值0

    • acks
      用来配置请求成功的标准,默认1;acks分别是0、1、all

    0: 数据发送出去
    1: 数据发送出去,并存储在Leader(主副本)
    all: 数据发送出去,并存储在Leader(主副本)并同步所有Follower(从副本)

    • retries
      发送消息失败重试的次数,默认值是 0,表示不进行重试(重试可能造成消息的重复发送、消息的乱序)
    • request.timeout.ms
      发送消息响应时间,默认30 秒,超过时间无响应抛TimeoutException异常

    Consumer API
    • group.id
      消费组,同一份数据可以由多个消费组来消费

    • auto.offset. reset
      数据消费的位置
      earliest:指定从最早的位移开始消费
      latest:指定从最新处位移开始消费
      none :指定如果未发现位移信息或位移越界,则抛出异常

    • enable.auto.commit
      offset提交方式,用来提交位移;ture自动提交,false手动提交

    • session.timeout.ms
      消费者组协调者(group coordinator) 检测消费组(group)失败的时间,导致不必要的 rebalance(数据平衡);默认10 秒

    • max.poll.interval.ms
      设置消息处理逻辑的最大时间,超过则会对consumer 崩溃检测,做rebalance(数据平衡)

    • max.partition.fetch.bytes
      指定了服务器从每个分区里返回给消费者的最大字节数,默认值是lMB

    • fetch.max.bytes
      consumer 端单次获取数据的最大字节数,单次超过则无法消费

    • max.poll.records
      单次 poll 调用返回的最大消息数;默认的 500 条

    • heartbeat.interval.ms
      consumer 间的心跳检测,做数据平衡用

    • connections.max.idle.ms
      关闭空闲 Socket 连接时间;当前默认值是 9 分钟;不关闭设置-1


    Consumer相关

    1. 客户端一定要配置kafka服务端的hosts,即使你使用的是IP来访问的
    2. Commit Offset的作用是为了将最新消费的位置保留在服务端.

    很多人认为,Commit offset的作用是让consumer在下次消费时,知道该从哪里继续消费,这种想法是错误的。commit offset只作为group下次启动时消费的起始位置

    • 自动 Offset Commite
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.90.131:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        System.out.println("getting...");
        //每次poll时,会将上次的get到的数据commit offset
        ConsumerRecords<String, String> records = consumer.poll(8000);
        for (ConsumerRecord<String, String> record : records){
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
    

    自动提交offset,具体是什么时候提交呢?
    consumer关闭 或 下次poll时,会将上次poll的消息的offset commit

    可能导致重复消费:
    成功消费一批消息后,commit offset前 consumer程序挂了,可能导致重复消费

    • 手动 Offset Commite
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.90.131:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    
    final int minBatchSize = 20;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    
    while (true) {
        System.out.println("getting...");
        ConsumerRecords<String, String> records = consumer.poll(4000);
    
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("received msg:" + record.value());
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertToDB(buffer);        //打印出内容
    
            System.out.println("======》》》offset commit。。。");
            consumer.commitSync();      //手动提交offset
            buffer.clear();
        }
    }
    

    在这种场景下,使用手动提交,还可能导致消息遗漏:在数据插入DB之后而宕机,程序下次启动时就会从上次commit的offset开始消费所以这种场景下,kafka提供的消费语义是at-least-once;需要采用事物来控制commit或在外部存储offset;

    3. 外部存储offset

    producer不是一定要将offset保存在Kafka中的,他们可以在自己的存储中保留offset,这样做可以允许程序在同一系统中落地数据并存储offset;
    1、关闭自动提交enable.auto.commit=false
    2、存储ConsumerRecord中提供的offset
    3、重启时,使用seek方法恢复consumer的消费位置

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "192.168.90.131:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("test"));
    
    //kafka的分区逻辑是在poll方法里执行的,所以执行seek方法之前先执行一次poll方法
    //获取当前消费者消费分区的情况
    Set<TopicPartition> assignment = new HashSet<>();
    while (assignment.size() == 0) {
        //如果没有分配到分区,就一直循环下去
        kafkaConsumer.poll(100L);
        assignment = consumer.assignment();
    }
    for (TopicPartition tp : assignment) {
        //消费第当前分区的offset为10的消息
        kafkaConsumer.seek(tp, 10);
    }
     
    while (true) {
        System.out.println("getting...");
        ConsumerRecords<String, String> records = consumer.poll(4000);
        System.out.println("拉取的消息数量:" + records .count());
        System.out.println("消息集合是否为空:" + records .isEmpty());
    }
    

    Spring集成Kafka

    • 依赖包
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
    
    • 生产者Producer
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.common.config.SaslConfigs
    
    @Component
    public class Producer {
        
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    /*      
            //SASL Plaintest 权限
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") 
            props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512")
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"123456\";")
    */
            return new DefaultKafkaProducerFactory<>(props);
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
      
        @Autowired
        private KafkaTemplate<String, String> template;
    
        public void send(String data) {
            this.template.send("test_topic", data);
        }
    }
    
    • 消费者
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    
    
    @Configuration
    @EnableKafka
    public class TestConsumerConfig {
    
        @Value("${kafka.test-consumer.username}")
        private String username;
        @Value("${kafka.test-consumer.password}")
        private String password;
    
        @Bean
        public KafkaListenerContainerFactory<?> testFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId)));
            factory.setBatchListener(true);
            factory.getContainerProperties().setPollTimeout(15000);
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            return factory;
        }
    
        private Map<String, Object> consumerConfigs(String groupId) {
            Map<String, Object> propsMap = new HashMap<>(20);
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test_01");
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500);
            propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, ConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES * 10);
    /*
            //SASL Plaintest 权限
            String config = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",username,password);
            propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            propsMap.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
            propsMap.put("sasl.jaas.config",config);
    */
            return propsMap;
        }
    
    }
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    import java.util.List;
    
    
    @Component
    public class Consumer{
     
        @KafkaListener(topics = "test_topic", containerFactory = "testFactory")
        public void handMessage(List<ConsumerRecord<?, ?>> recordList, Acknowledgment ack) { 
            try{
                for (ConsumerRecord<?, ?> record : recordList) {
                    ......
                }
                ack.acknowledge();
            }catch (Exception e){
                ......
            }
        }
    }
    

    Kafka 限流

    kafka客户端是认证的,那么可以使用userId和clientId两种认证方式。如果没有认证只能使用clientId限流

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1024'  --entity-type   clients    --entity-name clientA
    

    对clientId=clientA的客户端添加限流设置。producer_byte_rate表示每秒最多能写入到消息量,单位为byte/sec。consumer_byte_rate表示每秒最多能消费的消息了,单位也为byte/sec。设置后立即生效。

    Producer & Consumer设置
    // Producer:
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientA");
    // Consumer:
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "clientA");
    
    查看用户流量限制

    bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name clientA
    输出结果显示如下:
    Configs for user-principal ‘clientA’ are producer_byte_rate=1024,consumer_byte_rate=2048


    Kafka Tool 工具

    • 下载地址

    http://www.kafkatool.com/download.html

    • 注意事项

    1. 安装目录最好不要有带空格的文件路径下,在带参数启动时会报参数命令无效
    2. hosts 文件中,添加入 kafka 的集群域名,是要重启KafkaTools才会生效
    3. 在多集群下,bootstrap servers如果设置了,必须把所有的地址设置上
    • SASL加密连接

    2.0.4版本链接加密方式,需要在启动后命令后加入后缀,并指向一个配置文件地址 启动时需要修改快捷方式的目录路径,注意路径不能有代空格的文件夹地址
    • 客户端 jass.conf 配置文件
    KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="123456";
    };
    
    • 新版本解决需要配置文件的问题
    2.0.7版本中已新增的JAAS Config配置项解决上述问题,可直接在此配置

    相关文章

      网友评论

          本文标题:Kakfa入门到使用

          本文链接:https://www.haomeiwen.com/subject/rkiecltx.html