Flink 8: Flink Stream Processing API - Source


Author: 勇于自信 | Published 2020-05-02 16:01

    Basic steps of stream processing:
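    In outline: create an execution environment, define a Source, apply transformations, write to a Sink, and call execute(). A minimal Scala skeleton of these steps (the element source and the map here are placeholders for illustration, not part of the original examples):

    import org.apache.flink.streaming.api.scala._

    object StreamSkeleton {
      def main(args: Array[String]): Unit = {
        // 1. create the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. define a source
        val source = env.fromElements("hello", "flink")
        // 3. apply transformations
        val upper = source.map(_.toUpperCase)
        // 4. define a sink
        upper.print()
        // 5. trigger execution
        env.execute("stream skeleton")
      }
    }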


    Source

    1. Reading data from a collection

    package wordcount
    
    import org.apache.flink.streaming.api.scala._
    
    // case class for a sensor reading: sensor id, timestamp, temperature
    case class SensorReading(id:String,timestamp:Long,temperature:Double)
    
    object Sensor {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // 1. read data from a collection
        val stream1 = env.fromCollection(List(
          SensorReading("sensor_1", 1547718199, 35.80018327300259),
          SensorReading("sensor_6", 1547718201, 15.402984393403084),
          SensorReading("sensor_7", 1547718202, 6.720945201171228),
          SensorReading("sensor_10", 1547718205, 38.101067604893444)
        ))
        stream1.print("stream1:").setParallelism(1)
        env.execute()
      }
    }
    

    Running the code prints the following:

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    stream1:> SensorReading(sensor_1,1547718199,35.80018327300259)
    stream1:> SensorReading(sensor_6,1547718201,15.402984393403084)
    stream1:> SensorReading(sensor_7,1547718202,6.720945201171228)
    stream1:> SensorReading(sensor_10,1547718205,38.101067604893444)

    Process finished with exit code 0

    2. Reading data from a file

    val stream2 = env.readTextFile("YOUR_FILE_PATH")
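    readTextFile returns a stream of raw lines. A minimal sketch of parsing each line into the SensorReading case class from the first example (it reuses that example's env, and assumes a hypothetical file whose lines look like "sensor_1,1547718199,35.8"):

    // assumes lines of the form "id,timestamp,temperature"
    val stream2 = env
      .readTextFile("YOUR_FILE_PATH")
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
      }
    stream2.print("stream2")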
    

    3. Using a Kafka message queue as the data source

    1. First, configure pom.xml as follows:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>flink-study</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.7.2</version>
            </dependency>
    
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.4.6</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
    <!--                <version>3.0.0</version>-->
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    
    2. The integration code:
    package com.stream;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    
    import java.util.Properties;
    
    public class KafkaStream {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", "localhost:9092")
            prop.setProperty("group.id", "consumer-group")
            prop.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer")
            prop.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer")
            prop.setProperty("auto.offset.reset", "latest")
    
            DataStreamSource dstream = environment.addSource(new FlinkKafkaConsumer011("senser", new SimpleStringSchema(), prop));
            dstream.print("kafka test").setParallelism(1);
            environment.execute();
        }
    }
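    For consistency with the rest of the article, here is an equivalent Scala sketch of the same Kafka source (same topic, schema, and properties as above; the object name and job name are mine):

    package com.stream

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    object KafkaStreamScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "localhost:9092")
        prop.setProperty("group.id", "consumer-group")
        prop.setProperty("auto.offset.reset", "latest")
        // read the "senser" topic as plain strings
        val stream = env.addSource(new FlinkKafkaConsumer011[String]("senser", new SimpleStringSchema(), prop))
        stream.print("kafka test")
        env.execute("kafka source scala")
      }
    }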
    
    
    3. Running. The development work above is the main point; because setting up a demo environment is tedious, only the operational steps are listed here:
      3.1 Start the ZooKeeper and Kafka services on the virtual machine.
      3.2 On the virtual machine, start a producer for the topic configured in the code.
      3.3 In the Flink home directory, run ./bin/start-cluster.sh to start Flink.
      3.4 Package the jar with the assembly plugin, upload it to the virtual machine, and run:
      ./flink run -c com.stream.KafkaStream flink-study-1.0-SNAPSHOT-jar-with-dependencies.jar
      3.5 Type data into the producer. Nothing is printed on the local console; instead open http://hadoop1:8081
      and check the TaskManager logs for the output.

    4. How Flink + Kafka achieves exactly-once semantics:
      Flink uses checkpoints to record how far the data has been processed:
      The JobManager coordinates the TaskManagers to take checkpoints, which are stored in a StateBackend. The default StateBackend is in-memory; it can be switched to a file-system backend for persistent storage.
      The process is effectively a two-phase commit: as each operator finishes its part it issues a "pre-commit", and once the sink operation completes, a "commit" is issued; if anything fails, the pre-commit is discarded.
      After a crash, the job is restored from the StateBackend, and only fully committed operations are recovered.
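      A minimal sketch of enabling checkpointing with an exactly-once guarantee and a file-system StateBackend (the 10-second interval and the checkpoint path are illustrative assumptions, not from the original article):

      import org.apache.flink.runtime.state.filesystem.FsStateBackend
      import org.apache.flink.streaming.api.CheckpointingMode
      import org.apache.flink.streaming.api.scala._

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // take a checkpoint every 10 seconds with exactly-once guarantees
      env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
      // persist checkpoints to a file system instead of the default in-memory backend
      env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"))

      For end-to-end exactly-once delivery into Kafka, the 0.11 producer (FlinkKafkaProducer011 with Semantic.EXACTLY_ONCE) participates in this two-phase commit.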


    4. Custom Source
    Besides the sources above, we can also define a custom source. All that is needed is to pass a SourceFunction to addSource. It is invoked as follows:

    package wordcount
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.api.scala._
    object SourceTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // custom source
        val stream = env.addSource(new MySensorSource())
        stream.print("stream").setParallelism(1)
        env.execute("source test")
      }
    }
    
    

    We want to generate sensor data randomly. The implementation of MySensorSource is as follows:

    package wordcount
    
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    
    import scala.util.Random
    
    class MySensorSource extends SourceFunction[SensorReading]{
      // flag: indicates whether the source is still running
      var running: Boolean = true
    
      override def cancel(): Unit = {
        running = false
      }
    
      override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
        // initialize a random number generator
        val rand = new Random()
        // initialize a group of sensor temperatures
        var curTemp = 1.to(10).map(
          i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
        )
        while (running) {
          // update each temperature with random noise on top of the previous value
          curTemp = curTemp.map(
            t => (t._1, t._2 + rand.nextGaussian())
          )
    
          // current timestamp
          val curTime = System.currentTimeMillis()
          curTemp.foreach(
            t => ctx.collect(SensorReading(t._1, curTime, t._2))
          )
          // emit interval
          Thread.sleep(500)
        }
      }
    }
    

    Run the driver code; the console continuously prints the randomly generated readings.

