1. Loading data from S3
1. AWS S3 setup and mounting the bucket into Snowflake
1. Create a storage integration that mounts S3 as an external stage
CREATE STORAGE INTEGRATION s3_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account_id>:role/snowflake-role' -- ARN of the IAM role created in AWS
STORAGE_ALLOWED_LOCATIONS = ('s3://lgbucket101/snowflake/'); -- bucket folder the integration may read
2. View the integration's properties
DESC INTEGRATION s3_integration;
3. Establish the trust between Snowflake and S3
In AWS, add the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values returned in step 2 to the IAM role's trust policy.
2. Importing a CSV from S3 into Snowflake
1. Create the HEALTHCARE target table (DDL not shown in these notes), then set up the format and stage
- Define the CSV format rules
create or replace file format demo_db.public.csv_format
type = csv
field_delimiter = '|'
skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true;
- Create an external stage pointing at the file, with the format rules attached
create or replace stage demo_db.public.ext_csv_stage
URL = 's3://lgbucket101/snowflake/csv/health_pipe.csv'
STORAGE_INTEGRATION = s3_integration
file_format = demo_db.public.csv_format;
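Before copying, it is worth confirming that the integration and stage can actually reach the bucket; Snowflake's LIST command shows the files the stage resolves to (a quick sanity check, not part of the original steps):
list @demo_db.public.ext_csv_stage;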
2. Copy the CSV from S3 into the Snowflake table
-- Use copy command to ingest data from S3
copy into healthcare
from @demo_db.public.ext_csv_stage
on_error = CONTINUE;
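Because ON_ERROR = CONTINUE silently skips bad rows, check the load result afterwards; the INFORMATION_SCHEMA.COPY_HISTORY table function is the standard way (a sketch, assuming the load ran within the past hour):
select *
from table(information_schema.copy_history(
    table_name => 'HEALTHCARE',
    start_time => dateadd(hours, -1, current_timestamp())));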
3. Importing a Parquet file from S3 into Snowflake
1. Create the target table for the Parquet data
CREATE or replace TABLE HEALTHCARE_PARQUET(
AVERAGE_COVERED_CHARGES VARCHAR(150)
,AVERAGE_TOTAL_PAYMENTS VARCHAR(150)
,TOTAL_DISCHARGES VARCHAR(150)
,BACHELORORHIGHER VARCHAR(150)
,HSGRADORHIGHER VARCHAR(150)
,TOTALPAYMENTS VARCHAR(128)
,REIMBURSEMENT VARCHAR(128)
,TOTAL_COVERED_CHARGES VARCHAR(128)
,REFERRALREGION_PROVIDER_NAME VARCHAR(256)
,REIMBURSEMENTPERCENTAGE VARCHAR(150)
,DRG_DEFINITION VARCHAR(256)
,REFERRAL_REGION VARCHAR(26)
,INCOME_PER_CAPITA VARCHAR(150)
,MEDIAN_EARNINGSBACHELORS VARCHAR(150)
,MEDIAN_EARNINGS_GRADUATE VARCHAR(150)
,MEDIAN_EARNINGS_HS_GRAD VARCHAR(150)
,MEDIAN_EARNINGSLESS_THAN_HS VARCHAR(150)
,MEDIAN_FAMILY_INCOME VARCHAR(150)
,NUMBER_OF_RECORDS VARCHAR(150)
,POP_25_OVER VARCHAR(150)
,PROVIDER_CITY VARCHAR(128)
,PROVIDER_ID VARCHAR(150)
,PROVIDER_NAME VARCHAR(256)
,PROVIDER_STATE VARCHAR(128)
,PROVIDER_STREET_ADDRESS VARCHAR(256)
,PROVIDER_ZIP_CODE VARCHAR(150)
,filename VARCHAR
,file_row_number VARCHAR
,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
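- The stage below references demo_db.public.parquet_format, but its definition doesn't appear in these notes; a minimal version would be:
create or replace file format demo_db.public.parquet_format
type = 'parquet';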
2. Create an external stage named ext_parquet_stage
create or replace stage demo_db.public.ext_parquet_stage
URL = 's3://lgbucket101/snowflake/parquet/health.parquet'
STORAGE_INTEGRATION = s3_integration
file_format = demo_db.public.parquet_format;
3. Copy the data from S3 into the Snowflake table
copy into demo_db.public.healthcare_parquet
from (select
$1:average_covered_charges::varchar,
$1:average_total_payments::varchar,
$1:total_discharges::varchar,
$1:"PERCENT_Bachelor's_or_Higher"::varchar,
$1:percent_hs_grad_or_higher::varchar,
$1:total_payments::varchar,
$1:percent_reimbursement::varchar,
$1:total_covered_charges::varchar,
$1:referral_region_provider_name::varchar,
$1:reimbursement_percentage::varchar,
$1:drg_definition::varchar,
$1:referral_region::varchar,
$1:income_per_capita::varchar,
$1:median_earnings_bachelors::varchar,
$1:median_earnings_graduate::varchar,
$1:median_earnings_hs_grad::varchar,
$1:median_earnings_less_than_hs::varchar,
$1:median_family_income::varchar,
$1:number_of_records::varchar,
$1:pop_25_over::varchar,
$1:provider_city::varchar,
$1:provider_id::varchar,
$1:provider_name::varchar,
$1:provider_state::varchar,
$1:provider_street_address::varchar,
$1:provider_zip_code::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_parquet_stage);
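When writing a COPY like this, it helps to query the stage directly first, so the $1:field names can be checked against the actual Parquet schema (a preview against the stage created above):
select $1 from @demo_db.public.ext_parquet_stage limit 5;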
- View the file formats and external stages created so far
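The listing can be reproduced with the standard SHOW commands (presumably what the screenshot showed):
show file formats in database demo_db;
show stages in database demo_db;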
4. Loading JSON data
1. Create a json_data schema, which works like a namespace
create schema json_data;
2. Create the table that will store the S3 data (HEALTHCARE_JSON)
CREATE or replace TABLE HEALTHCARE_JSON(
id VARCHAR(50)
,AVERAGE_COVERED_CHARGES VARCHAR(150)
,AVERAGE_TOTAL_PAYMENTS VARCHAR(150)
,TOTAL_DISCHARGES INTEGER
,BACHELORORHIGHER FLOAT
,HSGRADORHIGHER VARCHAR(150)
,TOTALPAYMENTS VARCHAR(128)
,REIMBURSEMENT VARCHAR(128)
,TOTAL_COVERED_CHARGES VARCHAR(128)
,REFERRALREGION_PROVIDER_NAME VARCHAR(256)
,REIMBURSEMENTPERCENTAGE VARCHAR(150)
,DRG_DEFINITION VARCHAR(256)
,REFERRAL_REGION VARCHAR(26)
,INCOME_PER_CAPITA VARCHAR(150)
,MEDIAN_EARNINGSBACHELORS VARCHAR(150)
,MEDIAN_EARNINGS_GRADUATE VARCHAR(150)
,MEDIAN_EARNINGS_HS_GRAD VARCHAR(150)
,MEDIAN_EARNINGSLESS_THAN_HS VARCHAR(150)
,MEDIAN_FAMILY_INCOME VARCHAR(150)
,NUMBER_OF_RECORDS VARCHAR(150)
,POP_25_OVER VARCHAR(150)
,PROVIDER_CITY VARCHAR(128)
,PROVIDER_ID VARCHAR(150)
,PROVIDER_NAME VARCHAR(256)
,PROVIDER_STATE VARCHAR(128)
,PROVIDER_STREET_ADDRESS VARCHAR(256)
,PROVIDER_ZIP_CODE VARCHAR(150)
,filename VARCHAR
,file_row_number VARCHAR
,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)
);
3. Create the JSON file format
--create json format
create or replace file format demo_db.public.json_format
type = 'json';
4. Create the external stage linked to S3
create or replace stage demo_db.public.ext_json_stage
URL = 's3://lgbucket101/snowflake/json/healthcare_providers.json'
STORAGE_INTEGRATION = s3_integration
file_format = demo_db.public.json_format;
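Since the source JSON keys contain leading/trailing spaces and '%' signs, preview the raw records first to get the key names exactly right (a quick check against the stage just created):
select $1 from @demo_db.public.ext_json_stage limit 5;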
5. Copy the data into the Snowflake table
copy into demo_db.json_data.healthcare_json
from (select
$1:"_id"::varchar,
$1:" Average Covered Charges "::varchar,
$1:" Average Total Payments "::varchar,
$1:" Total Discharges "::integer,
$1:"% Bachelor's or Higher"::float,
$1:"% HS Grad or Higher"::varchar,
$1:"Total payments"::varchar,
$1:"% Reimbursement"::varchar,
$1:"Total covered charges"::varchar,
$1:"Referral Region Provider Name"::varchar,
$1:"ReimbursementPercentage"::varchar,
$1:"DRG Definition"::varchar,
$1:"Referral Region"::varchar,
$1:"INCOME_PER_CAPITA"::varchar,
$1:"MEDIAN EARNINGS - BACHELORS"::varchar,
$1:"MEDIAN EARNINGS - GRADUATE"::varchar,
$1:"MEDIAN EARNINGS - HS GRAD"::varchar,
$1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
$1:"MEDIAN_FAMILY_INCOME"::varchar,
$1:"Number of Records"::varchar,
$1:"POP_25_OVER"::varchar,
$1:"Provider City"::varchar,
$1:"Provider Id"::varchar,
$1:"Provider Name"::varchar,
$1:"Provider State"::varchar,
$1:"Provider Street Address"::varchar,
$1:"Provider Zip Code"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_json_stage);
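A quick sanity check after the load (columns from the table defined above):
select provider_name, drg_definition, load_timestamp
from demo_db.json_data.healthcare_json
limit 5;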
5. Using the Snowpipe service to auto-load S3 data
Auto-loading CSV
- The external stages above all required manual loads; with Snowpipe, new files are picked up and loaded into Snowflake automatically, in near real time. (Aside: Azure can achieve real-time transfer.)
The setup matches the manual steps above, with one difference: the manual stages pointed at a single file, while the Snowpipe stage points at a folder.
1. Create the external stage
create or replace stage ext_csv_stage_pipe
URL = 's3://lgbucket101/snowflake/csv'
STORAGE_INTEGRATION = s3_integration
file_format = csv_format;
2. Create the Snowpipe
--create pipe to automate data ingestion from s3 to snowflake
create or replace pipe demo_db.public.mypipe_csv auto_ingest=true as
copy into HEALTHCARE
from @demo_db.public.ext_csv_stage_pipe
on_error = CONTINUE;
3. View the pipe's status and copy its notification_channel (an SQS queue ARN)
show pipes;
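Beyond SHOW PIPES, a pipe's execution state can be inspected with SYSTEM$PIPE_STATUS (shown here for the pipe created above):
select system$pipe_status('demo_db.public.mypipe_csv');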
4. In the S3 bucket, add an event notification targeting that SQS ARN; whenever a new file lands, Snowflake is notified and ingestion runs automatically.
5. Once configured, events show up for the bucket.
6. Upload a new health_care file to S3; Snowflake automatically loads it into the database.
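Note that files already sitting in the folder before the event notification was configured won't trigger the pipe; ALTER PIPE ... REFRESH can backfill recently staged files (a standard command, applied to the pipe above):
alter pipe demo_db.public.mypipe_csv refresh;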
Auto-loading Parquet
1. Create the stage for the Parquet pipe
--create the stage for the parquet pipe
create or replace stage ext_parquet_stage_pipe
URL = 's3://lgbucket101/snowflake/parquet'
STORAGE_INTEGRATION = s3_integration
file_format = parquet_format;
2. Create the pipe
--create pipe to automate data ingestion from s3 to snowflake
create or replace pipe demo_db.public.mypipe_parquet auto_ingest=true as
copy into demo_db.public.healthcare_parquet
from (select
$1:average_covered_charges::varchar,
$1:average_total_payments::varchar,
$1:total_discharges::varchar,
$1:"PERCENT_Bachelor's_or_Higher"::varchar,
$1:percent_hs_grad_or_higher::varchar,
$1:total_payments::varchar,
$1:percent_reimbursement::varchar,
$1:total_covered_charges::varchar,
$1:referral_region_provider_name::varchar,
$1:reimbursement_percentage::varchar,
$1:drg_definition::varchar,
$1:referral_region::varchar,
$1:income_per_capita::varchar,
$1:median_earnings_bachelors::varchar,
$1:median_earnings_graduate::varchar,
$1:median_earnings_hs_grad::varchar,
$1:median_earnings_less_than_hs::varchar,
$1:median_family_income::varchar,
$1:number_of_records::varchar,
$1:pop_25_over::varchar,
$1:provider_city::varchar,
$1:provider_id::varchar,
$1:provider_name::varchar,
$1:provider_state::varchar,
$1:provider_street_address::varchar,
$1:provider_zip_code::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_parquet_stage_pipe);
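To confirm the pipe is actually ingesting, PIPE_USAGE_HISTORY reports recent load activity (a sketch covering the last 24 hours):
select *
from table(information_schema.pipe_usage_history(
    date_range_start => dateadd('hour', -24, current_timestamp()),
    pipe_name => 'demo_db.public.mypipe_parquet'));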
3. Run SHOW PIPES again: pipes on the same bucket share the same notification channel, so no further S3 setup is needed.
4. Upload a new Parquet file to S3 and it is loaded into Snowflake automatically.
Auto-loading JSON
1. Create the stage for the JSON pipe
create or replace stage ext_json_stage_pipe
URL = 's3://lgbucket101/snowflake/json'
STORAGE_INTEGRATION = s3_integration
file_format = json_format;
2. Create the pipe
create or replace pipe demo_db.public.mypipe_json auto_ingest=true as
copy into demo_db.json_data.healthcare_json from (select
$1:"_id"::varchar,
$1:" Average Covered Charges "::varchar,
$1:" Average Total Payments "::varchar,
$1:" Total Discharges "::integer,
$1:"% Bachelor's or Higher"::float,
$1:"% HS Grad or Higher"::varchar,
$1:"Total payments"::varchar,
$1:"% Reimbursement"::varchar,
$1:"Total covered charges"::varchar,
$1:"Referral Region Provider Name"::varchar,
$1:"ReimbursementPercentage"::varchar,
$1:"DRG Definition"::varchar,
$1:"Referral Region"::varchar,
$1:"INCOME_PER_CAPITA"::varchar,
$1:"MEDIAN EARNINGS - BACHELORS"::varchar,
$1:"MEDIAN EARNINGS - GRADUATE"::varchar,
$1:"MEDIAN EARNINGS - HS GRAD"::varchar,
$1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
$1:"MEDIAN_FAMILY_INCOME"::varchar,
$1:"Number of Records"::varchar,
$1:"POP_25_OVER"::varchar,
$1:"Provider City"::varchar,
$1:"Provider Id"::varchar,
$1:"Provider Name"::varchar,
$1:"Provider State"::varchar,
$1:"Provider Street Address"::varchar,
$1:"Provider Zip Code"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_json_stage_pipe);
3. Azure to Snowflake
1. Create the Azure integration
Method 1: storage integration (I didn't have the permissions to finish this)
--create the azure integration
CREATE STORAGE INTEGRATION azure_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = AZURE
ENABLED = TRUE
AZURE_TENANT_ID = 'c56908eb-1777-40d2-86f2-1d134e61a0fd'
STORAGE_ALLOWED_LOCATIONS = ('azure://snowflake101lg.blob.core.windows.net/snowflake');
2. View the integration
--view the integration; DESC returns an AZURE_CONSENT_URL that an Azure admin must approve, likely the permission missing above
DESC INTEGRATION azure_integration;
Method 2: load directly with a Blob SAS token
1. In Azure, generate a SAS token for the Blob storage (from the account's access keys)
2. COPY the table directly with the SAS token. Note: delete the token's leading '?' when pasting it. Any file format can be copied this way.
- Template
COPY INTO <snowflake table name>
FROM 'azure://<exact file location in azure>'
CREDENTIALS=(AZURE_SAS_TOKEN='<sas token from azure blob, without the leading ?>')
FILE_FORMAT = <your file format>;
- Example
COPY INTO HEALTHCARE
FROM 'azure://snowflake101lg.blob.core.windows.net/snowflake/csv/snowpipe_sample.csv'
CREDENTIALS=(AZURE_SAS_TOKEN='sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-02-18T11:15:28Z&st=2024-02-18T03:15:28Z&spr=https,http&sig=0QlPdlg5%2BCl1zXB0%2FkYtFmDOHKWcARuEmoZYAXRNLK4%3D')
FILE_FORMAT= csv_format;
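Before loading for real, the same statement can be dry-run with VALIDATION_MODE, which scans the files for errors without inserting anything (a standard COPY option; the token is elided here):
COPY INTO HEALTHCARE
FROM 'azure://snowflake101lg.blob.core.windows.net/snowflake/csv/snowpipe_sample.csv'
CREDENTIALS=(AZURE_SAS_TOKEN='<same token as above>')
FILE_FORMAT = csv_format
VALIDATION_MODE = RETURN_ERRORS;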
2. Snowpipe on Azure: to complete on my own (skipped for now)