美文网首页
Kafka学习(一)

Kafka学习(一)

作者: 金融非耐斯 | 来源:发表于2023-01-22 12:07 被阅读0次
    • 下载安装

    wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
    # 解压
    tar -zxvf  kafka_2.13-3.3.1.tgz -C /opt/module/
    mv  kafka_2.13-3.3.1/ kafka
    # 修改配置文件
    cd config/
    vi server.properties #设置broker.id、log.dirs目录、zookeeper.connect
     # 添加到系统变量
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    
    • kafka启停脚本

    #!/bin/bash
    
    case $1 in 
    "start")
        for i in hadoop1 hadoop2 hadoop3
        do
            echo "--- 启动 $i kafka ---"
            ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
            done
    ;;
    "stop")
        for i in hadoop1 hadoop2 hadoop3
        do
            echo "--- 停止 $i kafka ---"
            ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
            done
    ;;
    esac
    
    • kafka主题使用

    # 创建主题
    bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --create --partitions 1 --replication-factor 3
    # 查看所有主题
    bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
    # 查看主题详细描述
    bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --topic one --describe
    
    • kafka生产者使用

    bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic one 
    

    生产者有main线程和sender线程,main中的分区器默认32M,DQuene默认16K
    batch.size:16K,数据积累到16K后,sender才会发送或者
    linger.ms:发果长时间不到batch.size,可以设置等待时间,默认0ms
    应答机制
    0:生产者不需要等待数据落盘应答
    1:生产者要等Leader收到数据后应答
    -1:生产者要等Leader和ISR队列中所有节点收齐数据后应答

    异步发送

    # 导入依赖
    <dependencies>  
        <dependency>  
            <groupId>org.apache.kafka</groupId>  
            <artifactId>kafka-clients</artifactId>  
            <version>3.0.0</version>  
        </dependency>  
    </dependencies>
    

    KafkaProducer类

    //callback换成.get()就是同步
    public class KafkaProducer {
     
        public static void main(String[] args) {
            Properties properties = new Properties();
            //连接集群
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092");
            //指定序列化类型
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); 
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
            properties.put("acks", "all");
            properties.put("retries", 0);
            properties.put("batch.size", 16384);
    1       properties.put("linger.ms", 1);
            properties.put("buffer.memory", 33554432);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(properties);
            for (int i = 1; i <= 600; i++) {
                //参数1:topic名, 参数2:消息文本; ProducerRecord多个重载的构造方法
                kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i),new Callback(){
                     @Override
                     public void onCompletion(RecordMetadata metadata, Exception exception){
                           if(exception == null){
                               System.out.println("主题:"+ metadata.topic() + "  分区: "+metadata.partition());
                           }
                     }
                });
                System.out.println("message"+i);
            }
            kafkaProducer.close();
        }
    }
    
    • kafka消费者使用

    bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic one 
    # --from-beginning 从开始(历史数据也接收)接收
    
    • kafka自定义分区

    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.test.kafka.MyPartition");   //partition类名全路径
    
    public class MyPartition implements Partitioner {
      private Random random = new Random();
      @Override
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
           String msgValues = value.toString();
            int partition;
            if(msgValues.contains("hello")){
                partition = 0;
            }else{
                partition = 1; 
           }
      }
    
      @Override
      public void close() {}
    
      @Override
      public void configure(Map<String, ?> configs) {}
    }
    
    • 提高生产者吞吐量

    # KafkaProducer类中加入
    //缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
    //linger.ms
    properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
    //压缩,压缩可配置gzip,snappy,lz4,zstd
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
    
    • ACK应答级别

    1.acks=0,生产者发送过来的数据就不管,可靠性差,效率高
    2.acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等
    3.acks=-1,生产者发送过来数据Leader和ISR队列里所有Follwer应答,可靠性高,效率低。

    //具体配置
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,3);
    
    • kafka事务

    注意:开启事务,必须开启幂等性,每个broker都有一个事务协调器,

    //幂等性默认是开启的
    enable.idempotence=true
    

    生产者事务

    # KafkaProducer类中加入
    //0.指定事务id
    properties.put(ProducerConfig.TRANSCATIONAL_ID_CONFIG,"transcational_id");
    //1.初始化事务
    kafkaProducer.initTransaction();
    //2.开启事务
    kafkaProducer.beginTransaction();
    //3.发送数据
    try{
        for (int i = 1; i <= 600; i++) {
                //参数1:topic名, 参数2:消息文本; ProducerRecord多个重载的构造方法
                kafkaProducer.send(new ProducerRecord<String, String>("one", "message"+i));
        }
        //4.提交事务
        kafkaProducer.commitTransaction();
    }catch(Exception e){
        kafkaProducer.abortTransaction();
    }finally{
        //关闭资源
        kafkaProducer.close();
    }
    
    • 数据的有序与乱序

    1.kafka在1.x版本之前保证数据单分区有序,条件是max.in.flight.requests.connection=1,无需考虑是否开启幂等性
    2.1.x以后的版本,分为未开启幂等性max.in.flight.requests.connection=1,需要设置为1,开启幂等性,max.in.flight.requests.connection=需要设置小于等于5.
    原因:因为1.x后,启用幂等后,kafka服务器会缓存producer发来的最近5个request的元数据,
    故无论如何,都可以保证最近5个request的数据都是有序的。

    相关文章

      网友评论

          本文标题:Kafka学习(一)

          本文链接:https://www.haomeiwen.com/subject/lebthdtx.html