美文网首页
Azure data factory 05

Azure data factory 05

作者: 山猪打不过家猪 | 来源:发表于2023-08-13 06:26 被阅读0次

    创建data factory

    • 需要我们在azure里,有一个sql database, 一个sql server服务,一个gen2 data lake account


      image.png
    • 选择Data Factories服务并创建,创建成功后,我们launch进去,发现和synapse像


      image.png

    使用data factory 将datalake的数据transform到sql pool里

    1.创建Data source的链接


    image.png
    1. 创建成功,我们可以看到里面的内容和文件


      image.png

      3.创建Destination的链接


      image.png
    2. 后面和Syanpse的pipeline一样

    使用pipeline将csv文件转化为parquet 07

    1. 这次我们直接创建一个 新的pipeline


      image.png
    2. Source选择csv文件,Sink 需要我们新建一个gen2的任务并且将数据格式选为parquet
    image.png image.png
    1. 发布pipeline,在发布前记得validate,发布成功后,add trigger执行pipeline,会出现一个错误


      image.png

      4.上面错误的原因是,parquet的列名是不能有空格的,为了解决这个问题, 我们进入到刚才出错的pipeline里,找到mapping,这里的Mapping是azure自动给我们分配的表格的对应结构,我们修改里面的parquet的列名去掉空格


      image.png
    2. 修改完成后,validata成功后,然后publish,成功


      image.png

    执行多个步骤的pipeline

    我们的需求是:将datalake里的csv文件转换成parquet后,存入container的parquet文件夹后,在自动进行将这个parquet文件导入到sql pool里的pool_logdata_qarquet表里。我们将复用上面的已经创建好的转换格式的pipeline

    1. 首先,删除datalake01lg的container的parquet文件,确保我们pipeline1是有用的
    2. 我们在刚才的pipeline界面里,添加一个新的pipeline


      image.png

      4.设置Source,注意这里的source dataset不是具体的文件,因为在执行这2个pipeline前,还没有文件生成,所以这里的Source只是一个地址,指向了container里的parquet文件夹,然后我们选择woldcard file,来通配里面的parquet文件,因为这里只有一个logparquet,实际中,会有多个文件,慎重使用通配符


      image.png
    3. 设置Sink


      image.png

      6.设置成功后我们建立链接,pipeline1成功后,在执行pipeline2


      image.png
    4. publish and add trigger,成功后,我们可以看到datalake01lg里的log.csv,成功的转换了parquet格式,并且存储在了container的parquet文件夹下,并且这个parquet文件也被迁移到了synapse里pool的logdata_parquet表里


      image.png

    自定义我们需要的数据类型,在传输的过程中

    上面的传输,我们都使用了默认的格式,可以看出Time时间类型,在这里被默认成了string类型,

    1. 修改csv文件转parquet时后的Time类型,改为DateTime


      image.png
    2. 删除pool里的dlogdata_parquet表,创建新的表,将Time字段改为DateTime


      image.png

      3.建表语句

    CREATE TABLE [logdata_parquet]
    (
        [Correlationid] [varchar](200) NULL,
        [Operationname] [varchar](200) NULL,
        [Status] [varchar](100) NULL,
        [Eventcategory] [varchar](100) NULL,
        [Level] [varchar](100) NULL,
        [Time] [datetime] NULL,
        [Subscription] [varchar](200) NULL,
        [Eventinitiatedby] [varchar](1000) NULL,
        [Resourcetype] [varchar](1000) NULL,
        [Resourcegroup] [varchar](1000) NULL,
        [Resource] [varchar](2000) NULL
    )
    
    1. 然后发布pipeline,然后trigger,这样2个pipeline都运行成功,container里有了log.parquet 且被传到了sql pool里

    使用query来进行数据Transfer 012

    1.建立于sqldatabase的链接,
    2.在sql pool里面创建factSales的事实表

    1. 创建pipeline,使用query


      image.png
    2. 执行pipeline,数据迁移成功


      image.png

    使用copy command 复制数据到synapse 015

    • 使用copy command


      image.png

    使用ploybase 复制数据到synapse 016

    1. 选择使用ploybase


      image.png

      2.在datalake的container里创建一个缓存文件夹staging


      image.png
      3.修改setting,添加刚才的staging
      image.png

    Mapping data flows 017-019

    我们在进行复杂的transformation的时候可以使用这个功能

    使用mapping data flows 迁移fact table的数据 018

    我们根据我们的刚才创建FactSales表的query来创建Data flow

    SELECT dt.[ProductID],dt.[SalesOrderID],dt.[OrderQty],dt.[UnitPrice],hd.[OrderDate],hd.[CustomerID],hd.[TaxAmt]
      FROM [SalesLT].[SalesOrderDetail] dt
      LEFT JOIN [SalesLT].[SalesOrderHeader] hd
      ON dt.[SalesOrderID]=hd.[SalesOrderID]
    
    1. 添加一个source data 从detail表里


      image.png
    2. 添加left jion的另外一个表作为source data


      image.png
    3. 加入一个join


      image.png
    4. 设置join


      image.png
    5. 添加sink,就是destination table


      image.png

      6.设置sink里的Mapping


      image.png
    6. 设置完成后,点击validate all,确认无误,然后点击publish
    7. running我们的mapping data flow
    8. 创建一个pipeline,这里选择的是Data flow


      image.png

      10.设置Settings


      image.png
      11.设置完成,validate成功,publish,trigger的时候,会花很多的时间,相对于普通的transfer数据用时几秒,这里花费很多时间的原因是因为她再创建spark 集群服务
    • 为什么使用mapping data flow呢,因为它可以在不改变原始数据的情况下,进行数据传输,给了我们更加灵活的取数方法

    Mapping data flow 迁移Dimension Table 020-021

    DimCustomer表
    1. 在sql pool里创建一个dimension table DimCustomer
    CREATE TABLE [dbo].[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [CompanyName] varchar(200) NOT NULL,
        [SalesPerson] varchar(300) NOT NULL
    )
        WITH  
    (   
        DISTRIBUTION = REPLICATE
    )   
    
    1. 创建一个新的data flow,添加source 数据


      image.png
    2. 添加一个sink(destination),注意里面的mapping,表头的对应关系


      image.png
    3. validate全部没错,点击Publish,发布成功,注意此时并没有执行,表里是没数据的
    4. 添加一个data flow的pipeline,并且add trigger


      image.png
    DimProduct 表

    1.在synapse 的Pool 里创建一个表,这里使用都是REPLICATE,因为他是dimension table

    CREATE TABLE [dbo].[DimProduct](
        [ProductID] [int] NOT NULL,
        [ProductModelID] [int] NOT NULL,
        [ProductcategoryID] [int] NOT NULL,
        [ProductName] varchar(50) NOT NULL, 
        [ProductModelName] varchar(50) NULL,
        [ProductCategoryName] varchar(50) NULL
    )
    WITH  
    (   
        DISTRIBUTION = REPLICATE
    )   
    
    1. 根据下面sql创建我们的data flow
    SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
    FROM [SalesLT].[Product] prod
    LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
    LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]
    

    2.1 创建一个Product 的source
    2.2 创建一个ProductModel的source
    2.3创建一个ProductCategory的source


    image.png
    1. 根据FROM [SalesLT].[Product] prod LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]创建一个left join
      image.png
      4.再根据LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID] 创建另外一个left join
      image.png
      5.创建一个sink
      image.png
    2. 注意:每次添加sink的时候,都需要调整Mapping


      image.png

      7.最后,创建pipeline,注意,这里我们是两个Data flow的pipeline,所以,在trigger这个pipeline的时候,里面的所有data flow都会执行

    添加一个derived column(派生列) 022

    • 我们可以在原来的Stream添加,也可以在新建的stream里添加。
    1. 创建一个新的FactSales表,增加TotalAmount 列
    -- Lab - Derived Column
    
    CREATE TABLE [dbo].[FactSales](
        [ProductID] [int] NOT NULL,
        [SalesOrderID] [int] NOT NULL,
        [CustomerID] [int] NOT NULL,
        [OrderQty] [smallint] NOT NULL,
        [UnitPrice] [money] NOT NULL,
        [OrderDate] [datetime] NULL,
        [TaxAmt] [money] NULL,
        [TotalAmount] [money] NOT NULL
    )
    WITH  
    (   
        DISTRIBUTION = HASH (CustomerID)
    )
    

    2.在之前生成FactSales的Steam里的Sink之前,添加一个select column来选择我们需要的列


    image.png

    3.设置我们需要的列,这里有个技巧,先全选,然后使用filter 选掉我们需要的列,然后删除不需要的


    image.png

    4.在这个之后,添加一个derived column


    image.png

    5.添加一个需要的列和操作,这里我们需要使用价格*数量,算出总额


    image.png

    6.到sink里,添加新的列,这里注意,我们需要重新获取一下表格的schema,不然totoalAmount列出不来

    image.png
    1. 这样sink里的就有了新的列TotalAmount


      image.png
    2. validate all 然后publish,最后add trigger,总额计算出来


      image.png

    在Dimention Table里创建 surrogate keys 026 (未完待续)

    1. 修改DimProduct,添加一个surrogate key名为Product SK
    CREATE TABLE [dbo].[DimProduct](
        [ProductSK] [int] NOT NULL,
        [ProductID] [int] NOT NULL,
        [ProductModelID] [int] NOT NULL,
        [ProductcategoryID] [int] NOT NULL,
        [ProductName] varchar(50) NOT NULL, 
        [ProductModelName] varchar(50) NULL,
        [ProductCategoryName] varchar(50) NULL
    )
    WITH  
    (   
        DISTRIBUTION = REPLICATE
    )
    
    1. 和上面一样,在sink之前先添加一个select的stream选择我们需要的列,然后修改mapping


      image.png

    3.接着添加一个surrogate key


    image.png

    4.修改Sink里的schema,最后发布,整个流程结束


    image.png

    cache sink

    1. 创建一个新的Customer表,并且加上CustomerSK字段
    CREATE TABLE [dbo].[DimCustomer](
        [CustomerSK] [int] NOT NULL,
        [CustomerID] [int] NOT NULL,
        [CompanyName] varchar(200) NOT NULL,
        [SalesPerson] varchar(300) NOT NULL
    )
        WITH  
    (   
        DISTRIBUTION = REPLICATE
    )       
    
    1. 此时,我们更换source将之前的sql database里的customer表换成data lake gen2里container的Csv文件


      image.png

    3.创建一个选择列的Stream,将csv里面我们需要的列选出来


    image.png

    4.更新一下sink里的mapping让customerSK显示出来

    1. publish and trigger, 成功,查看DimCustomer


      image.png

    避免重复 handling duplicate 028

    1.添加一个stream用来处理最新的写入的product的数据


    image.png

    2.添加一个处理重复的stream,在之前的ProductStream上


    image.png
    3.设置exsist,如果productID有就不插入
    image.png

    Filtering rows 029

    1. 创建一个新的data flow,将datalake里的parquet作为数据源
      2.创建一个sink,用来接收数据到pool里
    2. 创建一个filter,用来添加过滤的条件


      image.png

    Generating Joson data 030

    1. 将parquet转换为Json数据


      image.png

    Load json into SQL pool 031

    1.在上面的Data flow上,我们可以添加一个Copy Data的pipeline


    image.png

    2.设置copy datad的 source 和sink


    image.png
    image.png
    1. 修改data flow里, sink输出的json名称


      image.png

      4.完成后,执行pipeline


      image.png

    处理json数据 032

    1. 在container里有一个customer的json,结构如下


      image.png

    2.需要将找个json格式的表,存入到pool里面的customer表里,表的结构如下

    CREATE TABLE [Customercourse]
    (
        [CustomerID] int,
        [CustomerName] varchar(200),
        [Registered] BIT,
        [Courses] varchar(200)
    )
    

    相关文章

      网友评论

          本文标题:Azure data factory 05

          本文链接:https://www.haomeiwen.com/subject/akjfmdtx.html