美文网首页
八、Kafka Connector

八、Kafka Connector

作者: 木戎 | 来源:发表于2019-05-21 17:45 被阅读0次

    Flink-kafka-connector

    • 读写kafka
      Kafka中的partition机制和Flink的并行度机制结合
    • 实现数据恢复
      Kafka可以作为Flink的source和sink 任务失败,通过设置kafka的offset来恢复应用

    配置

    • kafka启动服务
    nohup zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
    nohup kafka-server-start /usr/local/etc/kafka/server.properties &
    

    Flink消费Kafka注意事项

    • setStartFromGroupOffsets()【默认消费策略】

    默认读取上次保存的offset信息 如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据

    • setStartFromEarliest() 从最早的数据开始进行消费,忽略存储的offset信息

    • setStartFromLatest() 从最新的数据进行消费,忽略存储的offset信息

    • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>) 从指定位置进行消费

    • 当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。

    • 为了能够使用支持容错的kafka Consumer,需要开启checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次。

    SerializationSchema&&DeserializationSchema


    如何将从 kafka 中获取的字节流转换为 Java Object,则通过 DeserializationSchema 来实现转换。其中 SimpleStringSchema 将 kafka 获取的字节流转换为字符串。
    其中 KeyedDeserializationSchema 支持 Key, Value 反序列化。

    示例

     <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka-0.9_2.12</artifactId>
          <version>1.8.0</version>
     </dependency>
    
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.Random;
    
    public class KafkaDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
    
            Properties properties = new Properties();// kafka&&zk 配置参数
            properties.setProperty("bootstrap.servers", "localhost:9092");
            //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
            FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09("test",new SimpleStringSchema(),properties);
    /*
            //event-timestamp事件的发生时间
            producer.setWriteTimestampToKafka(true);
    */
            text.addSink(producer);
            env.execute();
        }
    
        public static class MyNoParalleSource implements SourceFunction<String> {//1
    
            //private long count = 1L;
            private boolean isRunning = true;
    
            /**
             * 主要的方法
             * 启动一个source
             * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
             *
             * @param ctx
             * @throws Exception
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while(isRunning){
                    //图书的排行榜
                    List<String> books = new ArrayList<>();
                    books.add("Pyhton从入门到放弃");//10
                    books.add("Java从入门到放弃");//8
                    books.add("Php从入门到放弃");//5
                    books.add("C++从入门到放弃");//3
                    books.add("Scala从入门到放弃");//0-4
                    int i = new Random().nextInt(5);
                    // 此处类同于storm的emit操作
                    ctx.collect(books.get(i));
    
                    //每2秒产生一条数据
                    Thread.sleep(2000);
                }
            }
            //取消一个cancel的时候会调用的方法
            @Override
            public void cancel() {
                isRunning = false;
            }
        }
    }
    
    
    kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
    

    相关文章

      网友评论

          本文标题:八、Kafka Connector

          本文链接:https://www.haomeiwen.com/subject/klnjzqtx.html