Preface
I have recently been working on a user-profiling project, where I own the data module. The architecture is roughly:
- Data is first ingested from the various business databases and the data warehouse, run through a mapping step, and stored in a single Hudi table partitioned by the database and table fields.
- The platform creates association relationships and launches association jobs: Flink streaming-reads the raw Hudi data, joins in the entity information, and sends the result to a downstream Kafka topic.
- A stream-processing job consumes the entity-enriched data and builds per-entity feature data, what recommendation systems call a user embedding; internally we call it pp data.
- The algorithm team produces labels from the feature data and the configuration.
This post is about the operational work around ingesting and storing the raw data. Because all raw data lives in one large Hudi table, operating on that source data carefully matters. The problems we currently hit with the Hudi source data:
- Network jitter on a storage node can make the Hudi table unreadable, and after the job restarts automatically the streaming read starts again from the beginning, replaying a large amount of data.
- The Hudi data originates from Kafka, which does not support retractions in the strict sense, so we occasionally have to physically delete records from Hudi by hand.
Below I use Spark to show how to delete dirty data from Hudi. Note that, by default, Hudi does not allow multiple programs to write to the same table concurrently, so stop any other writer before running the delete.
Save
package labeling

import org.apache.spark.sql.{SaveMode, SparkSession}

object TestSave {

  val tableName = "ods_hudi"
  val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .appName("labeling_data_ops")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Sample rows shaped like the ods_hudi table; id is the record key and
    // (mapping_database, mapping_table) forms the two-level partition path.
    val data = Seq(
      ("test_time:test_update:test001", "test001", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test002", "test002", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test003", "test003", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test004", "test004", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test005", "test005", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test006", "test006", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test007", "test007", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"),
      ("test_time:test_update:test008", "test008", "test2", "test_update_db", "{}", "1663836554036", "test_time", "test_time", "{}"))
    val rdd = spark.sparkContext.parallelize(data)
    val df = rdd.toDF("id", "primary_key", "source_database", "source_table", "source_detail", "timestamp", "mapping_database", "mapping_table", "mapping_detail")

    df.write.format("org.apache.hudi")
      .option("hoodie.table.name", tableName)
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option("hoodie.datasource.write.storage.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.precombine.field", "primary_key")
      .option("hoodie.datasource.write.recordkey.field", "id")
      // comma-separated field list, no spaces
      .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
      .mode(SaveMode.Overwrite)
      .save(basePath)
  }
}
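One note on the save above: the partition path is built from two fields. Depending on the Hudi version, a multi-field partition path may not be picked up by the default key generator, in which case setting it explicitly usually helps. A minimal sketch, assuming the option key and key-generator class shipped with Hudi 0.10.x (verify against the version you actually run); it reuses the df, tableName and basePath from TestSave:

// Hedged sketch: explicit ComplexKeyGenerator for a two-field partition path.
df.write.format("org.apache.hudi")
  .option("hoodie.table.name", tableName)
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "primary_key")
  .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  .mode(SaveMode.Overwrite)
  .save(basePath)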
Query
package labeling

import org.apache.spark.sql.SparkSession

object TestQuery {

  val tableName = "ods_hudi"
  val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "true")
      .appName("labeling_data_ops")
      .master("local[*]")
      .getOrCreate()

    val df = spark.read.format("org.apache.hudi").load(basePath)
    df.createOrReplaceTempView(tableName)
    spark.sql("select * from ods_hudi").show()
  }
}
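The query above is a snapshot query over the whole table. When chasing dirty data written after a known commit, an incremental query can narrow things down. A minimal sketch, assuming the Hudi 0.10.x Spark DataSource option names and a hypothetical commit instant; it reuses the spark session and basePath from TestQuery:

// Read only records written after the given commit instant (instant value is made up).
val incDf = spark.read.format("org.apache.hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220922000000")
  .load(basePath)
incDf.createOrReplaceTempView("ods_hudi_inc")
spark.sql("select id, mapping_database, mapping_table from ods_hudi_inc").show()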
Delete
package labeling

import org.apache.spark.sql.{SaveMode, SparkSession}

object TestDelete {

  val tableName = "ods_hudi"
  val basePath = "file:///Users/dengpengfei/file/hudi/ods_hudi"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .appName("labeling_data_ops")
      .master("local[*]")
      .getOrCreate()

    // Load the table and pick out the dirty records to delete.
    val read_df = spark.read.format("org.apache.hudi").load(basePath)
    read_df.createOrReplaceTempView("ods_hudi")
    val del_df = spark.sql("select * from ods_hudi where id = 'test_time:test_update:test001'")
    del_df.printSchema()

    // Write the selected rows back with operation = delete; Hudi deletes each
    // record key in the batch, so the write uses SaveMode.Append, not Overwrite.
    del_df.write.format("org.apache.hudi")
      .option("hoodie.datasource.write.table.name", tableName)
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option("hoodie.datasource.write.storage.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.operation", "delete")
      .option("hoodie.datasource.write.precombine.field", "primary_key")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
      .mode(SaveMode.Append)
      .save(basePath)
  }
}
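Since the delete is just a normal write with the operation set to delete, it only takes effect once the commit completes. A quick way to verify it, as a minimal sketch reusing the same spark session and basePath (nothing Hudi-specific beyond the read):

// Re-read the table and confirm the deleted record key is gone.
val afterDf = spark.read.format("org.apache.hudi").load(basePath)
val remaining = afterDf.filter("id = 'test_time:test_update:test001'").count()
println(s"records left with the deleted key: $remaining") // expected: 0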
The code above can be run directly in IDEA.
To run it on the cluster, submit with:
bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--executor-cores 10 \
--conf spark.dynamicAllocation.minExecutors=10 \
--queue t1 \
--conf spark.yarn.submit.waitAppCompletion=true \
--name labeling_dataops \
--class labeling.OriginDataOps \
/home/pengfei.dong/DataLakeOps-1.0-SNAPSHOT.jar
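The submit command references labeling.OriginDataOps, which is not shown above. A purely hypothetical sketch of what such an entry point could look like, taking the base path and the record key to delete from the command-line arguments (the argument handling and the exact option set are my assumptions, not the actual production class):

package labeling

import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical entry point: args(0) = hudi base path, args(1) = record key to delete.
object OriginDataOps {
  def main(args: Array[String]): Unit = {
    val Array(basePath, recordKey) = args.take(2)
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .appName("labeling_dataops") // master comes from spark-submit (--master yarn)
      .getOrCreate()

    // Select the rows to delete by record key.
    val df = spark.read.format("org.apache.hudi").load(basePath)
      .filter(s"id = '$recordKey'")

    // Issue the delete, same options as TestDelete.
    df.write.format("org.apache.hudi")
      .option("hoodie.datasource.write.table.name", "ods_hudi")
      .option("hoodie.datasource.write.operation", "delete")
      .option("hoodie.datasource.write.precombine.field", "primary_key")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "mapping_database,mapping_table")
      .mode(SaveMode.Append)
      .save(basePath)

    spark.stop()
  }
}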
Problem summary
- On the technology side, Hudi supports quite a lot (Flink SQL, Spark SQL, ...), but because the project iterates quickly you run into all sorts of issues; I currently stick to the Spark DataFrame API.
- Upgrade the avro dependency to 1.11.1, otherwise you will hit org.apache.avro.AvroRuntimeException: Not a valid schema field.
- The Hudi load path is base_path + /*/*, with one asterisk per partition level (see the sketch after this list).
- Pay attention to which Spark and Scala versions your Hudi artifacts were compiled against; the hudi jar names do not make this obvious, so it is easy to get wrong. Locally I rebuilt the bundle myself with: mvn clean install -DskipTests -Dspark2.4 -Dscala-2.11.
- If you build a fat jar, remember to remove the matching jar from the cluster to avoid conflicts, e.g. hudi-spark-bundle_2.11-0.10.1.jar under the Spark jars directory.
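For the glob-path point above, a minimal sketch for a table with two partition levels (mapping_database/mapping_table), assuming the same spark session and basePath as in the query example; newer Hudi versions also accept the bare base path, as TestQuery does:

// One "*" per partition level: basePath/<mapping_database>/<mapping_table>/...
val globDf = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
globDf.select("id", "mapping_database", "mapping_table").show()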
Appendix
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.evydtech</groupId>
    <artifactId>DataLakeOps</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <phase.scala.version>2.11</phase.scala.version>
        <spark.version>2.4.8</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hudi.version>0.10.1</hudi.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-xml</artifactId>
            <version>2.11.0-M4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${phase.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark_${phase.scala.version}</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${phase.scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-cli</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.github.scopt/scopt -->
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_${phase.scala.version}</artifactId>
            <version>4.0.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.4</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                        <descriptorRef>src</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.0.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>