美文网首页
Flink开发流程

Flink开发流程

作者: ATNOW | 来源:发表于2019-03-13 10:35 被阅读0次

1.获取execution environment

StreamExecutionEnvironment是所有流式Flink程序的基础,可以通过静态方法获取environment:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

2.加载/创建初始化数据

  • 基于文件
  • readTextFile(path)
    
    readFile(fileInputFormat, path)
    
    readFile(fileInputFormat, path, watchType, interval, pathFilter)
    
  • 基于socket
  • socketTextStream
    
  • 基于collection
  • fromCollection(Seq)
    
    fromCollection(Iterator)
    
    fromElements(elements: _*)
    
    fromParallelCollection(SplittableIterator)
    
    generateSequence(from, to)
    
    addSource()
    

3.数据的转换

不同的算子,例如常用的几种数据流和算子(参考http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/

Flink 中目前支持的主要几种流的类型,以及它们之间的转换关系

4.指定放置计算结果的位置

writeAsText() / TextOutputFormat: 以字符串的形式逐行写入元素。字符串是通过调用每个元素的toString()方法获得的。

writeAsCsv(...) / CsvOutputFormat :将元组写入以逗号分隔的value文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。

print() / printToErr() :在标准输出/标准错误流上print每个元素的toString()值。还可以选择在输出之前增加prefix(msg)来帮助区分不同的打印调用如果并行度大于1,输出还将加上生成输出的任务的标识符。

writeUsingOutputFormat() / FileOutputFormat: 方法和基类自定义文件输出,支持自定义对象到字节的转换。

writeToSocket: 根据SerializationSchema将元素写入Socket。

addSink: 调用自定义sink函数,Flink附带了到其他系统(如Apache Kafka)的连接器,这些连接器实现了sink函数。

5.触发程序执行

在local模式下执行程序

execute()

将程序达成jar运行在线上

./bin/flink run \

-m node21:8081 \

./examples/batch/WordCount.jar \(jar包所在位置)

--input  hdfs:********** \

--output  hdfs:**********   \

相关文章

网友评论

      本文标题:Flink开发流程

      本文链接:https://www.haomeiwen.com/subject/loaapqtx.html