这个跑通了呢~
一,代码
package org.bbk.flink
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.table.sinks.CsvTableSink
/**
 * Streaming demo: reads JSON records from a Kafka topic through the Flink Table API,
 * projects four columns with SQL, and writes the rows to a CSV file.
 *
 * Expected record shape (one JSON object per Kafka message):
 * `{"userId": 1119, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}`
 */
object Demo {

  /**
   * Builds and runs the Kafka -> SQL -> CSV pipeline.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    // Kept from the original: brings the Scala API implicits into scope.
    import org.apache.flink.api.scala._
    val tableEnv = StreamTableEnvironment.create(envStream)

    // Kafka connector descriptor: topic, start position and broker address.
    val kafka = new Kafka()
      .version("0.11")
      .topic("kafka_source_table")
      .startFromLatest()
      //.property("group.id", "test_group")
      .property("bootstrap.servers", "192.168.1.111:9092")

    // JSON format: do not fail on records with missing fields; the format schema
    // is derived from the table schema declared below.
    val json = new Json().failOnMissingField(false).deriveSchema()

    // Table schema: field names must match the JSON keys of the incoming records.
    val schema = new Schema()
      .field("userId", Types.INT)
      .field("day", Types.STRING)
      .field("begintime", Types.LONG)
      .field("endtime", Types.LONG)

    tableEnv
      .connect(kafka)
      .withFormat(json)
      .withSchema(schema)
      .inAppendMode()
      .registerTableSource("user_log")

    // `day` is a reserved keyword in Flink SQL, so it is escaped with backticks.
    // Single quotes ('day') would make it a string literal and emit the constant
    // text "day" for every row instead of the column value.
    val table: Table =
      tableEnv.sqlQuery("select userId, `day`, begintime, endtime from user_log")
    table.printSchema()

    // CSV sink arguments: output path, field delimiter, number of files, write mode.
    val sink = new CsvTableSink("D:\\tmp\\flink_kafka.csv", ",", 1, WriteMode.OVERWRITE)
    tableEnv.registerTableSink(
      "csvSink",
      Array[String]("f0", "f1", "f2", "f3"),
      Array[TypeInformation[_]](Types.INT, Types.STRING, Types.LONG, Types.LONG),
      sink)

    table.insertInto("csvSink")
    envStream.execute("kafkaSource")
  }
}
二,kafka测试数据
建kafka的topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic kafka_source_table
生产者向kafka发送消息
kafka-console-producer.sh --topic kafka_source_table --broker-list localhost:9092
bash-5.1# kafka-console-producer.sh --topic kafka_source_table --broker-list localhost:9092
>{"userId": 1119, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1120, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1121, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1122, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1123, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}>>>>
>{"userId": 1119, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1120, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1121, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1122, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1123, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}>>>>
>{"userId": 1119, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1120, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1121, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1122, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
{"userId": 1123, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}>>>>
三,输出数据

网友评论