美文网首页flink入门
flink学习之六-数据持久化to-kafka

flink学习之六-数据持久化to-kafka

作者: AlanKim | 来源:发表于2019-03-14 06:45 被阅读50次

上面将数据从kafka搬运到了mysql中,而很多时候,在处理之后也可以继续放到kafka中,供下游消费。

FlinkKafkaProducer

Flink提供了kafka的对应sink,FlinkKafkaProducer010,下面看看对应实现。

public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
    private static final long serialVersionUID = 1L;
    private boolean writeTimestampToKafka;

    public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)customPartitioner);
    }

    public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
        this.writeTimestampToKafka = false;
    }

    public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
        this.writeTimestampToKafka = writeTimestampToKafka;
    }
    
    ......

}

基本上都是构造函数,而追到上层,可以看到最终还是实现了RichSinkFunction:

public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {}

From Kafka To Kafka

下面的示例从kafka中获取到数据,然后写入到另一个kakfa topic中,当然中间可以做其他处理,这里为了求简单,省掉这一步,代码如下:

import java.util.Properties;

public class KafkaToKakfaJob {

    public static void main(String[] args) throws Exception {

        // 以kafka作为datasource
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 以event time 为准
        // event time: 数据在源头的发生时间,跟flink无关,数据产生时就已经确定过了
        // processing time : 数据在flink中开始被处理的时间,跟flink有关
        // ingestion time : 数据到达flink集群中的时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 构造kafka 及 zk的链接服务器时间
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");// 始终消费最新的数据
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<String> dataStreamSource =
                env.addSource(new FlinkKafkaConsumer010<String>(
                        "testjin", // topic
                        new SimpleStringSchema(),
                        properties
                )).setParallelism(1)
//                .map(string -> JSON.parseObject(string,UrlInfo.class))
                ;

        // 写入kafka,可以通过kafka-topic.sh查看是否新增了一个 dest_testjin的topic
        dataStreamSource.addSink(new FlinkKafkaProducer010<String>(
                "localhost:9092",
                "dest_testjin",//dest topic,会重新创建一个
                new SimpleStringSchema()
        )).setParallelism(1).name("add to kafka dest topic");

        env.execute("execute from kafka to kafka");

可以看到,这里直接addSink(new FlinkKafkaProducer010),构造函数中传入kafka的broker、topic,以及序列化方式(继续采用最简单的SimpleStringSchema),执行后可以验证,在对应的kafka broker中可以看到新增加的topic dest_testjin.

相关文章

  • flink学习之六-数据持久化to-kafka

    上面将数据从kafka搬运到了mysql中,而很多时候,在处理之后也可以继续放到kafka中,供下游消费。 Fli...

  • Docker学习(13) 卷与持久化数据

    Docker学习(13) 卷与持久化数据 卷与持久化数据——简介 数据主要分为两种:持久化和非持久化。 持久化:就...

  • flink学习之五-数据持久化to-mysql

    flink中数据的落地,是使用sink来处理的。 上面例子中已经可以看到可以使用DataStream.addSin...

  • 第一行代码(六)

    第六章内容主讲数据持久化技术 一、数据持久化技术简介   数据持久化技术就是将瞬时数据(存储在内存中,有可能会因为...

  • [Android] Realm 的初探

    关于Realm 在学习 Android 的时候 , 我们知道数据持久化的问题,对于数据持久化通常有以下几种方式: ...

  • [Flink State] 从源码解析State的保存过程

    1 前言 State要能发挥作用,就需要持久化到可靠存储中,flink中持久化的动作就是checkpointing...

  • iOS本地数据持久化

    iOS本地数据持久化 iOS本地数据持久化

  • Redis-2 数据持久化及持久化配置

    一、数据持久化 开启持久化功能后,重启redis,数据会自动通过持久化文件恢复!! 1、redis持久化 – 两种...

  • SQLite数据库

    在学习SQLite之前,首先了解下数据持久化的几种方式: 定义:数据持久化是通过文件将数据存储在磁盘上 IOS下主...

  • 面试相关

    数据持久化 什么是持久化狭义的理解: “持久化”仅仅指把域对象永久保存到数据库中;广义的理解,“持久化”包括和数据...

网友评论

    本文标题:flink学习之六-数据持久化to-kafka

    本文链接:https://www.haomeiwen.com/subject/nwznuqtx.html