Flink Streaming: DataSource Data Sources

Author: wudl | Published 2020-12-02 11:28

    1. Flink's programming model

    graph LR
    A[Environment] --> B[DataSource] --> C[Transformation] --> D[Sink]
    E[Execution context] --> F[Data sources] --> G[Transformations] --> Q[Data output]
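    To make the four stages concrete, here is a minimal word-count-style sketch (the socket source, host, and port are illustrative placeholders, not from the original post) that strings an Environment, a DataSource, Transformations, and a Sink together:

    import org.apache.flink.streaming.api.scala._

    object PipelineSketch {
      def main(args: Array[String]): Unit = {
        // Environment: the execution context
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // DataSource: read lines from a socket (host/port are placeholders)
        val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
        // Transformation: split the lines into words and count them per word
        val counts = lines
          .flatMap(_.split("\\s+"))
          .map((_, 1))
          .keyBy(_._1)
          .sum(1)
        // Sink: print the running counts to stdout
        counts.print()
        env.execute("pipeline sketch")
      }
    }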
    

    1.1 Environment

    Environment creates an execution environment; getExecutionEnvironment returns the environment appropriate for where the program runs.
    1. Local environment: createLocalEnvironment
    e.g. val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    2. Remote cluster environment: createRemoteEnvironment
    e.g. val env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "YOURPATH//wordcount.jar")
    

    2. Flink DataSource types

    1. File source
    2. Collection source
    3. Kafka source
    4. Custom source
    

    2.1 File data source

    Java code:
    
    package com.wudl.core;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /**
     * @ClassName : SourceFile
     * @Description : read from a file
     * @Author :wudl
     * @Date: 2020-10-22 00:34
     */
    public class SourceFile {
        public static void main(String[] args) throws Exception {
            // get the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // read a text file line by line as a DataStream
            DataStream<String> dataStream = env.readTextFile("D:\\ideaWorkSpace\\learning\\Flinklearning\\wudl-flink-java\\src\\main\\java\\com\\wudl\\core\\SourceFile.java");
            dataStream.print();
            env.execute();
        }
    }
    

    Scala version:

    package com.wudl.flink.core

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    /**
     * @ClassName : SourceFile
     * @Description : file data source
     * @Author :wudl
     * @Date: 2020-12-02 00:34
     */
    object FlinkFlieSource {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // set the parallelism
        env.setParallelism(1)
        // import the implicit conversions; placing the import here helps avoid IDE code-completion problems
        import org.apache.flink.streaming.api.scala._
        val dataStream: DataStream[String] = env.readTextFile("F:\\ideaWorkSpace2020\\jg\\Flinklearning\\flink-core\\src\\main\\scala\\com\\wudl\\flink\\core\\FlinkFlieSource.scala")
        dataStream.print()
        env.execute("read data from a file source")
      }
    }
    

    2.2 Collection data source

    package com.wudl.core;

    import com.wudl.bean.WaterSensor;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Arrays;
    import java.util.List;

    /**
     * @ClassName : SourceConllection
     * @Description : read from a collection
     * @Author :wudl
     * @Date: 2020-10-22 00:20
     */
    public class SourceConllection {
        public static void main(String[] args) throws Exception {
            // get the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<WaterSensor> ts = Arrays.asList(
                    new WaterSensor("sensor_1", 15321312412L, 41),
                    new WaterSensor("sensor_2", 15321763412L, 47),
                    new WaterSensor("sensor_3", 15369732412L, 49)
            );
            // build a bounded stream from the in-memory collection
            DataStreamSource<WaterSensor> dataStream = env.fromCollection(ts);
            dataStream.print();
            env.execute();
        }
    }
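    The WaterSensor bean from com.wudl.bean is not shown in the post. Judging from the constructor calls above it holds a sensor id, a timestamp, and a water level; a minimal sketch (field names are assumptions, written as a Scala case class to match the rest of the post — the Java example would need an equivalent Java POJO with a no-arg constructor and getters/setters for Flink's serializer):

    // hypothetical sketch of the WaterSensor bean used above
    case class WaterSensor(id: String, ts: Long, vc: Int)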
    

    2.3 Kafka data source

    package com.wudl.realproces

    import java.util.Properties

    import com.alibaba.fastjson.JSON
    import com.wudl.realproces.bean.{ClickLog, Message}
    import com.wudl.realproces.utils.GlobalConfigUtil
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.environment.CheckpointConfig
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    object App {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // use event time as the stream time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // set the parallelism to 1
        env.setParallelism(1)
        // quick sanity check
        env.fromCollection(List("hadoop", "hive")).print()
        /**
         * Checkpointing
         * enable checkpoints so a long-running job can recover safely
         */
        // take a checkpoint every 5 seconds
        env.enableCheckpointing(5000)
        // exactly-once checkpointing mode
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // minimum pause between two checkpoints
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000)
        // checkpoint timeout
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        // at most one checkpoint in flight at a time
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
        // keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
        // where to store checkpoint state
        env.setStateBackend(new FsStateBackend("hdfs://node01.com:8020/fink-checkpoint/"))
        /**
         * *************************** Flink + Kafka integration *******************************
         */
        val properties = new Properties()
        // Kafka broker addresses
        properties.setProperty("bootstrap.servers", GlobalConfigUtil.bootstrapServers)
        // ZooKeeper addresses
        properties.setProperty("zookeeper.connect", GlobalConfigUtil.zookeeperConnect)
        // Kafka topic name
        properties.setProperty("input.topic", GlobalConfigUtil.inputTopic)
        // consumer group id
        properties.setProperty("group.id", GlobalConfigUtil.groupId)
        // automatically commit consumed offsets back to Kafka
        properties.setProperty("enable.auto.commit", GlobalConfigUtil.enableAutoCommit)
        // interval (ms) for auto-committing offsets
        properties.setProperty("auto.commit.interval.ms", GlobalConfigUtil.autoCommitIntervalMs)
        // where to start consuming when there is no committed offset
        properties.setProperty("auto.offset.reset", GlobalConfigUtil.autoOffsetReset)
        // consumer: topic, deserialization schema, and the properties set above
        val consumer = new FlinkKafkaConsumer010[String](GlobalConfigUtil.inputTopic, new SimpleStringSchema(), properties)
        val kafkaStream: DataStream[String] = env.addSource(consumer)
        // kafkaStream.print()
        // parse the JSON messages into Message objects
        val tunlpDataStream = kafkaStream.map {
          msgjson =>
            val jsonObject = JSON.parseObject(msgjson)
            val message = jsonObject.getString("message")
            val count = jsonObject.getLong("count")
            val timeStamp = jsonObject.getLong("timeStamp")
            Message(ClickLog(message), count, timeStamp)
        }
        tunlpDataStream.print()
    
        /**
         * ------------------------------- watermark support --------------------
         */
        val watermarkDataStream = tunlpDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Message] {
          // highest event-time timestamp seen so far
          var currentTimeStamp = 0L
          // allowed lateness
          var maxDelayTime = 2000L
          // current watermark = max timestamp seen minus the allowed delay
          override def getCurrentWatermark: Watermark = {
            new Watermark(currentTimeStamp - maxDelayTime)
          }
          // extract the event-time timestamp from the element
          override def extractTimestamp(element: Message, previousElementTimestamp: Long): Long = {
            currentTimeStamp = Math.max(element.timeStamp, previousElementTimestamp)
            currentTimeStamp
          }
        })
        // further preprocessing of the data would go here
        env.execute()
      }
    
    }
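    The Message and ClickLog case classes and the GlobalConfigUtil helper are project-specific and not shown in the post. A minimal sketch of what they might look like, inferred from how they are used above (field names and the placeholder values are assumptions):

    // hypothetical sketches of the helper classes referenced above

    // wraps the raw "message" string pulled out of the Kafka JSON payload
    case class ClickLog(message: String)

    // the parsed Kafka record: payload, a counter, and the event-time timestamp (ms)
    case class Message(clickLog: ClickLog, count: Long, timeStamp: Long)

    // central place for the Kafka settings; placeholder values shown here,
    // the real project presumably loads them from a configuration file
    object GlobalConfigUtil {
      val bootstrapServers = "node01:9092"
      val zookeeperConnect = "node01:2181"
      val inputTopic = "input-topic"
      val groupId = "flink-consumer"
      val enableAutoCommit = "true"
      val autoCommitIntervalMs = "5000"
      val autoOffsetReset = "latest"
    }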
    
    

    2.4 Custom Source

    package com.wudl.flink.core

    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    import scala.util.Random

    /**
     * @ClassName : CustomerSource
     * @Description : custom data source
     * @Author :wudl
     * @Date: 2020-12-02 11:23
     */
    object CustomerSource {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        import org.apache.flink.streaming.api.scala._
        val stream: DataStream[StationLog] = env.addSource(new MyCustomerSource)
        stream.print()
        env.execute()
      }

      // a custom source implements the SourceFunction interface
      class MyCustomerSource extends SourceFunction[StationLog] {
        // flag that marks whether the stream should keep running
        var flag = true

        /**
         * The main method: it starts the source.
         * In most cases run contains a loop so the source keeps producing data.
         *
         * @param sourceContext context used to emit elements
         */
        override def run(sourceContext: SourceFunction.SourceContext[StationLog]): Unit = {
          val random = new Random()
          val types = Array("fail", "busy", "barring", "success")
          while (flag) { // keep producing data while the stream has not been cancelled
            1.to(5).map(i => {
              val callOut = "1860000%04d".format(random.nextInt(10000))
              val callIn = "1890000%04d".format(random.nextInt(10000))
              new StationLog("station_" + random.nextInt(10), callOut, callIn,
                types(random.nextInt(4)), System.currentTimeMillis(), 0)
            }).foreach(sourceContext.collect(_)) // emit the elements
            Thread.sleep(2000) // sleep 2 seconds between batches
          }
        }

        // terminate the stream
        override def cancel(): Unit = flag = false
      }

    }
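    The StationLog class is not defined in the post. Based on the constructor call above (station id, caller, callee, call type, call time, duration), a plausible sketch, with field names that are assumptions, is:

    // hypothetical sketch of StationLog, inferred from the constructor call above
    case class StationLog(stationId: String,   // base station id
                          callOut: String,     // caller number
                          callIn: String,      // callee number
                          callType: String,    // fail / busy / barring / success
                          callTime: Long,      // timestamp in ms
                          duration: Long)      // call duration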
    
    
