美文网首页
Flink流处理API

Flink流处理API

作者: 安申 | 来源:发表于2020-06-14 22:39 被阅读0次

    1.Flink的三大处理过程

    Flink处理过程

    2.Environment

    1)getExecutionEnvironment

    创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

    // 批处理环境

    val env = ExecutionEnvironment.getExecutionEnvironment

    // 流式数据处理环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    2)createLocalEnvironment

    返回本地执行环境,需要在调用时指定默认的并行度。

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    3)createRemoteEnvironment

    返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

    val env =ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar")

    3.Source

    1)从集合中读取数据

     val sensorDS:DataStream[WaterSensor] = env.fromCollection(

                List(

                   WaterSensor("ws_001", 1577844001, 45.0),

                   WaterSensor("ws_002", 1577844015, 43.0),

                   WaterSensor("ws_003", 1577844020, 42.0)

                )

            )

    2)从文件中读取数据

    val fileDS: DataStream[String] = env.readTextFile("input/data.txt")

    3)以kafka消息队列的数据作为来源

    (1)引入kafka连接器的依赖:

    <dependency>

       <groupId>org.apache.flink</groupId>

      <artifactId> flink-connector-kafka-0.11_2.11</artifactId>

       <version>1.7.2</version>

    </dependency>

    (2)代码实现:

    val kafkaDS: DataStream[String] = env.addSource(

    newFlinkKafkaConsumer011[String]("sensor",

    new SimpleStringSchema(), properties))

    4)自定义Source

    3.Transform:相当于Spark中的算子

    map

    flatMap

    filter

    keyBy

    4.Sink

    Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作

    stream.addSink(new MySink(xxxx))

    Flink支持的一些主流的Sink

    相关文章

      网友评论

          本文标题:Flink流处理API

          本文链接:https://www.haomeiwen.com/subject/nvtqxktx.html