
openlake: Building a Cloud-Native Data Lake

Author: 大雄good | Published 2023-05-25 22:42

    Big data and cloud native have long been two hot areas of infrastructure, and more and more big-data infrastructure is moving in the cloud-native direction: the cloud-native message queue Pulsar, for example, or Snowflake's cloud-native data warehouse. I wanted to explore how big data and cloud native fit together, and came across a very interesting project called Openlake. It lives under the MinIO organization and builds a data lake on Kubernetes using MinIO, Spark, Kafka, Dremio, and Iceberg, which makes it great learning material. This post records the setup process and my takeaways.

    0. Prepare the Kubernetes environment

    If you already have a cluster you can skip this section. I wanted to start experimenting quickly, so I set up k3s on Linux with docker-compose.

    Install Docker Compose (see the install guide) by running:

    sudo curl -L "https://github.com/docker/compose/releases/download/1.25.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
    sudo chmod +x /usr/local/bin/docker-compose
    
    

    Create a local directory to hold the k3s config and data:

    mkdir -p /home/xiowang/k3s
    mkdir -p /home/xiowang/k3s/data
    

    Create a docker-compose.yaml to start k3s:

    #vim /home/xiowang/k3s/docker-compose.yaml
    version: '3'
    services:
      server:
        image: rancher/k3s:v1.20.2-k3s1
        container_name: k3s_server
        hostname: xiowang.dev
        command: server --tls-san=xiowang.dev
        tmpfs:
        - /run
        - /var/run
        privileged: true
        restart: always
        ports:
        - 6443:6443
        - 443:30443
        - 80:30080
        - 50050:30050
        - 50051:30051
        environment:
        - K3S_TOKEN=16963276443662
        - K3S_KUBECONFIG_OUTPUT=/root/.kube/config
        - K3S_KUBECONFIG_MODE=600
        volumes:
        - /var/lib/rancher/k3s:/var/lib/rancher/k3s
        - /etc/rancher:/etc/rancher
        - /home/xiowang/k3s/.kube:/root/.kube
        - /home/xiowang/k3s/data:/data:shared,rw
    

    The configuration above does the following:

    1. Exposes the Kubernetes apiserver on local port 6443 so kubectl can manage the cluster;
    2. Maps local ports 443 and 80 to the cluster NodePorts 30443 and 30080;
    3. Saves the kubeconfig to /home/xiowang/k3s/.kube;
    4. Mounts the host directory /home/xiowang/k3s/data into the node as /data.

    Start k3s:

    cd /home/xiowang/k3s
    docker-compose up -d
    

    Because local ports 80 and 443 map to NodePorts 30080 and 30443, edit the traefik service so that its port 80 uses NodePort 30080 and its port 443 uses NodePort 30443:

    #kubectl -n kube-system edit svc traefik
    apiVersion: v1
    kind: Service
    metadata:
      annotations:
        meta.helm.sh/release-name: traefik
        meta.helm.sh/release-namespace: kube-system
      creationTimestamp: "2023-05-17T09:29:42Z"
      labels:
        app: traefik
        app.kubernetes.io/managed-by: Helm
        chart: traefik-1.81.0
        heritage: Helm
        release: traefik
      name: traefik
      namespace: kube-system
      resourceVersion: "368985"
      uid: 7bbb1758-ca01-4e84-b166-dae950613adf
    spec:
      clusterIP: 10.43.115.234
      clusterIPs:
      - 10.43.115.234
      externalTrafficPolicy: Cluster
      ports:
      - name: http
        nodePort: 30080
        port: 80
        protocol: TCP
        targetPort: http
      - name: https
        nodePort: 30443
        port: 443
        protocol: TCP
        targetPort: https
      selector:
        app: traefik
        release: traefik
      sessionAffinity: None
      type: LoadBalancer
    

    Copy /home/xiowang/k3s/.kube/config to ~/.kube/config on your local machine (I recommend kubecm for managing multiple kubeconfigs):

    cp /home/xiowang/k3s/.kube/config ~/.kube/config
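
    A quick way to confirm the copied kubeconfig actually reaches the apiserver is to list the nodes. The sketch below uses the kubernetes Python client (assuming it is installed, e.g. via pip3 install kubernetes) purely for illustration; plain kubectl get nodes does the same job:

    # confirm the copied kubeconfig reaches the k3s apiserver
    from kubernetes import client, config

    config.load_kube_config()              # reads ~/.kube/config
    v1 = client.CoreV1Api()
    for node in v1.list_node().items:      # a single k3s node is expected here
        print(node.metadata.name, node.status.node_info.kubelet_version)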
    

    Now we can start our openlake journey.

    1. Install and configure MinIO

    There are two recommended ways to install MinIO on Kubernetes:

    1. via the MinIO operator (see the operator guide);
    2. via Helm (see the Helm chart guide).

    I have not had time to dig into the operator's CRDs yet, so I installed MinIO with Helm as follows:

    helm repo add minio https://charts.min.io/
    #set the root credentials and volume size; TLS is off by default (adjust as needed; I use 20Gi for testing)
    helm install --namespace minio --set rootUser=rootuser,rootPassword=rootpass123,persistence.size=20Gi,resources.requests.memory=100Mi,resources.limits.memory=2Gi,replicas=3 one --create-namespace minio/minio
    

    Log from a successful deployment:

    NAME: one
    LAST DEPLOYED: Mon May 22 12:22:31 2023
    NAMESPACE: minio
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    NOTES:
    MinIO can be accessed via port 9000 on the following DNS name from within your cluster:
    one-minio.minio.svc.cluster.local
    
    To access MinIO from localhost, run the below commands:
    
      1. export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
    
      2. kubectl port-forward $POD_NAME 9000 --namespace minio
    
    Read more about port forwarding here: http://kubernetes.io/docs/user-guide/kubectl/kubectl_port-forward/
    
    You can now access MinIO server on http://localhost:9000. Follow the below steps to connect to MinIO server with mc client:
    
      1. Download the MinIO mc client - https://min.io/docs/minio/linux/reference/minio-mc.html#quickstart
    
      2. export MC_HOST_one-minio-local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000
    
      3. mc ls one-minio-local
    

    Note that the environment variable name MC_HOST_one-minio-local printed in the log above appears to be invalid (shell variable names cannot contain hyphens), so I renamed it to MC_HOST_one_minio_local:

      2. export MC_HOST_one_minio_local=http://$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootUser}" | base64 --decode):$(kubectl get secret --namespace minio one-minio -o jsonpath="{.data.rootPassword}" | base64 --decode)@localhost:9000
    
      3. mc ls one_minio_local
    

    To access the MinIO console, port-forward port 9001 and log in at localhost:9001 with the root account:

    export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
    
    kubectl port-forward $POD_NAME 9001 --namespace minio
    

    Bucket (S3 API) access goes through port 9000:

    export POD_NAME=$(kubectl get pods --namespace minio -l "release=one" -o jsonpath="{.items[0].metadata.name}")
    kubectl port-forward $POD_NAME 9000 --namespace minio
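
    With this 9000 port-forward running, connectivity can also be verified from Python; a minimal sketch, assuming the minio Python SDK is installed and reusing the root credentials set during the Helm install:

    # list buckets through the local port-forward (empty right after install)
    from minio import Minio

    client = Minio("localhost:9000",
                   access_key="rootuser",     # rootUser from the Helm install
                   secret_key="rootpass123",  # rootPassword from the Helm install
                   secure=False)              # no TLS in this test setup
    print(client.list_buckets())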
    

    2. Set up Spark on k8s

    The Spark on k8s operator is an open-source project started by Google (not an official Google product). It uses an operator to schedule Spark workloads as Kubernetes resources, making it easy for Spark to run on k8s.

    Install the operator with Helm:

    helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
    helm install my-release spark-operator/spark-operator \
    --namespace spark-operator \
    --set webhook.enable=true \
    --set image.repository=openlake/spark-operator \
    --set image.tag=3.3.2 \
    --create-namespace
    

    Verify the deployment:

    kubectl get pods -n spark-operator
    

    You should see the my-release pod:

    NAME                                           READY   STATUS      RESTARTS   AGE
    my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h
    

    Following the examples, deploy a Spark application that computes pi and save it as spark-pi.yaml. Note that the Helm release created RBAC for serviceAccount my-release-spark in the spark-operator namespace, so the Spark apps here all run in the spark-operator namespace with serviceAccount: my-release-spark:

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
        name: pyspark-pi
        namespace: spark-operator
    spec:
        type: Python
        pythonVersion: "3"
        mode: cluster
        image: "openlake/spark-py:3.3.2"
        imagePullPolicy: Always
        mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
        sparkVersion: "3.3.2"
        restartPolicy:
            type: OnFailure
            onFailureRetries: 3
            onFailureRetryInterval: 10
            onSubmissionFailureRetries: 5
            onSubmissionFailureRetryInterval: 20
        driver:
            cores: 1
            coreLimit: "1200m"
            memory: "512m"
            labels:
                version: 3.3.2
            serviceAccount: my-release-spark
        executor:
            cores: 1
            instances: 1
            memory: "512m"
            labels:
                version: 3.3.2
    
    kubectl apply -f spark-pi.yaml
    

    Check the sparkapp and pods:

    #kubectl -n spark-operator get sparkapp,pod
    NAME                                               STATUS      ATTEMPTS   START                  FINISH                 AGE
    sparkapplication.sparkoperator.k8s.io/pyspark-pi   COMPLETED   1          2023-05-22T06:43:24Z   2023-05-22T06:44:37Z   4m50s
    
    NAME                                               READY   STATUS      RESTARTS   AGE
    pod/my-release-spark-operator-webhook-init-xzx9c   0/1     Completed   0          4d20h
    pod/my-release-spark-operator-6547984586-xzw4p     1/1     Running     2          4d20h
    pod/pyspark-pi-driver                              0/1     Completed   0          4m46s
    

    Tail the last 20 lines of the log; you can see the DAGScheduler finish the job:

    #kubectl logs pyspark-pi-driver -n spark-operator --tail 20
    23/05/22 06:44:35 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    23/05/22 06:44:35 INFO DAGScheduler: ResultStage 0 (reduce at /opt/spark/examples/src/main/python/pi.py:42) finished in 1.571 s
    23/05/22 06:44:35 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    23/05/22 06:44:35 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
    23/05/22 06:44:35 INFO DAGScheduler: Job 0 finished: reduce at /opt/spark/examples/src/main/python/pi.py:42, took 1.615592 s
    Pi is roughly 3.145160
    23/05/22 06:44:35 INFO SparkUI: Stopped Spark web UI at http://pyspark-pi-60e9d1884232c31f-driver-svc.spark-operator.svc:4040
    23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    23/05/22 06:44:35 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    23/05/22 06:44:35 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    23/05/22 06:44:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    23/05/22 06:44:36 INFO MemoryStore: MemoryStore cleared
    23/05/22 06:44:36 INFO BlockManager: BlockManager stopped
    23/05/22 06:44:36 INFO BlockManagerMaster: BlockManagerMaster stopped
    23/05/22 06:44:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    23/05/22 06:44:36 INFO SparkContext: Successfully stopped SparkContext
    23/05/22 06:44:36 INFO ShutdownHookManager: Shutdown hook called
    23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66
    23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-00ddbc21-98da-4ad8-9905-fcf0e8d64129
    23/05/22 06:44:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-4e8e1507-e941-4437-b0b5-18818fc8865f/spark-4f604216-aeab-4679-83ce-f2527613ec66/pyspark-10201045-5307-40dc-b6b2-d7e5447763c4
    

    3. Analyze data on MinIO with Spark

    Prepare a Dockerfile and the build commands, since we will be editing the .py files and pushing new images frequently.
    The image from the original Openlake guide is used as the base image, because it already has Spark's dependencies pre-installed:

    FROM openlake/sparkjob-demo:3.3.2
    
    WORKDIR /app
    
    COPY *.py .
    

    Build and push the Docker image as follows (Docker Hub is frequently unreachable for me, so I opened a registry on Huawei Cloud; adjust to your own situation):

    docker build -t xiowang/spark-minio:3.3.2 .
    docker tag xiowang/spark-minio:3.3.2 swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
    docker push swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
    

    Download the NYC taxi trip data (~112M rows, roughly 10GB):

    wget https://data.cityofnewyork.us/api/views/t29m-gskq/rows.csv
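
    Before wiring up the Spark job, a quick local peek at the file helps confirm the column names and types that the Spark schema defined later must match; a sketch, assuming pandas is available:

    # inspect a few rows of the ~10GB CSV without loading all of it
    import pandas as pd

    sample = pd.read_csv("rows.csv", nrows=5)
    print(sample.columns.tolist())   # expected: VendorID, tpep_pickup_datetime, ...
    print(sample.dtypes)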
    

    Create a bucket in MinIO:

    mc mb one_minio_local/openlake/spark/sample-data
    

    Copy the taxi data to MinIO:

    mc cp rows.csv one_minio_local/openlake/spark/sample-data/
    

    Install PySpark in the Jupyter environment (the original guide runs these steps from a notebook):

    pip3 install pyspark
    

    In MinIO, create a user with read/write access to one_minio_local/openlake and generate an access key and secret for it:

    export AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR
    export AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G
    export ENDPOINT=one-minio.minio:9000
    export OUTPUT_PATH=s3a://openlake/spark/result/taxi
    export INPUT_PATH=s3a://openlake/spark/sample-data/rows.csv
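
    To make sure the new restricted credentials really can read and write under the openlake bucket, a quick round-trip test can be run locally; a sketch, assuming the port-forward of port 9000 is still active and the minio Python SDK is installed:

    # round-trip a tiny object with the restricted user's credentials
    import io
    from minio import Minio

    client = Minio("localhost:9000",
                   access_key="3436ZpuHMvI5EEoR",                  # AWS_ACCESS_KEY_ID above
                   secret_key="6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G",  # AWS_SECRET_ACCESS_KEY above
                   secure=False)
    payload = b"permission check"
    client.put_object("openlake", "spark/permission-check.txt",
                      io.BytesIO(payload), length=len(payload))
    print(client.get_object("openlake", "spark/permission-check.txt").read())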
    

    Create a Kubernetes Secret to hold these values:

    kubectl create secret generic minio-secret \
        --from-literal=AWS_ACCESS_KEY_ID=3436ZpuHMvI5EEoR \
        --from-literal=AWS_SECRET_ACCESS_KEY=6US0FDsSFdlg5DzbWPPJtS1UeL75Rb0G \
        --from-literal=ENDPOINT=http://one-minio.minio:9000 \
        --from-literal=AWS_REGION=us-east-1 \
        --namespace spark-operator
    

    Deploy the sparkapp:

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
        name: spark-minio
        namespace: spark-operator
    spec:
        type: Python
        pythonVersion: "3"
        mode: cluster
        image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
        imagePullPolicy: Always
        mainApplicationFile: local:///app/main.py
        sparkVersion: "3.3.2"
        restartPolicy:
            type: OnFailure
            onFailureRetries: 3
            onFailureRetryInterval: 10
            onSubmissionFailureRetries: 5
            onSubmissionFailureRetryInterval: 20
        driver:
            cores: 1
            memory: "1024m"
            labels:
                version: 3.3.2
            serviceAccount: my-release-spark
            env:
                - name: INPUT_PATH
                  value: "s3a://openlake/spark/sample-data/rows.csv"
                - name: OUTPUT_PATH
                  value: "s3a://openlake/spark/result/taxi"
                - name: SSL_ENABLED
                  value: "true"
                - name: AWS_REGION
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_REGION
                - name: AWS_ACCESS_KEY_ID
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_ACCESS_KEY_ID
                - name: AWS_SECRET_ACCESS_KEY
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_SECRET_ACCESS_KEY
                - name: ENDPOINT
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: ENDPOINT
        executor:
            cores: 1
            instances: 3
            memory: "1024m"
            labels:
                version: 3.3.2
            env:
                - name: INPUT_PATH
                  value: "s3a://openlake/spark/sample-data/rows.csv"
                - name: OUTPUT_PATH
                  value: "s3a://openlake/spark/result/taxi"
                - name: SSL_ENABLED
                  value: "true"
                - name: AWS_REGION
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_REGION
                - name: AWS_ACCESS_KEY_ID
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_ACCESS_KEY_ID
                - name: AWS_SECRET_ACCESS_KEY
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_SECRET_ACCESS_KEY
                - name: ENDPOINT
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: ENDPOINT
    

    The Python script in this sparkapp is shown below; it simply counts the trips that carried more than 6 passengers:

    import logging
    import os
    
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
    
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger = logging.getLogger("MinIOSparkJob")
    
    spark = SparkSession.builder.getOrCreate()
    
    
    def load_config(spark_context: SparkContext):
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                     os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
        # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", os.getenv("SSL_ENABLED", "true"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
        # spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
    
    
    load_config(spark.sparkContext)
    
    
    # Define schema for NYC Taxi Data
    schema = StructType([
        StructField('VendorID', LongType(), True),
        StructField('tpep_pickup_datetime', StringType(), True),
        StructField('tpep_dropoff_datetime', StringType(), True),
        StructField('passenger_count', DoubleType(), True),
        StructField('trip_distance', DoubleType(), True),
        StructField('RatecodeID', DoubleType(), True),
        StructField('store_and_fwd_flag', StringType(), True),
        StructField('PULocationID', LongType(), True),
        StructField('DOLocationID', LongType(), True),
        StructField('payment_type', LongType(), True),
        StructField('fare_amount', DoubleType(), True),
        StructField('extra', DoubleType(), True),
        StructField('mta_tax', DoubleType(), True),
        StructField('tip_amount', DoubleType(), True),
        StructField('tolls_amount', DoubleType(), True),
        StructField('improvement_surcharge', DoubleType(), True),
        StructField('total_amount', DoubleType(), True)])
    
    # Read CSV file from MinIO
    df = spark.read.option("header", "true").schema(schema).csv(
        os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
    # Filter dataframe based on passenger_count greater than 6
    large_passengers_df = df.filter(df.passenger_count > 6)
    
    total_rows_count = df.count()
    filtered_rows_count = large_passengers_df.count()
    # File Output Committer is used to write the output to the destination (Not recommended for Production)
    large_passengers_df.write.format("csv").option("header", "true").save(
        os.getenv("OUTPUT_PATH", "s3a://openlake-tmp/spark/nyc/taxis_small"))
    
    logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
    logger.info(f"Total Rows for Passenger Count > 6: {filtered_rows_count}")
    

    If the job above misbehaves, you can create the following debug pod and exec into it to troubleshoot:

    apiVersion: v1
    kind: Pod
    metadata:
      name: debug-pod
      namespace: spark-operator
    spec:
      containers:
        - name: spark-minio
          image: swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2
          command: ["sleep"]
          args: ["infinity"]
          env:
              - name: INPUT_PATH
                value: "s3a://openlake/spark/sample-data/rows.csv"
              - name: OUTPUT_PATH
                value: "s3a://openlake/spark/result/taxi"
              - name: SSL_ENABLED
                value: "true"
              - name: AWS_REGION
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_REGION
              - name: AWS_ACCESS_KEY_ID
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_ACCESS_KEY_ID
              - name: AWS_SECRET_ACCESS_KEY
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: AWS_SECRET_ACCESS_KEY
              - name: ENDPOINT
                valueFrom:
                    secretKeyRef:
                        name: minio-secret
                        key: ENDPOINT
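
    Inside the debug pod (kubectl exec into it), a minimal local-mode PySpark session is a quick way to verify the S3A endpoint and credentials before resubmitting the full job; a sketch that reuses the injected environment variables:

    # quick S3A connectivity check in local mode, run inside the debug pod
    import os
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("s3a-check").getOrCreate()
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    hconf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    hconf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    hconf.set("fs.s3a.endpoint", os.environ["ENDPOINT"])
    hconf.set("fs.s3a.path.style.access", "true")

    # reading a handful of rows exercises endpoint, credentials and bucket policy
    spark.read.option("header", "true").csv(os.environ["INPUT_PATH"]).show(5)
    spark.stop()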
    

    Finally, check the driver log to see the printed counts:

    #kubectl -n spark-operator logs spark-minio-driver
    2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for NYC Taxi Data: 112234626
    2023-05-25 01:13:49,104 - MinIOSparkJob - INFO - Total Rows for Passenger Count > 6: 1066
    

    4. Analyze data on MinIO with Iceberg and Spark

    Iceberg is a table format open-sourced by Netflix. In short, it lets data engineers work with files such as CSV and Parquet through SQL, and it supports snapshots; see the official site for details.
    Some paths in the original guide are hard-coded, so I adjusted them and saved the script as main-iceberg.py:

    import logging
    import os
    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
    
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    logger = logging.getLogger("MinIOSparkJob")
    
    
    # adding iceberg configs
    conf = (
        SparkConf()
        .set("spark.sql.extensions",
             "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") # Use Iceberg with Spark
        .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .set("spark.sql.catalog.demo.warehouse", os.getenv("WAREHOUSE", "s3a://openlake/warehouse/"))
        .set("spark.sql.catalog.demo.s3.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
        .set("spark.sql.defaultCatalog", "demo") # Name of the Iceberg catalog
        .set("spark.sql.catalogImplementation", "in-memory")
        .set("spark.sql.catalog.demo.type", "hadoop") # Iceberg catalog type
        .set("spark.executor.heartbeatInterval", "300000")
        .set("spark.network.timeout", "400000")
    )
    
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    # Disable below line to see INFO logs
    spark.sparkContext.setLogLevel("ERROR")
    
    
    def load_config(spark_context: SparkContext):
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
                                                     os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
        spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")
    
    
    load_config(spark.sparkContext)
    
    # Define schema for NYC Taxi Data
    schema = StructType([
        StructField('VendorID', LongType(), True),
        StructField('tpep_pickup_datetime', StringType(), True),
        StructField('tpep_dropoff_datetime', StringType(), True),
        StructField('passenger_count', DoubleType(), True),
        StructField('trip_distance', DoubleType(), True),
        StructField('RatecodeID', DoubleType(), True),
        StructField('store_and_fwd_flag', StringType(), True),
        StructField('PULocationID', LongType(), True),
        StructField('DOLocationID', LongType(), True),
        StructField('payment_type', LongType(), True),
        StructField('fare_amount', DoubleType(), True),
        StructField('extra', DoubleType(), True),
        StructField('mta_tax', DoubleType(), True),
        StructField('tip_amount', DoubleType(), True),
        StructField('tolls_amount', DoubleType(), True),
        StructField('improvement_surcharge', DoubleType(), True),
        StructField('total_amount', DoubleType(), True)])
    
    # Read CSV file from MinIO
    df = spark.read.option("header", "true").schema(schema).csv(
        os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
    
    # Create Iceberg table "nyc.taxis_large" from RDD
    df.write.mode("overwrite").saveAsTable("nyc.taxis_large")
    
    # Query table row count
    count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
    total_rows_count = count_df.first().cnt
    logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
    
    # Rename column "fare_amount" in nyc.taxis_large to "fare"
    spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")
    
    # Rename column "trip_distance" in nyc.taxis_large to "distance"
    spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")
    
    # Add description to the new column "distance"
    spark.sql(
        "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")
    
    # Move "distance" next to "fare" column
    spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")
    
    # Add new column "fare_per_distance" of type float
    spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")
    
    # Check the snapshots available
    snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
    snap_df.show()  # prints all the available snapshots (1 till now)
    
    # Populate the new column "fare_per_distance"
    logger.info("Populating fare_per_distance column...")
    spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")
    
    # Check the snapshots available
    logger.info("Checking snapshots...")
    snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
    snap_df.show()  # prints all the available snapshots (2 now) since previous operation will create a new snapshot
    
    # Query the table to see the results
    res_df = spark.sql("""SELECT VendorID
                                ,tpep_pickup_datetime
                                ,tpep_dropoff_datetime
                                ,fare
                                ,distance
                                ,fare_per_distance
                                FROM nyc.taxis_large LIMIT 15""")
    res_df.show()
    
    # Delete rows from "fare_per_distance" based on criteria
    logger.info("Deleting rows from fare_per_distance column...")
    spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
    spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")
    
    # Check the snapshots available
    logger.info("Checking snapshots...")
    snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
    snap_df.show()  # prints all the available snapshots (4 now) since previous operations will create 2 new snapshots
    
    # Query table row count
    count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
    total_rows_count = count_df.first().cnt
    logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")
    
    # Partition table based on "VendorID" column
    logger.info("Partitioning table based on VendorID column...")
    spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")
    
    # Query Metadata tables like snapshot, files, history
    logger.info("Querying Snapshot table...")
    snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
    snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column
    
    logger.info("Querying Files table...")
    files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
    total_files_count = files_count_df.first().cnt
    logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")
    
    spark.sql("""SELECT file_path,
                        file_format,
                        record_count,
                        null_value_counts,
                        lower_bounds,
                        upper_bounds
                        FROM nyc.taxis_large.files LIMIT 1""").show()
    
    # Query history table
    logger.info("Querying History table...")
    hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
    hist_df.show()
    
    # Time travel to initial snapshot
    logger.info("Time Travel to initial snapshot...")
    snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
    spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")
    
    # Query the table to see the results
    res_df = spark.sql("""SELECT VendorID
                                ,tpep_pickup_datetime
                                ,tpep_dropoff_datetime
                                ,fare
                                ,distance
                                ,fare_per_distance
                                FROM nyc.taxis_large LIMIT 15""")
    res_df.show()
    
    # Query history table
    logger.info("Querying History table...")
    hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
    hist_df.show()  # 1 new row
    
    # Query table row count
    count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
    total_rows_count = count_df.first().cnt
    logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
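
    Note that rollback_to_snapshot changes the table's current state. For a read-only look at an older snapshot, Iceberg also supports time-travel reads; a sketch continuing from the script above (the snapshot_id comes from the history query, and the exact read API may vary with the Iceberg version bundled in the image):

    # read an older snapshot without touching the table's current state
    snapshot_id = spark.sql(
        "SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1").first().snapshot_id
    old_df = (spark.read
              .option("snapshot-id", snapshot_id)  # Iceberg time-travel read option
              .table("nyc.taxis_large"))
    logger.info(f"Rows in snapshot {snapshot_id}: {old_df.count()}")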
    

    Create the sparkapp and save it as spark-iceberg-minio.yaml:

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
        name: spark-iceberg-minio
        namespace: spark-operator
    spec:
        type: Python
        pythonVersion: "3"
        mode: cluster
        image: "swr.cn-north-4.myhuaweicloud.com/xiowang/spark-minio:3.3.2"
        imagePullPolicy: Always
        mainApplicationFile: local:///app/main-iceberg.py
        sparkVersion: "3.3.2"
        restartPolicy:
            type: OnFailure
            onFailureRetries: 3
            onFailureRetryInterval: 10
            onSubmissionFailureRetries: 5
            onSubmissionFailureRetryInterval: 20
        driver:
            cores: 1
            memory: "1024m"
            labels:
                version: 3.3.2
            serviceAccount: my-release-spark
            env:
                - name: INPUT_PATH
                  value: "s3a://openlake/spark/sample-data/rows.csv"
                - name: OUTPUT_PATH
                  value: "s3a://openlake/spark/result/taxi"
                - name: SSL_ENABLED
                  value: "true"
                - name: WAREHOUSE
                  value: "s3a://openlake/warehouse"
                - name: AWS_REGION
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_REGION
                - name: AWS_ACCESS_KEY_ID
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_ACCESS_KEY_ID
                - name: AWS_SECRET_ACCESS_KEY
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_SECRET_ACCESS_KEY
                - name: ENDPOINT
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: ENDPOINT
        executor:
            cores: 1
            instances: 3
            memory: "1024m"
            labels:
                version: 3.3.2
            env:
                - name: INPUT_PATH
                  value: "s3a://openlake/spark/sample-data/rows.csv"
                - name: OUTPUT_PATH
                  value: "s3a://openlake/spark/result/taxi"
                - name: SSL_ENABLED
                  value: "true"
                - name: WAREHOUSE
                  value: "s3a://openlake/warehouse"
                - name: AWS_REGION
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_REGION
                - name: AWS_ACCESS_KEY_ID
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_ACCESS_KEY_ID
                - name: AWS_SECRET_ACCESS_KEY
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: AWS_SECRET_ACCESS_KEY
                - name: ENDPOINT
                  valueFrom:
                      secretKeyRef:
                          name: minio-secret
                          key: ENDPOINT
    

    Summary

    Following the Openlake guide, this post walked through how Spark processes data stored in MinIO on Kubernetes. The Spark ecosystem feels quite mature, and it handles structured and semi-structured data very conveniently. Some things still take getting used to, though: middleware such as Iceberg is pulled in as jar packages rather than running as a standalone service, which means upgrading it requires rebuilding the container image.

    The original Openlake guide also covers Dremio as a query layer and Spark streaming with Kafka; interested readers can try those parts as well.
