flink流处理和批处理都内置了很多数据源,可以满足我们大部分使用场景,当然也可以通过实现flink提供的接口来实现其他数据源的接入。接下来我们就分别来了解下flink批处理和流处理的数据源吧。
1 批处理
批处理的数据源主要大致分为两类:集合数据源、文件数据源。
sequence
集合数据源之迭代类。平时我们所使用到的scala中Seq(),Array(),List(),Map(),Set()等都是迭代类。
env.fromCollection(data: Iterable[T])
// 示例
env.fromCollection(Seq(1, 2, 3))
env.fromCollection(Array(1, 2, 3, 4, 5))
env.fromCollection(List(1, 2, 3, 4, 5))
env.fromCollection(Map("a" -> 1, "b" -> 2, "c" -> 3))
env.fromCollection(Set(3, 5, 6))
// 并行版本
env.fromParallelCollection(iterator: SplittableIterator[T])
// 示例
env.fromParallelCollection(new NumberSequenceIterator(3, 19))
01_parallel_collection.png
集合类型源之元素数组。类似于java中的可变参数列表。
env.fromElements(data: T*)
// 示例
env.fromElements(1, 2, 3)
env.fromElements("a", "b", "c")
file
一般来说,我们的批处理程序的数据源主要都是文件数据源,包括本地文件系统中的和分布式文件系统中的文件。使用频次比较高的是readTextFile()和readCsvFile(),当然也可以通过重写readFlie(new FileInputStream[T], filePath: String)中的FileInputFormat来读取不同格式文件。
env.readFile(new FileInputStream[T], filePath: String)
env.readTextFile(filePath: String, charsetName: String = "UTF-8")
env.readTextFileWithValue(filePath: String, charsetName: String = "UTF-8")
env.readFileOfPrimitives(filePath: String, delimiter: String = "\n")
env.readCsvFile(
filePath: String, // csv文件地址
lineDelimiter: String = "\n", // 行划分符号
fieldDelimiter: String = ",", // 字段划分符号
quoteCharacter: Character = null,
ignoreFirstLine: Boolean = false, // 忽略首行
ignoreComments: String = null,
lenient: Boolean = false, // 是否对数据严格判断。false表示严格判断,缺失的数据则会被忽略
includedFields: Array[Int] = null,
pojoFields: Array[String] = null
)
readFile,自定义读取文件内容的逻辑
env.readFile(new MyFileInputFormat[String](), getPath("words.txt"))
// 重写读取文件的逻辑
class MyFileInputFormat extends FileInputFormat[String]() {
private var end = false
private var input: BufferedReader = _
// 只执行一次,此处创建输入流对象
override def open(fileSplit: FileInputSplit): Unit = {
val inputStream = new FileInputStream(this.getFilePath.getPath)
input = new BufferedReader(new InputStreamReader(inputStream))
}
// 判断是否读取到了文件尾部
override def reachedEnd(): Boolean = this.end
// 读取下一行的操作逻辑
override def nextRecord(ot: String): String = {
val str: String = input.readLine()
if (str == null) {
this.end = true
""
} else str
}
}
2 流处理
flink流处理的数据源大致分为四种:集合数据源、文件数据源、套接字数据源、自定义数据源。flink还原生给我们写好了kafka数据源,这是实时流处理中使用的最频繁没有之一的数据源(下边会单独简单展示一下)。
sequence
不用多说了吧,完全参考批处理中的sequence。
env.fromCollection(data: Iterable[T])
env.fromElements(data: T*)
env.fromParallelCollection(iterator: SplittableIterator[T])
file
一样参考批处理中的flie。只是有一点儿不同的是流处理中readFile()还能监视文件的变更状况来进行更多的处理方式,如文件新增了记录,可以重新处理文件或者直接退出。
env.readTextFile(filePath: String, charsetName: String)
env.readFile(
inputFormat: FileInputFormat[T],
filePath: String,
watchType: FileProcessingMode, // 监视路径并响应新数据,或处理一次并退出
interval: Long) // 间隔时间millis
socket
直接监视指定机子socket端口的记录。
env.socketTextStream(
hostname: String,
port: Int,
delimiter: Char = '\n',
maxRetry: Long = 0) // 如果端口监听中断,最大重试间隔时间
自定义
当上边所有的数据源都满足不了我们的场景需求时,我们可以通过继承flink暴露的SourceFunction来实现自己的数据源(下文会展示自定义MySQL数据源)。
env.addSource(function: SourceContext[T] => Unit)
env.addSource(function: SourceFunction[T])
3 kafka connector
kafka connector是flink提供给我们的自定义连接器,可以直接实例化FlinkKafkaConsumer对象来消费kafka中记录。
object FlinkStreamDataSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// kafka properties参数
val props = new Properties()
props.setProperty("bootstrap.servers", "172.16.57.101:9092,172.16.57.102:9092,172.16.57.103:9092")
props.setProperty("zookeeper.connect", "172.16.57.101:2181,172.16.57.102:2181,172.16.57.103:2181")
props.setProperty("group.id", "leslie")
props.setProperty("auto.offset.reset", "latest")
val original: DataStream[String] = env
.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), props))
original
.flatMap(_.split(","))
.map(_ + "_test")
.print()
env.execute("flink_streaming_data_source")
}
}
01_datasource_kafka.png
4 自定义DataSource
想要实现自定义的数据源十分简单,只需继承flink的SourceFunction接口并重写其中的run(),cancel()方法。
object FlinkStreamCustomerDataSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 自定义dataSource
env.addSource(new MysqlDataSource).print()
env.execute("customer_date_source")
}
}
下文代码MysqlDataSource类是继承了RichSourceFunction类,RichSourceFunction实现SourceFunction接口并同时继承AbstractRichFunction抽象类,AbstractRichFunction抽象类又实现RichFunction接口。为什么我们的自定义数据源要多继承RichSourceFunction类呢?原因就在这个Rich上!我们将这样带有"Rich"前缀的函数类称为富函数,既然是富函数了那么一定是比普通的函数多给我们带来一些功能。RichSourceFunction中的open(),close()就是“富”出来的方法,open()方法仅在函数类实例化的时候调用一次(通常用来建立连接),close()则是在实例对象销毁前调用一次(通常用来关闭连接),可以避免重复进行创建连接销毁连接操作。(当然富函数不仅仅只“富”了这么一点点,还“富”出来运行时上下文,这可是个好东西。此处不扩展哦,以后用到再来讨论)
02_rich_function.png我们需要进行获取外部存储组件数据的操作就在SourceFunction的run(),cancel()方法中实现。
class MysqlDataSource extends RichSourceFunction[String] {
private var pStmt: PreparedStatement = _
private var conn: Connection = _
// 开始方法,只执行一次,建立和mysql的连接
override def open(parameters: Configuration): Unit = {
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test_for_mysql?useSSL=false"
val username = "root"
val password = "123456"
conn = DriverManager.getConnection(url, username, password);
val sql =
"""
|select id, name, sex, age from user;
|""".stripMargin
pStmt = conn.prepareStatement(sql)
}
// 结束方法,只执行一次,关闭连接
override def close(): Unit = {
// 关闭连接
if (pStmt != null) pStmt.close()
if (conn != null) conn.close()
}
// 主体执行方法
override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
val rs: ResultSet = pStmt.executeQuery()
while (rs.next()) {
val id: Int = rs.getInt("id")
val name: String = rs.getString("name")
val sex: String = rs.getString("sex")
val age: Int = rs.getInt("age")
ctx.collect(s"id: $id, name: $name, sex: $sex, age:$age") // 收集记录到上下文中
}
}
override def cancel(): Unit = {}
}
02_customer_mysql.png
网友评论