Running on a personal computer: at this stage I am no longer testing the socket form of streams; the next step is to move straight into a Kafka environment.
1. Code
package org.bbk.flink

import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.sinks.CsvTableSink

object Demo {
  def main(args: Array[String]): Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val tableEnv = StreamTableEnvironment.create(envStream)
    // Build a CSV source: three comma-delimited fields,
    // skipping the first line and any rows that fail to parse.
    val source = CsvTableSource.builder()
      .field("id", Types.INT)
      .field("name", Types.STRING)
      .field("age", Types.INT)
      .fieldDelimiter(",")
      .ignoreFirstLine()   // the first row of count.txt is treated as a header and skipped
      .ignoreParseErrors()
      .lineDelimiter("\r\n")
      .path("D:\\tmp\\count.txt")
      .build()
    tableEnv.registerTableSource("user", source)
    // Keep only rows with age greater than 12.
    val result: Table = tableEnv.scan("user").filter("age > 12")
    // Write the result to a single CSV file, overwriting it if it already exists.
    val sink = new CsvTableSink("D:\\tmp\\sink.csv", ",", 1, WriteMode.OVERWRITE)
    result.writeToSink(sink)
    envStream.execute()
  }
}
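As a side-by-side check, the same filter can also be written in SQL against the registered table. A minimal sketch, assuming the tableEnv and the source registration from the code above (note that user is a reserved keyword in SQL, so it has to be quoted with backticks):

    // Drop-in replacement for the scan("user").filter("age > 12") line above.
    // `user` must be quoted with backticks because it is a reserved SQL keyword.
    val sqlResult: Table = tableEnv.sqlQuery("SELECT id, name, age FROM `user` WHERE age > 12")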
2. Test data
1,jack,45
2,but,33
3,aka,12
4,bif,18
5,bb,88
3. Output data
5,bb,88
2,but,33
4,bif,18
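Two things stand out in the output. First, 1,jack,45 is absent even though it passes the filter: ignoreFirstLine() treats the first line of count.txt as a header and skips it. Second, the rows do not come back in input order, because the job runs with the default parallelism and the order in which records reach the sink is nondeterministic. If a stable order matters for small test runs, the parallelism can be pinned to 1; a minimal sketch, assuming the envStream from the code above:

    // Force a single task so records flow through the pipeline in file order.
    // This trades away parallelism and is only worthwhile for small test runs.
    envStream.setParallelism(1)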