
Flink API 提供的专门访问其他 存储系统的一套api 只需要创建对象.调用API内的方法就可以实现 对各种数据库的访问
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/
Kafka Connector

1. 从kafka中读取数据
- FlinkkafkaConsumer
- KafkaSource 1.14版本之后 新增的 必须掌握
2. 向Kafka中写入数据
-FlinkKakfaProducer
- KafkaSource 1.14版本之后 新增 必须掌握
Kafka 分布式消息队列 ,分布式消息中间件
Topic 主题队列 partition 分区 ( Sagment 片段) offset 偏移量
主从架构 broker节点 副本机制 主备副本
基于liux 磁盘 页缓存 零拷贝
Kafka消费数据需要
kafka集群地址
消费者组
topic / partition 一个topic 或者一个列表 多个topic
反序列化化类 字符数组->字符串
FlinkKafkaConsumer 可以指定消费位置
// todo: 1. 最早偏移量位置消费数据
flinkKafkaConsumer.setStartFromEarliest();
// todo: 2. 最新偏移量位置消费数据, 不设置时,默认值
flinkKafkaConsumer.setStartFromLatest() ;
// todo: 3. 从消费组上次消费的位置开始消费数据
flinkKafkaConsumer.setStartFromGroupOffsets();
// todo: 4. 指定时间戳开始消费数据
flinkKafkaConsumer.setStartFromTimestamp(1660725338991L) ;
// todo: 5. 指定具体分配偏移量位置消费数据
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>() ;
specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 0), 5L);
specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 1), 4L);
specificStartupOffsets.put(new KafkaTopicPartition("flink-topic", 2), 9L);
flinkKafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
kafka source之 新增Topic 和分区发现
场景一 当我的FlinkKfkaConsumer 程序正在执行的时候,kafka中新增了几个topic
但是不想更改我的job代码
场景二 本来Topic中只有三个分区,然后新增了几个分区,我不想更改我的job代码 怎么办
解决方案
- 新增topic 发现 在指定分区的时候 使用正则表达式,且Kafka在创建 Topic的时候按照一定规则创建
//
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>(
// todo: topic名称,使用正则表达式,每topic名称设计的时候,最好符合一定规范
"flink-topic-[0~9]",
new SimpleStringSchema(),
props
);
- 新增 分区发现
只需要 指定配置文件时 设置动态发现分区 并写上 刷新时间
// todo: 设置属性,动态分区发现 props.setProperty("flink.partition-discovery.interval-millis", "5000") ;
Kafka source
1.14版本建议使用Kafkasource
// 2. 数据源-source
// 2-1. 创建Source数据源实例对象,设置属性值
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092")
.setTopics("flink-topic")
.setGroupId("flink-gid-2")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
//设置分区发现
.setProperty("partition.discovery.interval.ms", "5000")
.build();
// 2-2. 从数据源中获取数据
DataStreamSource<String> kafkaStream = env.fromSource(
source, WatermarkStrategy.noWatermarks(), "Kafka Source"
);
// 3. 数据转换-transformation
// 4. 数据终端-sink
kafkaStream.printToErr();
// 5. 触发执行-execute
env.execute("ConnectorKafkaSourceDemo");
}
网友评论