美文网首页商业智能BI我爱编程
使用Oracle 10g进行数据仓库的抽取、转换、装载ETL

使用Oracle 10g进行数据仓库的抽取、转换、装载ETL

作者: 姜小明同学 | 来源:发表于2016-03-31 10:40 被阅读1220次

    总览

    ETL过程中发生了什么?

    首先从多个不同的数据源中抽取数据(Extraction),然后传输(transpoting)到中间临时系统或目标系统中,上面两个过程中都可能会有部分转换工作(transform)发生。
    抽取盒传输完成之后就是最重要的转换和装载工作,包含:

    • 复杂的过滤器的使用
    • 保证与目标数据库表中的数据一致
    • 检查数据是否需要更新或插入
    • 统一数据可能需要插入多次,如作为细节数据或者聚集信息。

    先决条件

    Sales History 销售记录模式改变的实现

    procedure代码end;之后加斜杠是执行存储过程。
    这部分主要代码在modifySH_10gR2.sql中,代码运行了3分钟。

    多表插入

    公司从合作伙伴公司接收到的数据记录了每个客户每周的信息。
    转换(transformation)的一部分工作就是从每周一条记录转换为7条记录。另外,数据仓库需要分开记录所有信用高于某一界限的新用户。
    这一节,将使用Oracle的多表插入来实现上述业务转换。

    使用多表插入实现pivoting旋转

    Oracle RDBMS执行每一条SQL语句,都必须经过Oracle优化器的评估。所以,了解优化器是如何选择(搜索)路径以及索引是如何被使用的,对优化SQL语句有很大的帮助。Explain可以用来迅速方便地查出对于给定SQL语句中的查询数据是如何得到的即搜索路径(我们通常称为Access Path)。

    这里我们用explain plan发现使用insert all into sales value into sales[or other table] values输入源表只扫描了一次,因为多表插入通过多个Insert into分支实现数据反向规格化(分解),避免了多趟扫描。
    对比而言INSERT INTO sales [select_statement1] union all [select_statement1]的方法需要多变扫描。

    使用多表插入实现条件插入

        INSERT /*+ APPEND NOLOGGING */FIRST
          WHEN cust_credit_limit >= 4500 THEN
            INTO customers
            INTO customers_special VALUES (cust_id, cust_credit_limit)
          ELSE
            INTO customers
        SELECT * FROM customers_new; 
    

    使用upsert(update or insert)功能,即SQL merge关键字

    公司需要定期根据产品数据库更新数据仓库。不幸的是信息是否新旧需要在数据仓库端来确定。下面步骤介绍了sql MERGE操作:

    1. 为外部产品信息建立external table外部表(和directories目录)。
      外部表作用:加载和卸载数据
      外部表都是只读表 , 不能进行 update,insert,delete 操作 .
      Oracle 提供两种访问驱动 :
      ORACLE_LOADER : 它利用 oracle loader 技术从外部表读取数据 . 它还具有类似 SQL*Loader 工具控制文件语法的数据映射能力 .
      ORACLE_DATAPUMP :它提供卸载数据的能力 , 即把数据从数据库里导入一个外部表 , 再由一个或几个外部表导入到数据库里 . 对于 ASsubquery 的子句必须使用 ORACLE_DATAPUMP . 创建用于卸载数据的外部表时,不能制定列名。同时,不支持 badfile 、 discardfile 参数,因为其无效或者丢弃数据将不予以记录为操作系统文件 , 没有 fields erminated by( 或者 missing field values are null) 参数。
      语法为:

       CREATE TABLE products_delta
       (
       PROD_ID NUMBER(6),
       PROD_VALID CHAR(1)
       )
       ORGANIZATION external
       (
       TYPE oracle_loader
       DEFAULT DIRECTORY data_dir
       ACCESS PARAMETERS
       (
       RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII
       NOBADFILE
       NOLOGFILE
       FIELDS TERMINATED BY "|" LDRTRIM
       )
       location
       ('prodDelta.dat')
       REJECT LIMIT UNLIMITED NOPARALLEL;
      
    2. 使用SQL MERGE命令行实现更新或插入。

    3. 显示Merge命令的执行计划。

    4. 不用MERGE操作,使用两段分开的代码实现upsert。

    对比结论:

    To leverage the updatable join view functionality, the external table needs to be copied into a real database table and a unique index on the join column needs to be created. In short, this requires more operations, more space requirements, and more processing time.

    学习DML错误日志

        --调用模块
        exec dbms_errlog.create_error_log('sales_overall');
        INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales_overall
        SELECT * FROM sales_activity_direct
        LOG ERRORS INTO err$_sales_overall ( 'load_test1' ) REJECT LIMIT 10;--记录错误,最多容错十条
        commit;
        --检查错误的方法,这里可以看出相对于文件方法的优势
        select distinct ora_err_number$ from err$_sales_overall;
        select distinct ora_err_number$, ora_err_mesg$ from err$_sales_overall;
    

    体会基本的表函数

    在ETL过程中,数据从原系统中抽取(Extraction)出来,经过一系列的转换(Transformation),最后被装载(Load)到目标系统中。复杂的转换通常以过程(procedure)的形式实现。有时,转换的结果太大,内存容不下,这些结果必须以数据库表或falt files(普通文件)的方式临时地实体化。然后再被读入和作为下一转换的输入被处理。Oracle提供了对这些转换的管道化和并行处理的支持,实现的语言有PL/SQL,C和java。
    表函数可接受查询语句或游标作为输入参数,并可输出多行数据。该函数可以平行执行,并可持续输出数据流,被称作管道式输出。应用表函数可将数据转换分阶段处理,并省去中间结果的存储和缓冲表。

    自从 Oracle9i 之后,提供了一个叫做"管道化表函数"的概念。来解决此类问题,这种类型的函数,必须返回一个集合类型,且标明 PIPELINED。这个函数不能返回具体变量,必须以一个空的RETURN 返回。这个函数中,通过 PIPE ROW() 语句来送出要返回的表中的每一行。调用这个函数的时候,通过 TABLE() 关键字把管道流仿真为一个数据集。

    下面是学习使用表函数的基本步骤:

    1. 设立表函数的基本对象
    • 定义对象(记录)类型
      CREATE TYPE product_t AS OBJECT (
      prod_id NUMBER(6)
      , prod_name VARCHAR2(50)
      );
      /

    • 定义对象(集合)类型
      CREATE TYPE product_t_table AS TABLE OF product_t;
      /

    • 记录集合的结构,定义ref cursor引用游标类型的包
      ref cursor:动态关联结果集的临时对象。即在运行的时候动态决定执行查询。实现在程序间传递结果集的功能,能作为参数传递。
      CREATE OR REPLACE PACKAGE cursor_PKG as
      TYPE product_t_rec IS RECORD (prod_id NUMBER(6)
      , prod_name VARCHAR2(50)
      );

        TYPE product_t_rectab IS TABLE OF product_t_rec;
        TYPE strong_refcur_t IS REF CURSOR RETURN       product_t_rec;
       TYPE refcur_t IS REF CURSOR;
        END;
      
    • 为表函数创建日志表
      CREATE TABLE obsolete_products_errors
      (prod_id NUMBER, msg VARCHAR2(2000));

    1. 实现非管道化的表函数,返回记录数组
      Rem uses weakly typed cursor as input
      CREATE OR REPLACE FUNCTION obsolete_products(cur cursor_pkg.refcur_t) RETURN
      product_t_table
      IS
      prod_id NUMBER(6);
      prod_name VARCHAR2(50);
      sales NUMBER:=0;
      objset product_t_table := product_t_table();
      i NUMBER := 0;
      BEGIN
      LOOP
      -- Fetch from cursor variable
      FETCH cur INTO prod_id, prod_name;
      EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
      -- Category Electronics is not meant to be obsolete and will be suppressed
      IF prod_status='obsolete' AND prod_category != 'Electronics' THEN
      -- append to collection
      i:=i+1;
      objset.extend;
      objset(i):=product_t( prod_id, prod_name);
      END IF;
      END LOOP;
      CLOSE cur;
      RETURN objset;
      END;
      /
      调用及结果:
    调用obsolete_products
    1. 实现管道化表函数
      CREATE OR REPLACE FUNCTION obsolete_products_pipe(cur cursor_pkg.strong_refcur_t)
      RETURN product_t_table
      PIPELINED
      PARALLEL_ENABLE (PARTITION cur BY ANY) IS
      prod_id NUMBER(6);
      prod_name VARCHAR2(50);
      sales NUMBER:=0;
      BEGIN
      LOOP
      -- Fetch from cursor variable
      FETCH cur INTO prod_id, prod_name;
      EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
      IF prod_status='obsolete' AND prod_category !='Electronics' THEN
      PIPE ROW (product_t( prod_id, prod_name));
      END IF;
      END LOOP;
      CLOSE cur;
      RETURN;
      END;
      /
      区别在于RETURN product_t_table PIPELINED和使用PIPE ROW(添加数据)。
      调用obsolete_product与前类似。

    2. 实现透明并行执行表函数
      有时表函数需要对一些产品在类别属性上处理复杂的聚合操作。若要将其并行化,需要保证具有统一产品类别属性的所有记录被统一并行子进程处理,使得聚集函数覆盖了同一group中的所有记录。因此,需要使用对分布规则,在表函数头部添加PARALLEL_ENABLE (PARTITION cur BY ANY) IS。
      通过对ref cursor进行hint提示/+ PARALLEL(a,4)/来强制进行并行化。

    3. 带自治的DML语句的表函数
      表函数提供在其自治事务的域内扇出数据到其它表中。
      例子中进行判断并向错误日志记录表中输出数据。
      FETCH cur INTO prod_id, prod_name;
      EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
      IF prod_status='obsolete' THEN
      IF prod_category=prod_cat THEN
      INSERT INTO obsolete_products_errors VALUES
      (prod_id, 'correction: category '||UPPER(prod_cat)||' still available');
      COMMIT;
      ELSE
      PIPE ROW (product_t( prod_id, prod_name));
      END IF;
      END IF;

    1. 通过多个表函数实现无缝流式处理
      除了表函数在sql语句中的透明使用和其并行处理能力,表函数的另一个优势是可以相互之间调用。更进一步,表函数可以被任何sql语句使用,可以成为任何DML语句的输入。
    • 嵌套使用使用两个表函数
      SELECT DISTINCT prod_category, prod_status
      FROM TABLE(obsolete_products_dml(CURSOR(SELECT *
      FROM TABLE(obsolete_products_pipe(
      CURSOR(SELECT prod_id, prod_name
      FROM products))))));

    • 使用create table as select命令将表函数用作输入。
      CREATE TABLE PIPE_THROUGH AS

        SELECT DISTINCT prod_category, prod_status
        FROM TABLE(obsolete_products_dml(CURSOR(SELECT *
        FROM TABLE(obsolete_products_pipe(
        CURSOR(SELECT prod_id, prod_name
        FROM products))))));
      

    使用同步Change Data Capture(CDC增量抽取解决方案)来记录抽取增量数据变化

    Change Data Capture快速找到并处理那些改变了的数据,而不是整张表,并使得这些改变的数据可以供后续之用。
    CDC不依赖于在关系数据库外使用中间文件临时存储数据。它能捕获由对用户表进行insert、update、delete操作而引起的改变数据。这些改变数据被存储在一个叫做change table的表中,并可被应用程序方便地控制与操作。

    publish 和 subscribe模型

    publish 和 subscribe模型

    大多数CDC系统有一个publisher发布者负责从多个数据源捕获和发布这些改变数据。可以有多个subscriber订阅用户访问数据。CDC提供了PL/SQL包来完成发布、订阅任务。

    • publisher发布者
      通常是负责创建、维护组成CDC系统模式对象的DBA。主要任务有:

    • 定义数据仓库可能感兴趣的关系表(源表)。

    • 使用oracle提供的DBMS_LOGMNR_CDC_PUBLISH包来启动系统从多个源表中捕获数据。

    • 以change table的形式发布这些变化数据。

    • 使用SQL的Grant和Revoke语句来对用户和角色赋予对change table改变表的select权限。

    • subscriber订阅用户
      使用发布的改变数据的应用。

    • 使用Oracle提供的DBMS_LOGMNR_CDC_SUBSCRIBE包来订阅源表,以获取对所发布的改变数据的访问权限。

    • 展开订阅窗口,当可以接收数据时,创造一个新的subscriber view订阅着视图。

    • 使用SELECT语句来从subscriber view订阅着视图遍历改变数据。

    • 删除subscriber view订阅着视图,清楚订阅窗口。

    • 当不再需要其改变数据时,删除subscription订阅。

    这一主题的学习步骤如下:

    1. 使用同步CDC来记录所有增量数据。
    • 创建一个intermediate table中间表来进行操作。
      CREATE TABLE my_price_change_Electronics
      (prod_id number, prod_min_price number, prod_list_price number, when date);
    1. 创建一个改变表。
      使用DBMS_CDC_PUBLISH包创建改变表。
      begin
      DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE (OWNER => 'SH', -
      CHANGE_TABLE_NAME => 'PROD_price_CT',
      CHANGE_SET_NAME => 'SYNC_SET',
      SOURCE_SCHEMA => 'SH',
      SOURCE_TABLE => 'PRODUCTS',
      COLUMN_TYPE_LIST => 'prod_id number(6), prod_min_price number(8,2),
      prod_list_price number(8,2)',
      CAPTURE_VALUES => 'both',
      RS_ID => 'y',
      ROW_ID => 'n',
      USER_ID => 'n',
      TIMESTAMP => 'n',
      OBJECT_ID => 'n',
      SOURCE_COLMAP => 'y',
      TARGET_COLMAP => 'y',
      OPTIONS_STRING => null);
      end;
      /
      这段代码创建了一个叫PROD_PRICE_CT的改变表,以及跟踪所有在product上的后续改变的必须的触发器。
    change table的结构

    表中除了源表PRODUCTS中的列之外还有多个必须(如OPPERATION$)和可选(SOURCE_COLMAP$)的metadata元数据列。
    可通过SELECT * FROM change_tables;等查询change table相关信息。

    1. 订阅到一个change set改变集和所有感兴趣的源表列。
      订阅处理的逻辑实体是change set而不是change table。一个chage set可以包含多个change table,并能保证其间的逻辑一致。在订阅一个change set后,你就可以订阅所有感兴趣的源表列。方法为DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION();
    • 首先获得独一的将在整个对话中使用的 subscription handle订阅句柄,并告诉系统你感兴趣的列。

        variable subname varchar2(30)
        begin
        :subname := 'my_subscription_no_1';
        DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION (
        CHANGE_SET_NAME => 'SYNC_SET',
        DESCRIPTION => 'Change data PRODUCTS for ELECTRONICS',
        SUBSCRIPTION_name => :subname);
        END;
        /
      

    订阅是一种连接。

    • 然后就可以启动一个该subscription的change view改变视图。

        BEGIN
        :view_name := 'my_prod_price_change_view';
        DBMS_CDC_SUBSCRIBE.SUBSCRIBE (
        SUBSCRIPTION_NAME => :subname,
        SOURCE_SCHEMA => 'sh',
        SOURCE_TABLE => 'products',
        COLUMN_LIST => 'prod_id, prod_min_price, prod_list_price',
        subscriber_view => :view_name );
        END;
        /
      

    订阅者视图主要是为了添加感兴趣的列。

    1. 激活一个订阅,并展开订阅窗口。
      EXEC DBMS_LOGMNR_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION -
      (SUBSCRIPTION_name => 'my_subscription_no_1')
      至此,所有对PRODUCT源表的改变都反映在change table中。这些改变是通过在源表上的trigger透明地实现的。
    对source table的操作会改变change table

    源表中每条记录的改变,在change table中会添加对应的两条记录:旧、新。
    然而,建议不要直接使用change table来找到源表中的改变数据,而是使用安全有保障的subscriber view订阅者视角。
    展开订阅窗口后,就可以在subscriber view订阅者视角中看到一段时间窗口内改变的数据。

    • 现在就可以在目标系统上使用这些改变数据
      可以将subscriber view订阅者视角my_prod_price_change_view 当做表来处理。
      INSERT into my_price_change_electronics
      SELECT p1.prod_id, p1.prod_min_price, p1.prod_list_price, commit_timestamp$
      FROM my_prod_price_change_view p1, products p2
      WHERE p1.prod_id=p2.prod_id
      AND p2.prod_category='Electronics' AND operation$='UN';

        COMMIT;
      
        SELECT prod_id, prod_min_price, prod_list_price,
        to_char(when,'dd-mon-yyyy hh24:mi:ss')
        FROM my_price_change_electronics;
      
    1. 考察如何随时间变化处理新的环境。
    purge_window清理subscriber view中的数据 extend_window后可以继续获取数据

    然后就可以使用所获取的数据。如用merge指令将数据导入已存在的表中。

    merge,将subscriber view中的数据当做“普通”表进行处理
    1. 运行发布者。
      发布者需清理不需要的数据。
      exec DBMS_CDC_PUBLISH.purge_change_table('sh','prod_price_ct')

    2. 删除使用过的改变视图并清理订阅窗口。

    3. 清理CDC环境。
      exec DBMS_CDC_SUBSCRIBE.drop_subscription -
      (subscription_name=> 'my_subscription_no_1');
      exec DBMS_CDC_PUBLISH.DROP_CHANGE_TABLE (OWNER => 'sh', -
      CHANGE_TABLE_NAME => 'prod_price_CT', -
      FORCE_FLAG => 'Y');

    为数据集市进行信息传播

    除了中心的数据仓库,公司还可能运行多个小的data mart数据集市。比如,产品部门想要接收所有的事务型SALES销售数据来进行市场营销分析,这些数据根据其主要的产品类别而划分,且只有2000年的数据相关。
    我们讲使用Oracle的transportable tablespace可传输表空间功能和list partition列表划分来解决这些问题。另外,为了保障新表生成过程的成功完成,我们将在resumable可恢复模式下运行语句,以保证不会因为空间问题而导致创建表失败。
    从数据仓库传播数据到数据集市的学习步骤:

    1. 开启一个resumable session可恢复对话。
      当遇到空间不足或超出空间的错误后,resumable session会suspend挂起,待错误解决后再自动地resume继续执行。如果错误不能在一定的时间范围内被解决,语句终究还是失败。本例中,我们手工地解决错误。

       ALTER SESSION ENABLE RESUMABLE TIMEOUT 1200 NAME 
       'create list partitioning';
      
    2. 创建一个新的表空间(将用作可传输表空间)。
      现在创建一个额外的表空间来存储我们的List patitioned事实表。
      CREATE TABLESPACE my_obe_transfer DATAFILE 'c:\obetemp' SIZE 2M REUSE autoextend off;

    3. 在新的表空间中创建List partitioned table列表分离表。
      在窗口1中,利用CREATE TABLE AS SELECT创建List patitioned table列表分割表。
      DROP TABLE sales_prod_dept;
      PROMPT create table in new TS that is too small
      CREATE TABLE sales_prod_dept
      (prod_category, prod_subcategory,cust_id,
      time_id,channel_id,promo_id, quantity_sold, amount_sold
      ) NOLOGGING TABLESPACE my_obe_transfer
      PARTITION BY LIST (prod_category)
      (PARTITION electronic_sales values ('Electronics'),
      PARTITION hardware_sales values ('Hardware'),
      PARTITION sw_other_sales values ('Software/Other'),
      PARTITION p_and_a values ('Peripherals and Accessories'),
      PARTITION photo_sales values ('Photo')
      )
      AS
      SELECT p.prod_category, p.prod_subcategory, s.cust_id, s.time_id,s.channel_id,
      s.promo_id, SUM(s.amount_sold) amount_sold, SUM(s.quantity_sold) quantity_sold
      FROM sales s, products p, times t
      WHERE p.prod_id=s.prod_id
      AND s.time_id = t.time_id
      AND t.fiscal_year=2000
      GROUP BY prod_category, prod_subcategory,cust_id, s.time_id,channel_id, promo_id
      ;
      上面代码中,partition相当于切片,select pks [] group by plks相当于建立了事实表。
      但由于初始划分的空间2m太小,会报错误并挂起,加入下一步。

    4. 使用新的resumable statement可恢复语句功能来进行高效地错误检测和处理。
      SELECT NAME, STATUS, ERROR_MSG FROM dba_resumable;
      显示不能扩展表空间my_obe_transfer中的临时段。
      我们手工解决这个问题,执行:
      ALTER DATABASE DATAFILE 'c:\obetemp' AUTOEXTEND ON NEXT 5M;
      错误修正后,挂起的窗口1自动resume继续运行。

    5. 创建一个新的range_list partitioned table(oracle 9i功能)。
      一个使用Range-List分割的典型例子是全球零售环境,使用time range partitions时间范围分割(滑动窗口)和一个底层的region-oriented list partition基于地域的列表分割,这样就可以为每个地域维护所有时间窗。
      oracle提供了subpartition子分割模板技术,来实现range-hash 和 range-list组合分割,可类比于下钻。
      CREATE TABLE sales_rlp
      COMPRESS
      TABLESPACE MY_OBE_TRANSFER
      PARTITION BY RANGE (time_id)
      SUBPARTITION BY LIST (channel_id)
      SUBPARTITION TEMPLATE
      ( SUBPARTITION direct values (3),
      SUBPARTITION internet values (4),
      SUBPARTITION partner values (2),
      SUBPARTITION other values (DEFAULT)
      )
      (PARTITION SALES_before_1999 VALUES LESS THAN (TO_DATE('01-JAN-1999','DD-MON-YYYY')),
      PARTITION SALES_Q1_1999 VALUES LESS THAN (TO_DATE('01-APR-1999','DD-MON-YYYY')),
      PARTITION SALES_Q2_1999 VALUES LESS THAN (TO_DATE('01-JUL-1999','DD-MON-YYYY')),
      PARTITION SALES_Q3_1999 VALUES LESS THAN (TO_DATE('01-OCT-1999','DD-MON-YYYY')),
      PARTITION SALES_Q4_1999 VALUES LESS THAN (TO_DATE('01-JAN-2000','DD-MON-YYYY')),
      PARTITION SALES_Q1_2000 VALUES LESS THAN (TO_DATE('01-APR-2000','DD-MON-YYYY')),
      PARTITION SALES_Q2_2000 VALUES LESS THAN (TO_DATE('01-JUL-2000','DD-MON-YYYY')),
      PARTITION SALES_Q3_2000 VALUES LESS THAN (TO_DATE('01-OCT-2000','DD-MON-YYYY')),
      PARTITION SALES_Q4_2000 VALUES LESS THAN (MAXVALUE) NOCOMPRESS)
      AS
      SELECT * FROM sales sample(10);

    6. 为可传输表空间准备元数据导出
      CREATE DIRECTORY my_obe_dump_dir as 'c:\wkdir';
      expdp '/ as sysdba' DIRECTORY=my_obe_dump_dir DUMPFILE= meta_MY_OBE_TRANSFER.dmp TRANSPORT_TABLESPACES=MY_OBE_TRANSFER.dmp
      -原网站上的代码有误。

    清理

        SET SERVEROUTPUT ON\
        EXEC dw_handsOn.cleanup_modules
    

    总结

    • 多表插入
    • merge实现upsert
    • 使用表函数
    • 使用同步CDC来捕获和使用增量数据
    • 从数据仓库向数据集市传播数据

    相关文章

      网友评论

        本文标题:使用Oracle 10g进行数据仓库的抽取、转换、装载ETL

        本文链接:https://www.haomeiwen.com/subject/bicjlttx.html