PySpark + MinIO + Hudi: Troubleshooting Notes

Author: shaun_x | Published 2024-03-10 23:48

    Sample code
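
    The snippet below assumes the Hudi Spark bundle and hadoop-aws are already on the classpath. If they are not, one option is to let Spark fetch them via spark.jars.packages when the session is built. The coordinates here are assumptions, not part of the original post, and must match your Spark, Scala, and Hadoop versions:

    from pyspark.sql import SparkSession

    # Assumed version pins: hudi-spark3.4-bundle_2.12 pairs with Spark 3.4, and
    # hadoop-aws must match the hadoop-common version bundled with Spark.
    # spark.jars.packages only takes effect if set before the JVM starts.
    spark = SparkSession.builder.appName("MinioTest") \
        .config("spark.jars.packages",
                "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,"
                "org.apache.hadoop:hadoop-aws:3.3.4") \
        .getOrCreate()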

    from pyspark.sql import SparkSession

    # Hudi requires Kryo serialization plus its SQL extension and catalog.
    spark = SparkSession.builder.appName("MinioTest") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
        .getOrCreate()

    # S3A settings for MinIO. Path-style access is required because MinIO does
    # not serve virtual-hosted-style bucket URLs by default. Note that
    # spark.serializer is a Spark config, not a Hadoop one; setting it on
    # hadoopConfiguration (as in the original snippet) has no effect, so it is
    # configured on the builder above instead.
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    hconf.set("fs.s3a.access.key", "xxxxx")
    hconf.set("fs.s3a.secret.key", "xxxxx")
    hconf.set("fs.s3a.endpoint", "http://127.0.0.1:9000")
    hconf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hconf.set("fs.s3a.path.style.access", "true")

    # Smoke test 1: read a CSV from MinIO (input.txt is expected to have
    # name and age columns).
    df = spark.read.csv("s3a://data-warehouse/input.txt", header=True)
    df.show()

    # Smoke test 2: write Parquet back to MinIO and read it again.
    df.select("name", "age").write.parquet("s3a://data-warehouse/test.parquet", mode="overwrite")
    df = spark.read.parquet("s3a://data-warehouse/test.parquet")
    df.show()

    # Sample trip data, matching the Hudi quickstart schema.
    columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
    data = [(1695159649087, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
            (1695091554788, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
            (1695046462179, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
            (1695516137016, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
            (1695115999911, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai")]
    inserts = spark.createDataFrame(data).toDF(*columns)
    inserts.show()

    # Hudi write options: record key, partition path, and operation type.
    hudi_options = {
        'hoodie.table.name': 'huditable',
        'hoodie.datasource.write.recordkey.field': 'uuid',
        'hoodie.datasource.write.table.name': 'huditable',
        'hoodie.datasource.write.partitionpath.field': 'city',
        'hoodie.datasource.write.operation': 'insert',
        'hoodie.upsert.shuffle.parallelism': 2,
        'hoodie.insert.shuffle.parallelism': 2
    }

    # Write the Hudi table to MinIO.
    inserts.write.format("hudi") \
        .options(**hudi_options) \
        .mode("overwrite") \
        .save("s3a://data-warehouse/test-hudi2")
    

    Issues

    1. HTTP_PROXY / HTTPS_PROXY

    Reading a CSV file from MinIO works.
    Writing Parquet to MinIO works.
    Writing a Hudi table to local disk also works.
    But writing a Hudi table to MinIO blocks indefinitely.
    After removing the HTTP_PROXY / HTTPS_PROXY environment variables and restarting the notebook, the write completes normally (a sketch of this workaround follows below).
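
    A minimal workaround sketch, assuming the hang comes from some component of the S3A/Hudi write path honoring the proxy variables and routing MinIO traffic through an unreachable proxy: clear the variables in the driver process before the SparkSession (and its JVM) is created, e.g. in the first notebook cell.

    import os

    # Clear proxy settings before any Spark/JVM startup; this has no effect
    # if a SparkSession already exists in the notebook kernel.
    for var in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
        os.environ.pop(var, None)

    If the proxy is needed for other traffic, an alternative may be to exempt the MinIO endpoint via NO_PROXY (e.g. NO_PROXY=127.0.0.1), though whether the S3 client honors that variable depends on the SDK in use.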

    Source: https://www.haomeiwen.com/subject/ohxmzdtx.html