Processing Kafka JSON data with Flink SQL

Author: 万州客 | Published 2022-05-10 08:09

This example runs end to end for me~

1. Code

package org.bbk.flink


import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.table.sinks.CsvTableSink


object Demo {
  def main(args:Array[String]):Unit = {
    val envStream = StreamExecutionEnvironment.getExecutionEnvironment
    // bring in implicit TypeInformation instances for the Scala API
    import org.apache.flink.api.scala._
    val tableEnv = StreamTableEnvironment.create(envStream)
    // Kafka source: consume the latest messages from the kafka_source_table topic
    val kafka = new Kafka()
      .version("0.11")
      .topic("kafka_source_table")
      .startFromLatest()
      //.property("group.id", "test_group")
      .property("bootstrap.servers", "192.168.1.111:9092")
    // JSON format: derive the parse schema from the table schema below,
    // and tolerate messages with missing fields instead of failing on them
    val json = new Json().failOnMissingField(false).deriveSchema()
    // table schema matching the fields of each JSON message
    val schema = new Schema()
      .field("userId", Types.INT)
      .field("day", Types.STRING)
      .field("begintime", Types.LONG)
      .field("endtime", Types.LONG)
    // register the Kafka topic as an append-only table named user_log
    tableEnv
      .connect(kafka)
      .withFormat(json)
      .withSchema(schema)
      .inAppendMode()
      .registerTableSource("user_log")
    // `day` is a reserved keyword in Flink SQL, so it must be escaped with
    // backticks; quoting it as 'day' would select the literal string "day"
    val table: Table = tableEnv.sqlQuery("select userId, `day`, begintime, endtime from user_log")
    table.printSchema()
    // CSV sink: write the query result to a single file, overwriting old runs
    val sink = new CsvTableSink("D:\\tmp\\flink_kafka.csv", ",", 1, WriteMode.OVERWRITE)
    tableEnv.registerTableSink("csvSink",
      Array[String]("f0", "f1", "f2", "f3"),
      Array[TypeInformation[_]](Types.INT, Types.STRING, Types.LONG, Types.LONG),
      sink)
    table.insertInto("csvSink")

    envStream.execute("kafkaSource")
  }
}
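
For reference, this code targets Flink's legacy Table descriptor API (connect/withFormat/withSchema) with the Kafka 0.11 connector and the JSON format. As a minimal sketch of the build setup, assuming roughly Flink 1.9.x (the post does not state a version), the sbt dependencies would look like this:

// build.sbt (sketch; the Flink version is an assumption)
val flinkVersion = "1.9.3"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"        % flinkVersion,
  "org.apache.flink" %% "flink-table-planner"          % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.11"   % flinkVersion,
  "org.apache.flink"  % "flink-json"                   % flinkVersion
)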

2. Kafka test data

Create the Kafka topic:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic kafka_source_table

Send messages to it with the console producer:

kafka-console-producer.sh --topic kafka_source_table --broker-list localhost:9092

bash-5.1# kafka-console-producer.sh --topic kafka_source_table --broker-list localhost:9092
>{"userId": 1119, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
>{"userId": 1120, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
>{"userId": 1121, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
>{"userId": 1122, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}
>{"userId": 1123, "day": "2017-03-02", "begintime": 1488326400000, "endtime": 1488327000000}

(The same batch of five messages was sent a few more times while testing.)

3. Output data

[Screenshot: the CSV output produced by the job]
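
Given the test messages above, each one should be written to D:\tmp\flink_kafka.csv as a row like the following (my reconstruction; the original screenshot is not reproduced here):

1119,2017-03-02,1488326400000,1488327000000
1120,2017-03-02,1488326400000,1488327000000
1121,2017-03-02,1488326400000,1488327000000
1122,2017-03-02,1488326400000,1488327000000
1123,2017-03-02,1488326400000,1488327000000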
