Flink SQL: How to Connect to External Systems


Author: 梧上擎天 | Published 2019-03-12 17:42


    For a recent project I needed to build a visualization layer for Flink SQL comparable to the one we had for Spark SQL, but blog posts covering Flink are scarce and the official examples are incomplete. The common advice you hear is that Flink Table/SQL is still unstable and that one should wait until Alibaba's Blink is merged into Flink before adopting it. In this post I want to offer some reference demos based on the latest release, 1.7.2.

    Connectors currently supported by the official docs

    (Image in the original post: a table of the officially supported connectors and their corresponding dependencies.)
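
    For reference, the examples below need the Table API, the Kafka 0.10 connector, and the JSON format on the classpath. A minimal Maven setup might look like this (artifact names assume Flink 1.7.2 built against Scala 2.11; adjust both versions to your environment):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.7.2</version>
    </dependency>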

    Note: in 1.7.2 the CSV format only supports batch processing, not streaming. Using it in a streaming environment raises the following error:

    Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
    the classpath.

    Reason: No context matches.
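
    In a batch environment the same CSV format works fine. Below is a minimal sketch; the input file path, its column layout, and the class name are placeholders I chose for illustration:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.types.Row;

    public class CsvBatchConnectorSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            tEnv.connect(new FileSystem().path("D:\\Aupload\\flink\\source.csv"))  // placeholder path
                .withFormat(
                    new Csv()                               // format fields, in file column order
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT)
                        .fieldDelimiter(","))
                .withSchema(
                    new Schema()                            // fields of the registered table
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT))
                .registerTableSource("csvSource");

            Table result = tEnv.sqlQuery("select * from csvSource");
            DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
            rows.print();  // print() triggers execution in the batch API
        }
    }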

    Programming pattern

    tableEnvironment
      .connect(...)       // takes an implementation of ConnectorDescriptor, e.g. Kafka, FileSystem
      .withFormat(...)    // takes an implementation of FormatDescriptor, e.g. Json, Avro, Csv
      .withSchema(...)    // takes a new Schema(); it defines the fields and types of the table registered in Flink
      .inAppendMode()
      /* Three update modes are supported (see the sketch after this block):
         inAppendMode  - consumes only the INSERT changes of the dynamic table
         inRetractMode - also supports UPDATE and DELETE, at a cost in performance
         inUpsertMode  - operates on single keyed rows, so it performs better than inRetractMode
      */
      .registerTableSource("MyTable")  // the name under which the table is registered
    

    Example 1: Kafka connector, JSON source -> CSV sink

    package mqz.connector;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.types.Row;
    
    /**
     * @author maqingze
     * @version v1.0
     * @date 2019/3/7 11:24
     */
    public class KafkaConnectorFormatJSON2CSV {
        private final static String SOURCE_TOPIC = "source";
        private final static String SINK_TOPIC = "sink";
        private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
        private final static String GROUP_ID = "group1";
        private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
            tEnv.connect(
                new Kafka()
                    .version("0.10")
                    .topic(SOURCE_TOPIC)
                    .startFromEarliest()
                    .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                    .property("bootstrap.servers", METADATA_BROKER_LIST)
            )
                .withFormat(
                new Json()
                    .schema(
                        org.apache.flink.table.api.Types.ROW(
                            new String[]{"id", "product", "amount"},
                            new TypeInformation[]{
                                org.apache.flink.table.api.Types.LONG()
                                , org.apache.flink.table.api.Types.STRING()
                                , org.apache.flink.table.api.Types.INT()
                            }))
                    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
                )
                .withSchema(
                    new Schema()
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("sourceTable");
    
            Table result = tEnv.sqlQuery("select * from sourceTable ");
    
            DataStream<Row> rowDataStream = tEnv.toAppendStream(result, Row.class);
    
            rowDataStream.print();
    
            CsvTableSink sink = new CsvTableSink(
                "D:\\Aupload\\flink\\sink.csv",   // output path
                "|",                              // field delimiter
                1,                                // number of output files
                FileSystem.WriteMode.OVERWRITE);  // overwrite existing files (NO_OVERWRITE is the alternative)
    
            tEnv.registerTableSink(
                "csvOutputTable",
                new String[]{"f0", "f1", "f2"},
                new TypeInformation[]{Types.LONG, Types.STRING, Types.INT},
                sink);
    
            result.insertInto("csvOutputTable");
    
            env.execute("test kafka connector demo");
    
        }
    
    }
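
    To smoke-test the job, produce a few JSON messages to the source topic with the standard Kafka console producer (broker addresses match the constants above; the sample rows are made up):

    bin/kafka-console-producer.sh --broker-list hadoop003:9092,hadoop004:9092 --topic source
    > {"id":1,"product":"beer","amount":3}
    > {"id":2,"product":"diaper","amount":4}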
    
    

    Example 2: Kafka connector, JSON source -> JSON sink

    package mqz.connector;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.sinks.CsvTableSink;
    import org.apache.flink.types.Row;
    
    /**
     * @author maqingze
     * @version v1.0
     * @date 2019/3/7 11:24
     */
    public class KafkaConnectorFormatJSON2JSON {
        private final static String SOURCE_TOPIC = "source";
        private final static String SINK_TOPIC = "sink";
        private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
        private final static String GROUP_ID = "group1";
        private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
            tEnv.connect(
                new Kafka()
                    .version("0.10")
                    .topic(SOURCE_TOPIC)
                    .startFromEarliest()
                    .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                    .property("bootstrap.servers", METADATA_BROKER_LIST)
            )
                .withFormat(
                    new Json()
                        .schema(
                            org.apache.flink.table.api.Types.ROW(
                                new String[]{"id", "product", "amount"},
                                new TypeInformation[]{
                                    org.apache.flink.table.api.Types.LONG()
                                    , org.apache.flink.table.api.Types.STRING()
                                    , org.apache.flink.table.api.Types.INT()
                                }))
                        .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
                )
                .withSchema(
                    new Schema()
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("sourceTable");
    
            tEnv.connect(
                new Kafka()
                    .version("0.10")    // required: valid connector versions are
                    //   "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic(SINK_TOPIC)       // required: topic name from which the table is read
                    // optional: connector specific properties
                    .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                    .property("bootstrap.servers", METADATA_BROKER_LIST)
                    .property("group.id", GROUP_ID)
                    // optional: startup mode for Kafka offsets (only relevant for sources;
                    // it is ignored here because this descriptor is registered as a sink)
                    .startFromEarliest()
                    .sinkPartitionerFixed()         // each Flink partition ends up in at-most one Kafka partition (default)
            ).withFormat(
                new Json()
                    .schema(
                        org.apache.flink.table.api.Types.ROW(
                            new String[]{"yid", "yproduct", "yamount"},
                            new TypeInformation[]{
                                org.apache.flink.table.api.Types.LONG()
                                , org.apache.flink.table.api.Types.STRING()
                                , org.apache.flink.table.api.Types.INT()
                            }))
                    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default
            )
                .withSchema(
                    new Schema()
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSink("sinkTable");
    
    
            tEnv.sqlUpdate("insert into sinkTable(id,product,amount) select id,product,33 from sourceTable  ");
    
            env.execute("test kafka connector demo");
    
        }
    
    }
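
    The output can be verified with the console consumer on the sink topic (broker addresses as above); given the yid/yproduct/yamount format schema, the messages should look roughly like {"yid":1,"yproduct":"beer","yamount":33}:

    bin/kafka-console-consumer.sh --bootstrap-server hadoop003:9092,hadoop004:9092 --topic sink --from-beginning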
    
    
    

    Full project code

    GitHub
