美文网首页
Flink_Connector 连接器_kafka

Flink_Connector 连接器_kafka

作者: Eqo | 来源:发表于2022-08-20 00:03 被阅读0次
    image.png
    Flink API 提供的专门访问其他 存储系统的一套api 只需要创建对象.调用API内的方法就可以实现 对各种数据库的访问
    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/

    Kafka Connector

    image.png

    1. 从kafka中读取数据

    • FlinkkafkaConsumer
    • KafkaSource 1.14版本之后 新增的 必须掌握

    2. 向Kafka中写入数据

    -FlinkKakfaProducer

    • KafkaSource 1.14版本之后 新增 必须掌握

    Kafka 分布式消息队列 ,分布式消息中间件
    Topic 主题队列 partition 分区 ( Sagment 片段) offset 偏移量
    主从架构 broker节点 副本机制 主备副本
    基于liux 磁盘 页缓存 零拷贝


    Kafka消费数据需要

    kafka集群地址
    消费者组
    topic / partition 一个topic 或者一个列表 多个topic
    反序列化化类 字符数组->字符串

    FlinkKafkaConsumer 可以指定消费位置

            // todo: 1. 最早偏移量位置消费数据
             flinkKafkaConsumer.setStartFromEarliest();
    
            // todo: 2. 最新偏移量位置消费数据, 不设置时,默认值
             flinkKafkaConsumer.setStartFromLatest() ;
    
            // todo: 3. 从消费组上次消费的位置开始消费数据
            flinkKafkaConsumer.setStartFromGroupOffsets();
    
            // todo: 4. 指定时间戳开始消费数据
             flinkKafkaConsumer.setStartFromTimestamp(1660725338991L) ;
    
            // todo: 5. 指定具体分配偏移量位置消费数据
            Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>() ;
            specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 0), 5L);
            specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 1), 4L);
            specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 2), 9L);
            flinkKafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
    
    

    kafka source之 新增Topic 和分区发现

    场景一 当我的FlinkKfkaConsumer 程序正在执行的时候,kafka中新增了几个topic
    但是不想更改我的job代码
    场景二 本来Topic中只有三个分区,然后新增了几个分区,我不想更改我的job代码 怎么办

    解决方案

    • 新增topic 发现 在指定分区的时候 使用正则表达式,且Kafka在创建 Topic的时候按照一定规则创建
                // 
            FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(
                // todo: topic名称,使用正则表达式,每topic名称设计的时候,最好符合一定规范
                "flink-topic-[0~9]",
                new SimpleStringSchema(),
                props
            );
    
    • 新增 分区发现
      只需要 指定配置文件时 设置动态发现分区 并写上 刷新时间
       // todo: 设置属性,动态分区发现
     props.setProperty("flink.partition-discovery.interval-millis", "5000") ;
    

    Kafka source

    1.14版本建议使用Kafkasource

            // 2. 数据源-source
            // 2-1. 创建Source数据源实例对象,设置属性值
            KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092")
                .setTopics("flink-topic")
                .setGroupId("flink-gid-2")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                    //设置分区发现
                .setProperty("partition.discovery.interval.ms", "5000")
                .build();
            // 2-2. 从数据源中获取数据
            DataStreamSource<String> kafkaStream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "Kafka Source"
            );
    
    
            // 3. 数据转换-transformation
    
            // 4. 数据终端-sink
            kafkaStream.printToErr();
    
            // 5. 触发执行-execute
            env.execute("ConnectorKafkaSourceDemo");
        }
    

    相关文章

      网友评论

          本文标题:Flink_Connector 连接器_kafka

          本文链接:https://www.haomeiwen.com/subject/zebugrtx.html