美文网首页
flink使用Table,SQL实现CSV查询和导出

flink使用Table,SQL实现CSV查询和导出

作者: 万州客 | 来源:发表于2022-05-16 15:21 被阅读0次

个人电脑,到这个阶段,stream的socket形式就不再测试了,直接进入kafka环境吧。

一, 代码

package org.bbk.flink


import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.sinks.CsvTableSink


object Demo {
  def main(args:Array[String]):Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val tableEnv = StreamTableEnvironment.create(envStream)
    val source = CsvTableSource.builder()
      .field("id", Types.INT)
      .field("name", Types.STRING)
      .field("age", Types.INT)
      .fieldDelimiter(",")
      .ignoreFirstLine()
      .ignoreParseErrors()
      .lineDelimiter("\r\n")
      .path("D:\\tmp\\count.txt")
      .build()
    tableEnv.registerTableSource("user", source)
    val result = tableEnv.scan("user").filter("age > 12")
    val sink = new CsvTableSink("D:\\tmp\\sink.csv", ",", 1, WriteMode.OVERWRITE)
    result.writeToSink(sink)
    envStream.execute()
  }
}

二,测试数据

1,jack,45
2,but,33
3,aka,12
4,bif,18
5,bb,88

三,输出数据

5,bb,88
2,but,33
4,bif,18

相关文章

网友评论

      本文标题:flink使用Table,SQL实现CSV查询和导出

      本文链接:https://www.haomeiwen.com/subject/snkhurtx.html