Data Lake Ops

Author: Coder小咚 | Published 2022-10-06 11:01

Preface

I have recently been working on a user-profiling project, where I am mainly responsible for the data module. The architecture is roughly as follows:

  • Data is first ingested from the various business databases and the data warehouse; after mapping it is stored in a single Hudi table, partitioned by the database and table fields (a sketch of the record schema follows this list).
  • The platform creates association relationships and launches association jobs: Flink streaming-reads the raw Hudi data, joins it with entity information, and sends the result to a downstream Kafka topic.
  • A stream-processing job consumes the entity-enriched data and builds feature data per entity, what recommender systems call a user embedding and what we internally call pp data.
  • The algorithm team produces labels from the feature data and configuration.

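To make the examples below easier to follow, here is a minimal sketch of what one raw record looks like, written as a Scala case class. The field names are taken from the Save example further down; the String types and the meaning comments are my assumptions.

// Sketch of one raw record in the ods_hudi table (types and comments are assumptions).
case class RawRecord(
    id: String,               // Hudi record key, e.g. "test_time:test_update:test001"
    primary_key: String,      // primary key of the source row, used as the precombine field
    source_database: String,  // originating business database
    source_table: String,     // originating table
    source_detail: String,    // raw payload as a JSON string
    timestamp: String,        // epoch milliseconds in the sample data
    mapping_database: String, // partition level 1
    mapping_table: String,    // partition level 2
    mapping_detail: String    // mapped payload as a JSON string
)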
This post covers the operational work around ingesting and storing the raw data. Because all of the raw data lives in one large Hudi table, operating on that source data properly matters a lot. The problems we have run into with the Hudi source data so far:

  • Network jitter on a storage node can make Hudi unreadable, and after the job restarts automatically the streaming read starts over from the beginning, replaying a large amount of data.
  • The Hudi data comes from Kafka, which does not support retractions in the strict sense, so sometimes we have to physically delete records from Hudi by hand.

Below I use Spark to show how to delete dirty data from Hudi. Note in particular that Hudi does not allow multiple programs to write to the same table concurrently, so stop any other writers before running the delete.

Save

package labeling

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

object TestSave {

    val tableName = "ods_hudi"
    val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.hive.convertMetastoreParquet", "false")
            .appName("labeling_data_ops")
            .master("local[*]")
            .getOrCreate()

        import spark.implicits._

        val data = Seq(
            ("test_time:test_update:test001", "test001", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test002", "test002", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test003", "test003", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test004", "test004", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test005", "test005", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test006", "test006", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test007", "test007", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
            ("test_time:test_update:test008", "test008", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"))


        val rdd = spark.sparkContext.parallelize(data)

        val df = rdd.toDF("id","primary_key", "source_database", "source_table", "source_detail", "timestamp", "mapping_database", "mapping_table", "mapping_detail")

        // Write the sample data as a new Hudi MOR table; Overwrite recreates the table at basePath.
        df.write.format("org.apache.hudi")
            .option("hoodie.table.name", tableName)
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            .option("hoodie.datasource.write.storage.type", "MERGE_ON_READ")
            .option("hoodie.datasource.write.precombine.field", "primary_key")
            .option("hoodie.datasource.write.recordkey.field", "id")
            // no space after the comma, so the partition fields match the Delete example below
            .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
            .mode(SaveMode.Overwrite)
            .save(basePath)

    }
}

Query

package labeling

import org.apache.spark.sql.SparkSession

object TestQuery {

    val tableName = "ods_hudi"
    val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.hive.convertMetastoreParquet", "true")
            .appName("labeling_data_ops")
            .master("local[*]")
            .getOrCreate()

        val df = spark.read.format("org.apache.hudi").load(basePath)
        df.createOrReplaceTempView(tableName)
        spark.sql("select * from ods_hudi").show()
    }
}

Delete

package labeling

import org.apache.spark.sql.{SaveMode, SparkSession}

object TestDelete {

    val tableName = "ods_hudi"
    val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.sql.hive.convertMetastoreParquet", "false")
            .appName("labeling_data_ops")
            .master("local[*]")
            .getOrCreate()

        // Read the table and select the dirty rows to delete by record key.
        val read_df = spark.read.format("org.apache.hudi").load(basePath)

        read_df.createOrReplaceTempView("ods_hudi")

        val del_df = spark.sql("select * from ods_hudi where id = 'test_time:test_update:test001'")

        del_df.printSchema()

        // Write the selected rows back with the delete operation in Append mode:
        // this adds a delete commit for those record keys instead of rewriting the table.
        del_df.write.format("org.apache.hudi")
            .option("hoodie.datasource.write.table.name", tableName)
            .option("hoodie.insert.shuffle.parallelism", "2")
            .option("hoodie.upsert.shuffle.parallelism", "2")
            .option("hoodie.datasource.write.storage.type", "MERGE_ON_READ")
            .option("hoodie.datasource.write.operation", "delete")
            .option("hoodie.datasource.write.precombine.field", "primary_key")
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
            .mode(SaveMode.Append)
            .save(basePath)
    }
}
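
A quick way to confirm the delete took effect, assuming the same SparkSession and basePath as in the Delete example above (this is just a re-read for verification, not part of the original job):

// After the delete commit, the deleted record key should no longer appear.
val check = spark.read.format("org.apache.hudi").load(basePath)
check.createOrReplaceTempView("ods_hudi")
spark.sql("select id from ods_hudi where id = 'test_time:test_update:test001'").show() // expect an empty result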

The code above can be run directly in IDEA.

Cluster submit command

bin/spark-submit  \
--master yarn  \
--deploy-mode cluster  \
--driver-memory 2g  \
--executor-memory 2g  \
--executor-cores 10  \
--conf spark.dynamicAllocation.minExecutors=10  \
--queue t1  \
--conf spark.yarn.submit.waitAppCompletion=true  \
--name  labeling_dataops  \
--class labeling.OriginDataOps  \
/home/pengfei.dong/DataLakeOps-1.0-SNAPSHOT.jar

Summary of issues

  • In terms of technology choices, Hudi supports quite a few engines (Flink SQL, Spark SQL, ...), but because versions iterate quickly you run into all kinds of problems; I currently use the Spark DataFrame API.
  • Upgrade the avro dependency to 1.11.1, otherwise you will hit org.apache.avro.AvroRuntimeException: Not a valid schema field.
  • The Hudi load path is the base path plus /*/*: add one asterisk per partition level (see the sketch after this list).
  • Pay attention to the Spark and Scala versions Hudi was compiled against. The Hudi jar name does not carry the Spark or Scala version, so it is easy to pick the wrong one; locally I rebuilt the package myself with: mvn clean install -DskipTests -Dspark2.4 -Dscala-2.11.
  • If you build a fat jar, remember to remove the corresponding jar from the cluster to avoid conflicts, for example hudi-spark-bundle_2.11-0.10.1.jar in the Spark jars directory.

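Regarding the glob-path point above, a minimal sketch of reading the two-level-partitioned table from this post; the /*/* pattern is the only thing being illustrated, and depending on the Hudi version the bare base path also works, as in the Query example:

// mapping_database/mapping_table gives two partition levels, hence two asterisks.
val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"
val df = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
df.show()
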
Appendix

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.evydtech</groupId>
    <artifactId>DataLakeOps</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <phase.scala.version>2.11</phase.scala.version>
        <spark.version>2.4.8</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hudi.version>0.10.1</hudi.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-xml</artifactId>
            <version>2.11.0-M4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${phase.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark-->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark_${phase.scala.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${phase.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-cli</artifactId>
            <version>${hudi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.github.scopt/scopt -->
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_${phase.scala.version}</artifactId>
            <version>4.0.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                        <descriptorRef>src</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.0.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
