Flink SQL in Action: Kafka to Hive

Author: Coder小咚 | Published 2021-12-23 17:43

    Preface

    A real-time data warehouse is typically built on Kafka and layered into ODS, DWD, and DWS.

    For cost reasons, most current big-data stacks adopt the Kappa architecture rather than maintaining a separate batch pipeline, so the data in each Kafka layer also needs to land in Hive for offline queries and backfill.

    Hive Catalog

    • Add the Hive and Hadoop dependencies:
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
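
    The HiveCatalog used below ships in Flink's Hive connector module, which the two dependencies above do not pull in. Assuming Flink 1.14 with Scala 2.12 (the version noted at the end of this post), a dependency along these lines is also needed:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>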
    
    • Flink's Hive integration does not support the embedded metastore, so hive-site.xml must point at a standalone metastore:
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://localhost:9083</value>
    </property>
    

    Then start the metastore service in the background: nohup hive --service metastore >> ~/metastore.log 2>&1 &
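
    Before running the jobs below, the connection can be verified by opening the HiveCatalog directly and listing its databases. A minimal sketch (the MetastoreCheck class is illustrative, not part of the original project):

    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class MetastoreCheck {
        public static void main(String[] args) {
            // connects to the thrift URI configured in hive-site.xml
            HiveCatalog catalog = new HiveCatalog("hive", "default", "/Users/dengpengfei/bigdata/apache-hive-3.1.2-bin/conf");
            catalog.open();
            // a healthy metastore reports at least the 'default' database
            System.out.println(catalog.listDatabases());
            catalog.close();
        }
    }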

    StreamingSql

    package com.dpf.flink;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    
    /**
     * Skeleton for a Flink streaming SQL job.
     *
     * <p>For a tutorial on how to write a Flink streaming application, check the
     * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
     *
     * <p>To package your application into a JAR file for execution, run
     * 'mvn clean package' on the command line.
     *
     * <p>If you change the name of the main class (with the public static void main(String[] args))
     * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
     */
    public class StreamingSql {
    
        public static String hiveCatalog = "hive";
        public static String defaultCatalog = "default_catalog";
        public static String defaultDatabase = "default";
        public static String hiveConfDir = "/Users/dengpengfei/bigdata/apache-hive-3.1.2-bin/conf";
    
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString(RestOptions.BIND_PORT,"8081");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
            env.disableOperatorChaining();
            // checkpointing must be enabled in streaming mode: the Hive sink only
            // finalizes files and commits partitions on checkpoint completion
            env.enableCheckpointing(60 * 1000L);
            TableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            HiveCatalog hive = new HiveCatalog(hiveCatalog, defaultDatabase, hiveConfDir);
            tableEnv.registerCatalog(hiveCatalog, hive);
    
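            // The TBLPROPERTIES below are Flink's partition-commit options for Hive/filesystem
            // sinks: with the 'partition-time' trigger, a (dt, hour, min) partition is committed
            // once the watermark passes the partition time reconstructed by
            // 'partition.time-extractor.timestamp-pattern' plus the 1 min delay; the
            // 'metastore,success-file' policy then registers the partition in the metastore
            // and drops a _SUCCESS file into its directory.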
            String hiveTableSql = "" +
                    "CREATE TABLE IF NOT EXISTS hive_table (" +
                    "  user_id STRING," +
                    "  order_amount BIGINT" +
                    ") PARTITIONED BY (dt STRING, hour STRING, min STRING) STORED AS parquet " +
                    "TBLPROPERTIES (" +
                    "  'is_generic'='false'," +
                    "  'partition.time-extractor.timestamp-pattern'='$dt $hour:$min:00'," +
                    "  'sink.partition-commit.trigger'='partition-time'," +
                    "  'sink.partition-commit.delay'='1 min'," +
                    "  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai'," +
                    "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
                    ")";
    
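            // 'ts' exposes the Kafka record timestamp as a METADATA column; the watermark
            // declared on it is what drives the 'partition-time' commit trigger above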
            String kafkaTableSql = "" +
                    " CREATE TABLE IF NOT EXISTS kafka_table ( " +
                    "  user_id STRING," +
                    "  order_amount BIGINT," +
                    "  ts TIMESTAMP(3) METADATA FROM 'timestamp'," +
                    "  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
                    " ) WITH ( " +
                    "  'connector' = 'kafka', " +
                    "  'topic' = 'source_order', " +
                    "  'scan.startup.mode' = 'earliest-offset', " +
                    "  'properties.zookeeper.connect' = '127.0.0.1:2181', " +
                    "  'properties.bootstrap.servers' = '127.0.0.1:9092', " +
                    "  'properties.group.id' = 'testGroup'," +
                    "  'format' = 'json'," +
                    "  'json.fail-on-missing-field' = 'false'," +
                    "  'json.ignore-parse-errors' = 'true'" +
                    " )";
    
            String etlSql = "" +
                    "INSERT INTO `hive`.`default`.`hive_table` " +
                    "SELECT user_id, order_amount, DATE_FORMAT(`ts`, 'yyyy-MM-dd'), DATE_FORMAT(`ts`, 'HH'), " +
                    // 10-minute buckets: keep the tens digit of the minute and append '0',
                    // e.g. 17:43 falls into the min='40' partition
                    "CONCAT(LPAD(DATE_FORMAT(`ts`, 'mm'), 1, '0'), '0') FROM kafka_table";
    
            // to use the hive catalog
            tableEnv.useCatalog(hiveCatalog);
            // to use hive dialect
            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tableEnv.executeSql(hiveTableSql);
    
            // to use the default catalog
            tableEnv.useCatalog(defaultCatalog);
            // to use default dialect
            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.executeSql(kafkaTableSql);
    
            tableEnv.executeSql(etlSql);
        }
    }
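
    Once the streaming job is running, it can be smoke-tested by producing JSON records matching the kafka_table schema to the source_order topic. A minimal sketch using the kafka-clients producer (assumed to be on the classpath; the sample values are made up):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class OrderProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // only user_id and order_amount go in the payload; ts is read from the
            // Kafka record timestamp via the METADATA column in kafka_table
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("source_order", "{\"user_id\":\"u_001\",\"order_amount\":99}"));
            }
        }
    }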
    

    BatchSql

    package com.dpf.flink;
    
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    
    /**
     * Skeleton for a Flink Batch Job.
     *
     * <p>For a tutorial on how to write a Flink batch application, check the
     * tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
     *
     * <p>To package your application into a JAR file for execution,
     * change the main class in the POM.xml file to this class (simply search for 'mainClass')
     * and run 'mvn clean package' on the command line.
     */
    public class BatchSql {
    
        public static String hiveCatalog = "hive";
        public static String defaultDatabase = "default";
        public static String hiveConfDir = "/Users/dengpengfei/bigdata/apache-hive-3.1.2-bin/conf";
    
        public static void main(String[] args) {
            EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
    
            HiveCatalog hive = new HiveCatalog(hiveCatalog, defaultDatabase, hiveConfDir);
            tableEnv.registerCatalog(hiveCatalog, hive);
    
            tableEnv.useCatalog(hiveCatalog);
    
            tableEnv.executeSql("select * from hive_table").print();
        }
    }
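
    Note that the batch query only sees partitions the streaming job has already committed (registered in the metastore with a _SUCCESS file). A single partition can also be read directly, for example (an illustrative query; the values are made up):

    tableEnv.executeSql("SELECT * FROM hive_table WHERE dt = '2021-12-23' AND `hour` = '17'").print();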
    

    This walkthrough was built on Flink 1.14; the internals of the Hive connector will be covered in a follow-up post!
