美文网首页
Azure Synapse 04

Azure Synapse 04

作者: 山猪打不过家猪 | 来源:发表于2023-08-11 06:08 被阅读0次

    WORDS

    1. 用requires 代替need 表示 依靠,依赖,需要
    2. persisted 持久化

    1 What's Synapse?

    Synapse 主要有几个功能:

    1. 集成分析,可以将gen2里的各类数据源整合在一起,如data lake, Nosql,使得用户可以从多个数据源中获取数据
    2. 数据仓库,提供了大规模数据存储和查询,采用了分布式架构
    3. 大数据分析:集成了apache spark引擎
    4. 实时数据处理:集成了kafka
    5. 高级的安全性和身份信息:数据加密,身份验证,访问控制


      image.png

    2 Sql data warehouse

    Host a data warehouse via the azure synapse service。
    sql database 和azure sql warehouse的区别:

    1. database适用于传统的web类似于 服务器上托管的sql2008,对于大数据查询性能不好,data warehouse则是分布式的,对大数据友好。
    2. 如果只是web的功能或者只是想进行表格的增删改查,sql database是个很好的选择
    3. 但是除了想增删改查,但是又想进行数据分析,这会给传统的database造成压力,所以data warehouse就是最好的选择
    4. 总而言之,只是进行简单的业务操作使用Sql, 但是既然想业务操作,又想数据分析则warehouse。
    image.png

    3 Create an Azure synapse

    创建synapse需要已经部署好的azure data lake gen2 storage account的账号,我们可以使用现有的,也可以创建一个新的


    image.png
    • 在部署时候,我们有两个选项,一个是sql pool 用来build the data warehouse,这里存放的数据往往都是干净的,清洗过后且直接可以用来数据分析或者是图像化的数据,因为这里是需要计算的,所以花费也是较高的。因为他是根据DWU计费,根据计算量付费和存储空间计费。
    • 还有一种是serverless sql pool 基于分析的是数据量进行计费。所以可以使用这个进行快速的临时性的数据分析, 例如在data lake 里的各种数据。注意,这里的数据是无法persisted的,这里处理过的数据,需要保存在sql pool 里面。

    4 使用synapse链接外部文件(09,10)

    回顾:首先你需要

    1. 一个azure sql database名字为sqldatabase01lg
    2. 创建一个data lake gen2 account 名字为datalake01lg,并且container里面创建两个containers 分别csv用来存放.csv文件,parquert用来存放二进制的qarquert文件,上穿Log.csv Log.qarquert


      image.png

    创建一个synapse(06,07)

    image.png
    • 在创建synapse的时候必须有gen2 account,可以使用现有的,也可以使用新建的。这里我们已经有一个之前创建的datalake01lg的gen2,但是, 我们现在为了演示从外部来获取数据,所以我们需要创建一个新的gen2 account,名字为datalake02lg
    • 在创建的过程中,他会默认创建一个sql server的服务,需要设置用户名和账号,这里的username是sqladmin.


      image.png

    use synapse (09,10)

    • 这里我们把之前创建的datalake01lg的gen2 account作为外部数据源,来进行数据的操作
    1. 选择添加外部数据源


      image.png
    2. 选择添加外部数据源的类型,这里是最前面的gen2 container里的数据


      image.png
      image.png
    3. 成功后,我们在Linked里面就可以找到,刚才添加的datalake01lg的外部数据源


      image.png
    4. Then, we clicked the csv and executed to query top 100 data, there was an error occurred,it means we do not have the authority to use this container.


      image.png
    5. datalake02lg needs to be authorized by datalake01lg to use the containers. Now go back to the datalake01lg , find the Access Control and add a role assessment.(give permissions to datalake02lg)


      image.png
    给外部account授权使用datalake01lg的权限
    • 找到blob reader


      image.png
    • 添加成员,并授权


      image.png
    1. 当我们授权给datalake02lg可以读取01里的Container的成功后,在执行上的query,就可以查询成功


      image.png

    使用外部数据的重要指令

    1. 创建外部数据源, Create external data source
    2. 指定外部数据源格式, Create external file format
    3. 创建外部数据表, Create external table
    • 在使用外部数据的时候,需要生成token


      image.png

    读取外部CSV文件(10)

    • 创建外部指令的代码
    CREATE DATABASE [appdb]; --创建appdb数据库
    
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'P@ssword@123' --主加密 密钥
    
    CREATE DATABASE SCOPED CREDENTIAL SasToken  --数据库范围凭证
    WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
    SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T09:42:48Z&st=2023-08-11T01:42:48Z&spr=https&sig=TBRMpm7332EjuSYSFC2LKMUTkcjB8dfi%2B3NTNsPhiRU%3D'
    
    
    CREATE EXTERNAL DATA SOURCE log_data -- 外部数据源
    WITH(LOCATION='https://datalake01lg.blob.core.windows.net/csv',  --读取外部数据源的位置,文件夹
        CREDENTIAL = SasToken ) -- 获取SasToken
    
    • 完成了上述的步骤,我们就已经创建了数据源,现在我们使用指令来,读取外部数据
    --创建外部文件源
    CREATE EXTERNAL FILE FORMAT TextFileFormat WITH(
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS(
            FIELD_TERMINATOR=',',
            FIRST_ROW = 2
        )
    )
    
    --创建外部表
    CREATE EXTERNAL TABLE [logdata]
    (
        [Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH(
        LOCATION ='/Log.csv',  --上面地址的根目录下的文件
        DATA_SOURCE = log_data, --上面的创建的data source名
        FILE_FORMAT = TextFileFormat --上面指定的文件格式
    )
    
    SELECT * FROM [logdata]
    
    
    image.png

    读取外部parquet 文件

    • 我们可以使用上面的sastoken的代码,但是这样会出现问题,如果我们使用上面的sastoken的时候,这个token过期,那么我们就要生成新的token。
    1. drop 外部表
    2. drop 外部数据源
    3. drop 过期的token
    4. 用上面的方法重新生成新的sastoken
    DROP EXTERNAL TABLE [logdata]
    
    DROP EXTERNAL DATA SOURCE log_data
    
    DROP DATABASE SCOPED CREDENTIAL SasToken
    
    • DROP了过期的token后,我们去生成新的sastoken,然后进行parquet的读取,注意这里我们可以直接从设置数据库范围凭证开始,因为前面我们已经设置过了主加密密钥
    
    CREATE DATABASE SCOPED CREDENTIAL SasToken  --创建数据库代码凭证
    WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
    SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T10:15:25Z&st=2023-08-11T02:15:25Z&spr=https&sig=nzwVeqZUBPDHuQETSfeN0nIBy7mOAGYhXzWzXqVKAZ0%3D'
    
    
    CREATE EXTERNAL DATA SOURCE log_data_parquet -- 创建外部数据格式
    WITH(LOCATION='https://datalake01lg.blob.core.windows.net/parquet',  --读取外部数据源的位置,文件夹parquet
        CREDENTIAL = SasToken ) -- 获取SasToken
    
    
    --创建外部文件源parquet
    CREATE EXTERNAL FILE FORMAT ParquetFileFormat WITH(
        FORMAT_TYPE = PARQUET,
        DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
        )
    
    
    --创建外部表 
    CREATE EXTERNAL TABLE [logdata_parquet]
    (
        [Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [varchar](100) NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH(
        LOCATION ='/Log.parquet',  --上面地址的根目录下的文件
        DATA_SOURCE = log_data_parquet, --上面的创建的data source名
        FILE_FORMAT = ParquetFileFormat --上面指定的文件格式
    )
    
    SELECT * FROM [logdata_parquet]
    
    
    image.png
    • 注意:和上面csv还是有区别的
    1. csv表中的Time是时间类型,而parquet里面都是字符串
    2. 其次在parquet文件中,列名是不能有空格的,所以这个结果下所有有空格的列的值都为空,所以删除列明中的空格就可以成功的显示数据

    创建一个SQL pool(016)

    • 没有sql pool之前 只有个build-in,此时我们需要创建一个新的pool


      image.png
    • 创建成功后,我们就可以看到


      image.png
    • 在synapse的Data里,我们就看到我们的pool,他和之前的sql serverless 不一样的原因是,serverless只有external表,而在pool里,不仅仅有exeternal table 他还有自己的Table


      image.png
    • 使用sql pool时候,因为我们已经创建过pooldb,所以不需要和serverless一样,创建一个新的库appdb,我们只需要将我的工作环境改为pooldb,类似于sql2008下的 use pooldb,其余代码一模一样


      image.png

    使用sql pool读取外部的csv表格(017)

    • 这里我们需要新的token,是datalake01lg的access keys


      image.png
    • 其次注意 LOCATION也发生了改变,其中,注意将blob改为dfs
    CREATE DATABASE SCOPED CREDENTIAL AzureStorageCredential
    WITH
      IDENTITY = 'datalake01lg',
      SECRET = 'ZuTNLYAYPx1gmjZHE2+pIizxi4zwOHBYX395HDVQ/wM47yfhHBEBppzMKVegfROIi99SCwCWD04F+AStQjvUxw==';
    
    
    CREATE EXTERNAL DATA SOURCE log_data
    WITH (    LOCATION   = 'abfss://csv@datalake01lg.dfs.core.windows.net/parquet',
              CREDENTIAL = AzureStorageCredential,
              TYPE = HADOOP
    )
    
    
    • SQL POOL charges for hours, so when we don't use it, we can stop and it means you can not use it to calculate ability. But the storage still costs a lot of money.

    在sql pool 里加载数据 019

    复制csv到sql pool里
    • 链接和选择我们自己的sql pool 然后通过建表语句创建一个新的表,我们可以在sql pool里 看到这个新建的表


      image.png
    • 直接从datalake01lg里的container复制csv表到sql pool里,这里From的链接是从container里获得的csv的链接,Storage Account Key是datalake01lg里的Access Key

    COPY INTO [pool_logdata] FROM 'https://datalake010lg.blob.core.windows.net/csv/Log.csv'
    WITH(
        FILE_TYPE='CSV',
        CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='t7Wc/rXNJ5fXU45W6NVxMYUG3XG5ge8dUVlmvSbpMMGonY0puqMDuA4DIcGT3IwA6sYIqQSHcu9U+AStvZLNcw=='),
        FIRSTROW=2)
    
    SELECT * FROM [pool_logdata]
    
    • 复制成功后,我们就可以查询pool_logdata里的数据了


      image.png
    复制parquet到sql pool里
    • 首先,还是创建一个新的表名为pool_logdata_parquet到sql pool里
    CREATE TABLE [pool_logdata_parquet]
    (
        [Correlationid] [varchar](200) NULL,
        [Operationname] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Eventcategory] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [varchar](500) NULL,
        [Subscription] [varchar](200) NULL,
        [Eventinitiatedby] [varchar](1000) NULL,
        [Resourcetype] [varchar](1000) NULL,
        [Resourcegroup] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL)
    
    • 复制parquet
    COPY INTO [pool_logdata_parquet] FROM 'https://datalake244434.blob.core.windows.net/parquet/log.parquet'
    WITH(
        FILE_TYPE='PARQUET',
        CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='vDV2bSKSR44lbE6x05HtFz57DvlK3O2WNkb11te+H+GrBjeXCojnHjiTw3KdYBWXJRSAnOAZNdgB+AStAasz8w==') )
    
    SELECT * FROM [pool_logdata_parquet]
    
    SELECT * FROM [logdata_parquet]
    

    使用Bulk load 加载数据到表中022

    • Linked页面下选择csv文件,前提上面已经链接了外部数据库。


      image.png
    • 注意,我们在加载bulk load的时候,出现一个问题是的读取不到,原因是因为未授权,我们需要去datalake01lg里的Access Control来添加我们datalake02lg account的权限。


      image.png
    • 授权后,选择要处理的文件


      image.png
    • 选择导入的表


      image.png
    • 系统将会自动生成语句,其实这里使用了Azure synapse的pipline


      image.png

    使用PolyBase复制external数据到sql pool的表里023

    一种更加有效的方法来加载外部的数据到sql pool里
    之前的方法是1. 创建数据库凭证 2.创建外部数据源链接 3.创建外部表结构,这样就建立了外部数据的链接,
    如果我们想复用这些的话,可以使用下面代码,读取系统里的外部表,这样我们就可以很轻松的创建各种表,只用创建一次1,2,3

    SELECT * FROM sys.database_scoped_credentials  --读取外部数据凭证
    
    SELECT * FROM sys.external_data_sources  --查询外部数据源
    
    SELECT * FROM sys.external_file_formats --查询外部文件
    
    

    使用synapse的pipeline复制外部数据到pool里

    • 我们使用Integrate里的Copy Data tool


      image.png
    • 在Source里选需要链接的地方和文件


      image.png
    • 配置源文件的格式,这里我们是csv,且第一行是表头


      image.png
    • 选择数据表要复制的地方,这里我们选择pooldb里的已经存在的表


      image.png
    • 创建pipeline成功


      image.png
    • 成功后,我们可以在monitor里面看到我们的pipeline的状态而且pooldb的数据库也有了数据,说明我们的pipeline成功


      image.png

    使用pipeline复制database的数据到sql pool里

    • 创建一个Copy数据的pipeline,我们发现之前直接的下拉框找不到我们的database01lg的数据库,我们需要添加一个new connection


      image.png
    • 选择sql 的服务器和sql的数据库


      image.png
    • 后面和上面的一样最后,我们的pipeline运行成功


      image.png

    设计data warehouse

    • Fact Table: 实际用到的数据,类似于销售表,你需要的是货号,时间,销量
    • Dimension Table: 但是,知道了上面的销售情况,你需要类似于货号关联的商品信息,商品信息关联的客户表等等,这就是Dimension Table

    创建一个Fact table

    • 使用sql server 2008链接sql pool


      image.png
    • 链接sql database


      image.png
    • 执行连表查询


      image.png
    • 注意:我们在实际的生产中,为了不扰乱原来的数据,我们一般先生成一个临时的fact table,然后通过pipeline来获取这临时表里的数据

    创建一个Dimension表

    和上面的表一样,我们使用View试图创建表

    -- Lab - Building a dimension table
    -- Lets build a view for the customers
    
    CREATE VIEW Customer_view 
    AS
      SELECT ct.[CustomerID],ct.[CompanyName],ct.[SalesPerson]
      FROM [SalesLT].[Customer] as ct  
    
    -- Lets create a customer dimension table
    
    SELECT [CustomerID],[CompanyName],[SalesPerson]
    INTO DimCustomer
    FROM Customer_view 
    
    
    -- Let's build a view of the products
    
    CREATE VIEW Product_view 
    AS
    SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
    FROM [SalesLT].[Product] prod
    LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
    LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]
    
    -- Lets create a product dimension table
    
    SELECT [ProductID],[ProductModelID],[ProductcategoryID],[ProductName],[ProductModelName],[ProductCategoryName]
    INTO DimProduct
    FROM Product_view 
    
    -- If you want to drop the views and the tables
    
    DROP VIEW Customer_view 
    
    DROP TABLE DimCustomer
    
    DROP VIEW Product_view 
    
    DROP TABLE DimProduct
    
    

    从sqldatabase里迁移数据到sql pool里面

    • 此时,在sql database里,我们有1张Fact表和2张deminsion表


      image.png
    • 在synapse里的sql pool现在是没有任何表的


      image.png
    • 在synapse 里创建copy data的pipeline,和上面的几乎一致

    • pipeline完成后,可以看到pooldb里已经有了刚才的三张表


      image.png

    设计表

    • Hash Table可以提高大型表的查询性能
    • Round-Robin表可以提高数据的加载性能


      image.png

    创建一个哈希表(Hash Table)

    Fact table 使用哈希表, dimension table使用复制表

    • 创建一个基于CustomerID的hash distribution table
    CREATE TABLE [dbo].[FactSales](
        [ProductID] [int] NOT NULL,
        [SalesOrderID] [int] NOT NULL,
        [CustomerID] [int] NOT NULL,
        [OrderQty] [smallint] NOT NULL,
        [UnitPrice] [money] NOT NULL,
        [OrderDate] [datetime] NULL,
        [TaxAmt] [money] NULL
    )
    WITH  
    (   
        DISTRIBUTION = HASH (CustomerID)
    )
    
    • 创建好后,我们使用pipleline给上面的表导入数据

    创建一个复制表(replicated Tables)

    • 创建一个replicated table
    -- Replicated Tables
    
    CREATE TABLE [dbo].[FactSales](
        [ProductID] [int] NOT NULL,
        [SalesOrderID] [int] NOT NULL,
        [CustomerID] [int] NOT NULL,
        [OrderQty] [smallint] NOT NULL,
        [UnitPrice] [money] NOT NULL,
        [OrderDate] [datetime] NULL,
        [TaxAmt] [money] NULL
    )
    WITH  
    (   
        DISTRIBUTION = REPLICATE
    )
    
    

    使用surrrogate keys for dimension tables

    • 没使用surrogate key的表里面的ID是从1自增的


      image.png
    • 我们创建一个有surrogate key的表
    
    CREATE TABLE [dbo].[DimProduct](
        [ProductSK] [int] IDENTITY(1,1) NOT NULL,  -- surrogate key
        [ProductID] [int] NOT NULL,
        [ProductModelID] [int] NOT NULL,
        [ProductcategoryID] [int] NOT NULL,
        [ProductName] varchar(50) NOT NULL, 
        [ProductModelName] varchar(50) NULL,
        [ProductCategoryName] varchar(50) NULL
    )
    
    • 创建成功后,我们将数据迁移的sql pool里,发现key不是从1自增的,而是根据azure的分布来给的ID


      image.png

    Slowly Changing the dimension table

    • 我们再更改dimension table的时候,例如一个商品详情表的一个商品的名称,如果更改只是少量的一次性的,且以后不会在用到的时候,我们可以一次性更新。
      但是,如果我们想更新一下很多商品的价格,但是这个旧的价格,可能在以后数据分析中使用,那么我们就需要给他一个标识


      image.png

    Indexs

    • 在传统数据库中,我们使用索引是很常见的,为了提高查询效率
    • 在azure中,因为已经有个distribution ,所以在group by ,join的使用上效率大大提高。但是如果需要使用where进行过滤,我们则需要使用index

    创建聚集性索引(clustered index) 和非聚集性索引(none-clustered index)

    • 1.创建一个新的表pool_logdata,使用hash distribution
    CREATE TABLE [pool_logdata]
    (
        [Correlation id] [varchar](200) NULL,
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH  
    (   
        DISTRIBUTION = HASH ([Operation name])
    )
    
    
    • 2.使用之前旧的pipeline来转移数据


      image.png
    • 3.查看一下数据分布, 我们发现,数据被分在60个块里,但是有些rows是0,这就是数据倾斜
      DBCC PDW_SHOWSPACEUSED('[dbo].[pool_logdata]')
    
    image.png
    • 4.创造一个聚集性索引
    -- Note on creating Indexes
    
    CREATE TABLE [logdata]
    (
        [Correlation id] [varchar](200) NULL,
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH  
    (   
        DISTRIBUTION = HASH ([Operation name]),
        CLUSTERED INDEX ([Resource type]) -- 聚集性索引
    )
    
    
    CREATE INDEX EventCategoryIndex ON [logdata] ([Event category]) --非聚集性索引
    
    

    创建一个heap table 临时表

    少于6000w数据的临时表,使用heap table,大于这个数使用clustered columnstore table.

    -- Heap tables
    
    CREATE TABLE [logdata]
    (
        [Correlation id] [varchar](200) NULL,
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH  
    (   
        HEAP,
        DISTRIBUTION = ROUND_ROBIN
    )
    
    CREATE INDEX ResourceTypeIndex ON [logdata] ([Resource type])
    

    遍历分区表

    • 便利分区表可以提高带有where的语句查询效率,使用分区,可以使你的数据分层不同的区域,将数据划分呢成更加的分组 ,他和distribution是不同的。partitions一般是基于时间来进行的分区的,既如果经常使用到日期作为查询条件,那么partitions就很有效。支持 hash distribution tables, heap tables ,round robin tables,
      replicated tables, clustered column tables
    创建一个Partition table
    
    CREATE TABLE [logdata]
    (
        [Correlation id] [varchar](200) NULL,
        [Operation name] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Event category] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Event initiated by] [varchar](1000) NULL,
        [Resource type] [varchar](1000) NULL,
        [Resource group] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    WITH  
    (   
        DISTRIBUTION = HASH ([Operation name]),
        PARTITION ( [Time] RANGE RIGHT FOR VALUES
                ('2023-01-01','2023-02-01','2023-03-01','2023-04-01'))
    )
    
    
    • 创建后的表的分区结构是


      image.png

    读取json数据

    • 给datalake01lg里的container加一个json

    相关文章

      网友评论

          本文标题:Azure Synapse 04

          本文链接:https://www.haomeiwen.com/subject/sextmdtx.html