Snowflake(1)

Author: 山猪打不过家猪 | Published 2024-02-16 09:57

    1. Loading data from S3

    1. AWS S3 setup and mounting the bucket into Snowflake

    1. Create the storage integration for the external S3 stage
    CREATE STORAGE INTEGRATION s3_integration
      TYPE = EXTERNAL_STAGE
      STORAGE_PROVIDER = 'S3'
      ENABLED = TRUE
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws_account_id>:role/snowflake-role'  -- ARN of the IAM role created in AWS (account id is a placeholder)
      STORAGE_ALLOWED_LOCATIONS = ('s3://lgbucket101/snowflake/'); -- allowed folder in the bucket
    

    2. View the storage integration's properties

    DESC INTEGRATION s3_integration;
    

    3. Establish the connection between Snowflake and S3
    In AWS, edit the IAM role's trust policy to include the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values returned by DESC INTEGRATION in step 2.

    2. Importing a CSV from S3 into Snowflake

    1. Create a HEALTHCARE table to load into (DDL sketched below)
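    The DDL for this table is not shown here. A plausible sketch, assuming the same business columns as the HEALTHCARE_PARQUET table defined later (all VARCHAR; COPY maps CSV fields by position, so the real column order must match the pipe-delimited file):
    -- hypothetical DDL: mirrors HEALTHCARE_PARQUET below, minus the metadata columns
    CREATE OR REPLACE TABLE HEALTHCARE(
        AVERAGE_COVERED_CHARGES    VARCHAR(150)
       ,AVERAGE_TOTAL_PAYMENTS    VARCHAR(150)
       ,TOTAL_DISCHARGES    VARCHAR(150)
       ,BACHELORORHIGHER    VARCHAR(150)
       ,HSGRADORHIGHER    VARCHAR(150)
       ,TOTALPAYMENTS    VARCHAR(128)
       ,REIMBURSEMENT    VARCHAR(128)
       ,TOTAL_COVERED_CHARGES    VARCHAR(128)
       ,REFERRALREGION_PROVIDER_NAME    VARCHAR(256)
       ,REIMBURSEMENTPERCENTAGE    VARCHAR(150)
       ,DRG_DEFINITION    VARCHAR(256)
       ,REFERRAL_REGION    VARCHAR(26)
       ,INCOME_PER_CAPITA    VARCHAR(150)
       ,MEDIAN_EARNINGSBACHELORS    VARCHAR(150)
       ,MEDIAN_EARNINGS_GRADUATE    VARCHAR(150)
       ,MEDIAN_EARNINGS_HS_GRAD    VARCHAR(150)
       ,MEDIAN_EARNINGSLESS_THAN_HS    VARCHAR(150)
       ,MEDIAN_FAMILY_INCOME    VARCHAR(150)
       ,NUMBER_OF_RECORDS    VARCHAR(150)
       ,POP_25_OVER    VARCHAR(150)
       ,PROVIDER_CITY    VARCHAR(128)
       ,PROVIDER_ID    VARCHAR(150)
       ,PROVIDER_NAME    VARCHAR(256)
       ,PROVIDER_STATE    VARCHAR(128)
       ,PROVIDER_STREET_ADDRESS    VARCHAR(256)
       ,PROVIDER_ZIP_CODE    VARCHAR(150)
    );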

    2. Create the file format rules for the CSV
    create or replace file format demo_db.public.csv_format
                        type = csv
                        field_delimiter = '|'
                        skip_header = 1
                        null_if = ('NULL', 'null')
                        empty_field_as_null = true;
    
    3. Create an external stage and assign it the file format
    create or replace stage demo_db.public.ext_csv_stage
      URL = 's3://lgbucket101/snowflake/csv/health_pipe.csv'
      STORAGE_INTEGRATION = s3_integration
      file_format = demo_db.public.csv_format;
    

    4. Copy the CSV from S3 into the Snowflake table

    -- Use copy command to ingest data from S3
    copy into healthcare
    from @demo_db.public.ext_csv_stage
    on_error = CONTINUE;
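    A couple of quick sanity checks around the copy: LIST shows which files the stage can see, and a SELECT spot-checks the loaded rows.
    -- files visible through the stage
    list @demo_db.public.ext_csv_stage;
    -- spot-check the loaded rows
    select * from healthcare limit 10;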
    

    3. Importing a Parquet file from S3 into Snowflake

    1. Create the Parquet target table
    CREATE or replace TABLE HEALTHCARE_PARQUET(
        AVERAGE_COVERED_CHARGES    VARCHAR(150)  
       ,AVERAGE_TOTAL_PAYMENTS    VARCHAR(150)  
       ,TOTAL_DISCHARGES    VARCHAR(150)   
       ,BACHELORORHIGHER    VARCHAR(150)   
       ,HSGRADORHIGHER    VARCHAR(150)   
       ,TOTALPAYMENTS    VARCHAR(128)  
       ,REIMBURSEMENT    VARCHAR(128)  
       ,TOTAL_COVERED_CHARGES    VARCHAR(128) 
       ,REFERRALREGION_PROVIDER_NAME    VARCHAR(256)  
       ,REIMBURSEMENTPERCENTAGE    VARCHAR(150)   
       ,DRG_DEFINITION    VARCHAR(256)  
       ,REFERRAL_REGION    VARCHAR(26)  
       ,INCOME_PER_CAPITA    VARCHAR(150)   
       ,MEDIAN_EARNINGSBACHELORS    VARCHAR(150)   
       ,MEDIAN_EARNINGS_GRADUATE    VARCHAR(150) 
       ,MEDIAN_EARNINGS_HS_GRAD    VARCHAR(150)   
       ,MEDIAN_EARNINGSLESS_THAN_HS    VARCHAR(150) 
       ,MEDIAN_FAMILY_INCOME    VARCHAR(150)   
       ,NUMBER_OF_RECORDS    VARCHAR(150)  
       ,POP_25_OVER    VARCHAR(150)  
       ,PROVIDER_CITY    VARCHAR(128)  
       ,PROVIDER_ID    VARCHAR(150)   
       ,PROVIDER_NAME    VARCHAR(256)  
       ,PROVIDER_STATE    VARCHAR(128)  
       ,PROVIDER_STREET_ADDRESS    VARCHAR(256)  
       ,PROVIDER_ZIP_CODE    VARCHAR(150) 
       ,filename    VARCHAR
       ,file_row_number VARCHAR
       ,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
    );
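    Note: the stage in the next step references demo_db.public.parquet_format, which is never defined above; a minimal definition would be:
    create or replace file format demo_db.public.parquet_format
      type = 'parquet';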
    
    2. Create an external stage named ext_parquet_stage
    create or replace stage demo_db.public.ext_parquet_stage
      URL = 's3://lgbucket101/snowflake/parquet/health.parquet'
      STORAGE_INTEGRATION = s3_integration
      file_format = demo_db.public.parquet_format;
    

    3. Copy the data from S3 into the Snowflake table

    copy into demo_db.public.healthcare_parquet
    from (select 
    $1:average_covered_charges::varchar,
    $1:average_total_payments::varchar,
    $1:total_discharges::varchar,
    $1:"PERCENT_Bachelor's_or_Higher"::varchar,
    $1:percent_hs_grad_or_higher::varchar,
    $1:total_payments::varchar,
    $1:percent_reimbursement::varchar,
    $1:total_covered_charges::varchar,
    $1:referral_region_provider_name::varchar,
    $1:reimbursement_percentage::varchar,
    $1:drg_definition::varchar,
    $1:referral_region::varchar,
    $1:income_per_capita::varchar,
    $1:median_earnings_bachelors::varchar,
    $1:median_earnings_graduate::varchar,
    $1:median_earnings_hs_grad::varchar,
    $1:median_earnings_less_than_hs::varchar,
    $1:median_family_income::varchar,
    $1:number_of_records::varchar,
    $1:pop_25_over::varchar,
    $1:provider_city::varchar,
    $1:provider_id::varchar,
    $1:provider_name::varchar,
    $1:provider_state::varchar,
    $1:provider_street_address::varchar,
    $1:provider_zip_code::varchar,
    METADATA$FILENAME,
    METADATA$FILE_ROW_NUMBER,
    TO_TIMESTAMP_NTZ(current_timestamp)
    from @demo_db.public.ext_parquet_stage);
    
    4. View the file formats and external stages we have created (commands below)
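    They can be listed with the standard SHOW commands:
    show file formats in database demo_db;
    show stages in database demo_db;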

    4. Loading JSON data

    1. Create a json_data schema (similar to a namespace)
    create schema json_data;
    
    2. Create the HEALTHCARE_JSON table to store the S3 data
    
    CREATE or replace TABLE demo_db.json_data.HEALTHCARE_JSON( -- qualified so it lands in the schema targeted by the COPY below
        id VARCHAR(50)
       ,AVERAGE_COVERED_CHARGES    VARCHAR(150)  
       ,AVERAGE_TOTAL_PAYMENTS    VARCHAR(150)  
       ,TOTAL_DISCHARGES    INTEGER
       ,BACHELORORHIGHER    FLOAT
       ,HSGRADORHIGHER    VARCHAR(150)   
       ,TOTALPAYMENTS    VARCHAR(128)  
       ,REIMBURSEMENT    VARCHAR(128)  
       ,TOTAL_COVERED_CHARGES    VARCHAR(128) 
       ,REFERRALREGION_PROVIDER_NAME    VARCHAR(256)  
       ,REIMBURSEMENTPERCENTAGE    VARCHAR(150)   
       ,DRG_DEFINITION    VARCHAR(256)  
       ,REFERRAL_REGION    VARCHAR(26)  
       ,INCOME_PER_CAPITA    VARCHAR(150)   
       ,MEDIAN_EARNINGSBACHELORS    VARCHAR(150)   
       ,MEDIAN_EARNINGS_GRADUATE    VARCHAR(150) 
       ,MEDIAN_EARNINGS_HS_GRAD    VARCHAR(150)   
       ,MEDIAN_EARNINGSLESS_THAN_HS    VARCHAR(150) 
       ,MEDIAN_FAMILY_INCOME    VARCHAR(150)   
       ,NUMBER_OF_RECORDS    VARCHAR(150)  
       ,POP_25_OVER    VARCHAR(150)  
       ,PROVIDER_CITY    VARCHAR(128)  
       ,PROVIDER_ID    VARCHAR(150)   
       ,PROVIDER_NAME    VARCHAR(256)  
       ,PROVIDER_STATE    VARCHAR(128)  
       ,PROVIDER_STREET_ADDRESS    VARCHAR(256)  
       ,PROVIDER_ZIP_CODE    VARCHAR(150) 
       ,filename    VARCHAR
       ,file_row_number VARCHAR
       ,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
    );
    
    3. Create the JSON file format
    --create json format
    create or replace file format demo_db.public.json_format
      type = 'json';
    
    
    4. Create the external stage linked to S3
    create or replace stage demo_db.public.ext_json_stage
      URL = 's3://lgbucket101/snowflake/json/healthcare_providers.json'
      STORAGE_INTEGRATION = s3_integration
      file_format = demo_db.public.json_format;
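    Before loading, the raw documents can be previewed straight from the stage (Snowflake allows querying staged files via $1):
    select $1 from @demo_db.public.ext_json_stage limit 5;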
    

    5. Write the data into the Snowflake table

    copy into demo_db.json_data.healthcare_json
    from (select 
    $1:"_id"::varchar,
    $1:" Average Covered Charges "::varchar,
    $1:" Average Total Payments "::varchar,
    $1:" Total Discharges "::integer,
    $1:"% Bachelor's or Higher"::float,
    $1:"% HS Grad or Higher"::varchar,
    $1:"Total payments"::varchar,
    $1:"% Reimbursement"::varchar,
    $1:"Total covered charges"::varchar,
    $1:"Referral Region Provider Name"::varchar,
    $1:"ReimbursementPercentage"::varchar,
    $1:"DRG Definition"::varchar,
    $1:"Referral Region"::varchar,
    $1:"INCOME_PER_CAPITA"::varchar,
    $1:"MEDIAN EARNINGS - BACHELORS"::varchar,
    $1:"MEDIAN EARNINGS - GRADUATE"::varchar,
    $1:"MEDIAN EARNINGS - HS GRAD"::varchar,
    $1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
    $1:"MEDIAN_FAMILY_INCOME"::varchar,
    $1:"Number of Records"::varchar,
    $1:"POP_25_OVER"::varchar,
    $1:"Provider City"::varchar,
    $1:"Provider Id"::varchar,
    $1:"Provider Name"::varchar,
    $1:"Provider State"::varchar,
    $1:"Provider Street Address"::varchar,
    $1:"Provider Zip Code"::varchar,
    METADATA$FILENAME,
    METADATA$FILE_ROW_NUMBER,
    TO_TIMESTAMP_NTZ(current_timestamp)
    from @demo_db.public.ext_json_stage);
    

    5. Using the Snowpipe service to load S3 data automatically

    Auto-loading CSV
    • The external stages above are all loaded manually; with Snowpipe, new files are loaded into Snowflake automatically, in near real time. (Aside: the same real-time ingestion can be set up on Azure.)
      The setup mirrors the manual steps above, with one difference: when creating the external stage, the manual version pointed at a single file, while the Snowpipe version points at a folder.

    1. Create the external stage

    create or replace stage demo_db.public.ext_csv_stage_pipe
      URL = 's3://lgbucket101/snowflake/csv'
      STORAGE_INTEGRATION = s3_integration
      file_format = csv_format;
    

    2. Create the Snowpipe

      --create pipe to automate data ingestion from s3 to snowflake
    create or replace pipe demo_db.public.mypipe_csv auto_ingest=true as
    copy into HEALTHCARE
    from @demo_db.public.ext_csv_stage_pipe
    on_error = CONTINUE;
    
    

    3. View the pipe's status and copy its notification_channel value (an SQS queue ARN)

    show pipes;
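    The pipe's execution state can also be checked with SYSTEM$PIPE_STATUS:
    select system$pipe_status('demo_db.public.mypipe_csv');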
    

    4. In the S3 bucket settings, add an event notification (its destination is the SQS queue from the pipe's notification_channel) so that S3 tells Snowflake whenever a file arrives; transfers then happen automatically



    5. Once the configuration succeeds, the event appears in the bucket's event notifications
    6. Upload a new health_care file to S3; Snowflake now loads it into the database automatically
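    One caveat: auto_ingest only fires for files that arrive after the event notification is in place. Files already sitting in the folder can be backfilled with a refresh:
    alter pipe demo_db.public.mypipe_csv refresh;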
    Auto-loading Parquet
    1. Create the pipe's stage
    -- stage for the parquet pipe
    create or replace stage demo_db.public.ext_parquet_stage_pipe
      URL = 's3://lgbucket101/snowflake/parquet'
      STORAGE_INTEGRATION = s3_integration
      file_format = parquet_format;
    
    2. Create the pipe
    --create pipe to automate data ingestion from s3 to snowflake
    create or replace pipe demo_db.public.mypipe_parquet auto_ingest=true as
    copy into demo_db.public.healthcare_parquet
    from (select 
    $1:average_covered_charges::varchar,
    $1:average_total_payments::varchar,
    $1:total_discharges::varchar,
    $1:"PERCENT_Bachelor's_or_Higher"::varchar,
    $1:percent_hs_grad_or_higher::varchar,
    $1:total_payments::varchar,
    $1:percent_reimbursement::varchar,
    $1:total_covered_charges::varchar,
    $1:referral_region_provider_name::varchar,
    $1:reimbursement_percentage::varchar,
    $1:drg_definition::varchar,
    $1:referral_region::varchar,
    $1:income_per_capita::varchar,
    $1:median_earnings_bachelors::varchar,
    $1:median_earnings_graduate::varchar,
    $1:median_earnings_hs_grad::varchar,
    $1:median_earnings_less_than_hs::varchar,
    $1:median_family_income::varchar,
    $1:number_of_records::varchar,
    $1:pop_25_over::varchar,
    $1:provider_city::varchar,
    $1:provider_id::varchar,
    $1:provider_name::varchar,
    $1:provider_state::varchar,
    $1:provider_street_address::varchar,
    $1:provider_zip_code::varchar,
    METADATA$FILENAME,
    METADATA$FILE_ROW_NUMBER,
    TO_TIMESTAMP_NTZ(current_timestamp)
    from @demo_db.public.ext_parquet_stage_pipe);
    

    3. If we run show pipes now, we can see that pipes on the same bucket share the same notification channel, so there is no need to configure S3 again
    4. Upload a new parquet file to S3 and it is loaded into Snowflake automatically
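    To confirm the pipe picked up the new file, recent loads into the table can be inspected with the COPY_HISTORY table function:
    select file_name, row_count, status
    from table(information_schema.copy_history(
        table_name => 'DEMO_DB.PUBLIC.HEALTHCARE_PARQUET',
        start_time => dateadd(hours, -1, current_timestamp())));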

    Auto-loading JSON

    1. Create the JSON pipe's stage

    create or replace stage demo_db.public.ext_json_stage_pipe
      URL = 's3://lgbucket101/snowflake/json'
      STORAGE_INTEGRATION = s3_integration
      file_format = json_format;
    
    
    2. Create the pipe
    create or replace pipe demo_db.public.mypipe_json auto_ingest=true as
    copy into demo_db.json_data.healthcare_json from (select 
    $1:"_id"::varchar,
    $1:" Average Covered Charges "::varchar,
    $1:" Average Total Payments "::varchar,
    $1:" Total Discharges "::integer,
    $1:"% Bachelor's or Higher"::float,
    $1:"% HS Grad or Higher"::varchar,
    $1:"Total payments"::varchar,
    $1:"% Reimbursement"::varchar,
    $1:"Total covered charges"::varchar,
    $1:"Referral Region Provider Name"::varchar,
    $1:"ReimbursementPercentage"::varchar,
    $1:"DRG Definition"::varchar,
    $1:"Referral Region"::varchar,
    $1:"INCOME_PER_CAPITA"::varchar,
    $1:"MEDIAN EARNINGS - BACHELORS"::varchar,
    $1:"MEDIAN EARNINGS - GRADUATE"::varchar,
    $1:"MEDIAN EARNINGS - HS GRAD"::varchar,
    $1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
    $1:"MEDIAN_FAMILY_INCOME"::varchar,
    $1:"Number of Records"::varchar,
    $1:"POP_25_OVER"::varchar,
    $1:"Provider City"::varchar,
    $1:"Provider Id"::varchar,
    $1:"Provider Name"::varchar,
    $1:"Provider State"::varchar,
    $1:"Provider Street Address"::varchar,
    $1:"Provider Zip Code"::varchar,
    METADATA$FILENAME,
    METADATA$FILE_ROW_NUMBER,
    TO_TIMESTAMP_NTZ(current_timestamp)
    from @demo_db.public.ext_json_stage_pipe);
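    As a final sanity check after uploading a JSON file, count the rows that landed:
    select count(*) from demo_db.json_data.healthcare_json;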
    

    2. Azure to Snowflake

    1. Create an Azure storage integration

    Method 1: storage integration (requires Azure AD admin consent, which was not available here)
    --create the azure storage integration
    CREATE STORAGE INTEGRATION azure_integration
      TYPE = EXTERNAL_STAGE
      STORAGE_PROVIDER = 'AZURE'
      ENABLED = TRUE
      AZURE_TENANT_ID = 'c56908eb-1777-40d2-86f2-1d134e61a0fd'  
      STORAGE_ALLOWED_LOCATIONS = ('azure://snowflake101lg.blob.core.windows.net/snowflake');
    

    2. View the integration

    --view the integration
    DESC INTEGRATION azure_integration;
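    Among the properties DESC returns is AZURE_CONSENT_URL, which an Azure AD admin must open to grant Snowflake access (this is the permission missing in Method 1). Once consent is granted, a stage is created just as with S3; a sketch against this article's container (the stage name ext_azure_stage is hypothetical):
    create or replace stage demo_db.public.ext_azure_stage
      URL = 'azure://snowflake101lg.blob.core.windows.net/snowflake/csv'
      STORAGE_INTEGRATION = azure_integration
      file_format = demo_db.public.csv_format;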
    
    Method 2: create the connection using the blob's access key (SAS token)

    1. In Azure, generate a SAS token for the blob

    2. Copy the table directly using the SAS token; note that the leading ? must be deleted when the token is pasted. Any file format can be copied this way.
    • Template
    COPY INTO <snowflake_table_name>
    FROM 'azure://<path to the file in azure>'
    CREDENTIALS=(AZURE_SAS_TOKEN='<sas token from the azure blob, without the leading ?>')
    FILE_FORMAT = (FORMAT_NAME = '<your file format name>');
    
    • Example
    COPY INTO HEALTHCARE
      FROM 'azure://snowflake101lg.blob.core.windows.net/snowflake/csv/snowpipe_sample.csv'
      CREDENTIALS=(AZURE_SAS_TOKEN='sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-02-18T11:15:28Z&st=2024-02-18T03:15:28Z&spr=https,http&sig=0QlPdlg5%2BCl1zXB0%2FkYtFmDOHKWcARuEmoZYAXRNLK4%3D')
      FILE_FORMAT = (FORMAT_NAME = 'csv_format');
    

    2. Snowpipe with Azure (left to complete on your own)

    pass
