总览
ETL过程中发生了什么?
首先从多个不同的数据源中抽取数据(Extraction),然后传输(transpoting)到中间临时系统或目标系统中,上面两个过程中都可能会有部分转换工作(transform)发生。
抽取盒传输完成之后就是最重要的转换和装载工作,包含:
- 复杂的过滤器的使用
- 保证与目标数据库表中的数据一致
- 检查数据是否需要更新或插入
- 统一数据可能需要插入多次,如作为细节数据或者聚集信息。
先决条件
Sales History 销售记录模式改变的实现
procedure代码end;之后加斜杠是执行存储过程。
这部分主要代码在modifySH_10gR2.sql中,代码运行了3分钟。
多表插入
公司从合作伙伴公司接收到的数据记录了每个客户每周的信息。
转换(transformation)的一部分工作就是从每周一条记录转换为7条记录。另外,数据仓库需要分开记录所有信用高于某一界限的新用户。
这一节,将使用Oracle的多表插入来实现上述业务转换。
使用多表插入实现pivoting旋转
Oracle RDBMS执行每一条SQL语句,都必须经过Oracle优化器的评估。所以,了解优化器是如何选择(搜索)路径以及索引是如何被使用的,对优化SQL语句有很大的帮助。Explain可以用来迅速方便地查出对于给定SQL语句中的查询数据是如何得到的即搜索路径(我们通常称为Access Path)。
这里我们用explain plan发现使用insert all into sales value into sales[or other table] values输入源表只扫描了一次,因为多表插入通过多个Insert into分支实现数据反向规格化(分解),避免了多趟扫描。
对比而言INSERT INTO sales [select_statement1] union all [select_statement1]的方法需要多变扫描。
使用多表插入实现条件插入
INSERT /*+ APPEND NOLOGGING */FIRST
WHEN cust_credit_limit >= 4500 THEN
INTO customers
INTO customers_special VALUES (cust_id, cust_credit_limit)
ELSE
INTO customers
SELECT * FROM customers_new;
使用upsert(update or insert)功能,即SQL merge关键字
公司需要定期根据产品数据库更新数据仓库。不幸的是信息是否新旧需要在数据仓库端来确定。下面步骤介绍了sql MERGE操作:
-
为外部产品信息建立external table外部表(和directories目录)。
外部表作用:加载和卸载数据
外部表都是只读表 , 不能进行 update,insert,delete 操作 .
Oracle 提供两种访问驱动 :
ORACLE_LOADER : 它利用 oracle loader 技术从外部表读取数据 . 它还具有类似 SQL*Loader 工具控制文件语法的数据映射能力 .
ORACLE_DATAPUMP :它提供卸载数据的能力 , 即把数据从数据库里导入一个外部表 , 再由一个或几个外部表导入到数据库里 . 对于 ASsubquery 的子句必须使用 ORACLE_DATAPUMP . 创建用于卸载数据的外部表时,不能制定列名。同时,不支持 badfile 、 discardfile 参数,因为其无效或者丢弃数据将不予以记录为操作系统文件 , 没有 fields erminated by( 或者 missing field values are null) 参数。
语法为:CREATE TABLE products_delta ( PROD_ID NUMBER(6), PROD_VALID CHAR(1) ) ORGANIZATION external ( TYPE oracle_loader DEFAULT DIRECTORY data_dir ACCESS PARAMETERS ( RECORDS DELIMITED BY NEWLINE CHARACTERSET US7ASCII NOBADFILE NOLOGFILE FIELDS TERMINATED BY "|" LDRTRIM ) location ('prodDelta.dat') REJECT LIMIT UNLIMITED NOPARALLEL;
-
使用SQL MERGE命令行实现更新或插入。
-
显示Merge命令的执行计划。
-
不用MERGE操作,使用两段分开的代码实现upsert。
对比结论:
To leverage the updatable join view functionality, the external table needs to be copied into a real database table and a unique index on the join column needs to be created. In short, this requires more operations, more space requirements, and more processing time.
学习DML错误日志
--调用模块
exec dbms_errlog.create_error_log('sales_overall');
INSERT /*+ APPEND NOLOGGING PARALLEL */ INTO sales_overall
SELECT * FROM sales_activity_direct
LOG ERRORS INTO err$_sales_overall ( 'load_test1' ) REJECT LIMIT 10;--记录错误,最多容错十条
commit;
--检查错误的方法,这里可以看出相对于文件方法的优势
select distinct ora_err_number$ from err$_sales_overall;
select distinct ora_err_number$, ora_err_mesg$ from err$_sales_overall;
体会基本的表函数
在ETL过程中,数据从原系统中抽取(Extraction)出来,经过一系列的转换(Transformation),最后被装载(Load)到目标系统中。复杂的转换通常以过程(procedure)的形式实现。有时,转换的结果太大,内存容不下,这些结果必须以数据库表或falt files(普通文件)的方式临时地实体化。然后再被读入和作为下一转换的输入被处理。Oracle提供了对这些转换的管道化和并行处理的支持,实现的语言有PL/SQL,C和java。
表函数可接受查询语句或游标作为输入参数,并可输出多行数据。该函数可以平行执行,并可持续输出数据流,被称作管道式输出。应用表函数可将数据转换分阶段处理,并省去中间结果的存储和缓冲表。
自从 Oracle9i 之后,提供了一个叫做"管道化表函数"的概念。来解决此类问题,这种类型的函数,必须返回一个集合类型,且标明 PIPELINED。这个函数不能返回具体变量,必须以一个空的RETURN 返回。这个函数中,通过 PIPE ROW() 语句来送出要返回的表中的每一行。调用这个函数的时候,通过 TABLE() 关键字把管道流仿真为一个数据集。
下面是学习使用表函数的基本步骤:
- 设立表函数的基本对象
-
定义对象(记录)类型
CREATE TYPE product_t AS OBJECT (
prod_id NUMBER(6)
, prod_name VARCHAR2(50)
);
/ -
定义对象(集合)类型
CREATE TYPE product_t_table AS TABLE OF product_t;
/ -
记录集合的结构,定义ref cursor引用游标类型的包
ref cursor:动态关联结果集的临时对象。即在运行的时候动态决定执行查询。实现在程序间传递结果集的功能,能作为参数传递。
CREATE OR REPLACE PACKAGE cursor_PKG as
TYPE product_t_rec IS RECORD (prod_id NUMBER(6)
, prod_name VARCHAR2(50)
);TYPE product_t_rectab IS TABLE OF product_t_rec; TYPE strong_refcur_t IS REF CURSOR RETURN product_t_rec; TYPE refcur_t IS REF CURSOR; END;
-
为表函数创建日志表
CREATE TABLE obsolete_products_errors
(prod_id NUMBER, msg VARCHAR2(2000));
- 实现非管道化的表函数,返回记录数组
Rem uses weakly typed cursor as input
CREATE OR REPLACE FUNCTION obsolete_products(cur cursor_pkg.refcur_t) RETURN
product_t_table
IS
prod_id NUMBER(6);
prod_name VARCHAR2(50);
sales NUMBER:=0;
objset product_t_table := product_t_table();
i NUMBER := 0;
BEGIN
LOOP
-- Fetch from cursor variable
FETCH cur INTO prod_id, prod_name;
EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
-- Category Electronics is not meant to be obsolete and will be suppressed
IF prod_status='obsolete' AND prod_category != 'Electronics' THEN
-- append to collection
i:=i+1;
objset.extend;
objset(i):=product_t( prod_id, prod_name);
END IF;
END LOOP;
CLOSE cur;
RETURN objset;
END;
/
调用及结果:
-
实现管道化表函数
CREATE OR REPLACE FUNCTION obsolete_products_pipe(cur cursor_pkg.strong_refcur_t)
RETURN product_t_table
PIPELINED
PARALLEL_ENABLE (PARTITION cur BY ANY) IS
prod_id NUMBER(6);
prod_name VARCHAR2(50);
sales NUMBER:=0;
BEGIN
LOOP
-- Fetch from cursor variable
FETCH cur INTO prod_id, prod_name;
EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
IF prod_status='obsolete' AND prod_category !='Electronics' THEN
PIPE ROW (product_t( prod_id, prod_name));
END IF;
END LOOP;
CLOSE cur;
RETURN;
END;
/
区别在于RETURN product_t_table PIPELINED和使用PIPE ROW(添加数据)。
调用obsolete_product与前类似。 -
实现透明并行执行表函数
有时表函数需要对一些产品在类别属性上处理复杂的聚合操作。若要将其并行化,需要保证具有统一产品类别属性的所有记录被统一并行子进程处理,使得聚集函数覆盖了同一group中的所有记录。因此,需要使用对分布规则,在表函数头部添加PARALLEL_ENABLE (PARTITION cur BY ANY) IS。
通过对ref cursor进行hint提示/+ PARALLEL(a,4)/来强制进行并行化。 -
带自治的DML语句的表函数
表函数提供在其自治事务的域内扇出数据到其它表中。
例子中进行判断并向错误日志记录表中输出数据。
FETCH cur INTO prod_id, prod_name;
EXIT WHEN cur%NOTFOUND; -- exit when last row is fetched
IF prod_status='obsolete' THEN
IF prod_category=prod_cat THEN
INSERT INTO obsolete_products_errors VALUES
(prod_id, 'correction: category '||UPPER(prod_cat)||' still available');
COMMIT;
ELSE
PIPE ROW (product_t( prod_id, prod_name));
END IF;
END IF;
- 通过多个表函数实现无缝流式处理
除了表函数在sql语句中的透明使用和其并行处理能力,表函数的另一个优势是可以相互之间调用。更进一步,表函数可以被任何sql语句使用,可以成为任何DML语句的输入。
-
嵌套使用使用两个表函数
SELECT DISTINCT prod_category, prod_status
FROM TABLE(obsolete_products_dml(CURSOR(SELECT *
FROM TABLE(obsolete_products_pipe(
CURSOR(SELECT prod_id, prod_name
FROM products)))))); -
使用create table as select命令将表函数用作输入。
CREATE TABLE PIPE_THROUGH ASSELECT DISTINCT prod_category, prod_status FROM TABLE(obsolete_products_dml(CURSOR(SELECT * FROM TABLE(obsolete_products_pipe( CURSOR(SELECT prod_id, prod_name FROM products))))));
使用同步Change Data Capture(CDC增量抽取解决方案)来记录抽取增量数据变化
Change Data Capture快速找到并处理那些改变了的数据,而不是整张表,并使得这些改变的数据可以供后续之用。
CDC不依赖于在关系数据库外使用中间文件临时存储数据。它能捕获由对用户表进行insert、update、delete操作而引起的改变数据。这些改变数据被存储在一个叫做change table的表中,并可被应用程序方便地控制与操作。
publish 和 subscribe模型
publish 和 subscribe模型大多数CDC系统有一个publisher发布者负责从多个数据源捕获和发布这些改变数据。可以有多个subscriber订阅用户访问数据。CDC提供了PL/SQL包来完成发布、订阅任务。
-
publisher发布者
通常是负责创建、维护组成CDC系统模式对象的DBA。主要任务有: -
定义数据仓库可能感兴趣的关系表(源表)。
-
使用oracle提供的DBMS_LOGMNR_CDC_PUBLISH包来启动系统从多个源表中捕获数据。
-
以change table的形式发布这些变化数据。
-
使用SQL的Grant和Revoke语句来对用户和角色赋予对change table改变表的select权限。
-
subscriber订阅用户
使用发布的改变数据的应用。 -
使用Oracle提供的DBMS_LOGMNR_CDC_SUBSCRIBE包来订阅源表,以获取对所发布的改变数据的访问权限。
-
展开订阅窗口,当可以接收数据时,创造一个新的subscriber view订阅着视图。
-
使用SELECT语句来从subscriber view订阅着视图遍历改变数据。
-
删除subscriber view订阅着视图,清楚订阅窗口。
-
当不再需要其改变数据时,删除subscription订阅。
这一主题的学习步骤如下:
- 使用同步CDC来记录所有增量数据。
- 创建一个intermediate table中间表来进行操作。
CREATE TABLE my_price_change_Electronics
(prod_id number, prod_min_price number, prod_list_price number, when date);
- 创建一个改变表。
使用DBMS_CDC_PUBLISH包创建改变表。
begin
DBMS_CDC_PUBLISH.CREATE_CHANGE_TABLE (OWNER => 'SH', -
CHANGE_TABLE_NAME => 'PROD_price_CT',
CHANGE_SET_NAME => 'SYNC_SET',
SOURCE_SCHEMA => 'SH',
SOURCE_TABLE => 'PRODUCTS',
COLUMN_TYPE_LIST => 'prod_id number(6), prod_min_price number(8,2),
prod_list_price number(8,2)',
CAPTURE_VALUES => 'both',
RS_ID => 'y',
ROW_ID => 'n',
USER_ID => 'n',
TIMESTAMP => 'n',
OBJECT_ID => 'n',
SOURCE_COLMAP => 'y',
TARGET_COLMAP => 'y',
OPTIONS_STRING => null);
end;
/
这段代码创建了一个叫PROD_PRICE_CT的改变表,以及跟踪所有在product上的后续改变的必须的触发器。
表中除了源表PRODUCTS中的列之外还有多个必须(如OPPERATION$)和可选(SOURCE_COLMAP$)的metadata元数据列。
可通过SELECT * FROM change_tables;等查询change table相关信息。
- 订阅到一个change set改变集和所有感兴趣的源表列。
订阅处理的逻辑实体是change set而不是change table。一个chage set可以包含多个change table,并能保证其间的逻辑一致。在订阅一个change set后,你就可以订阅所有感兴趣的源表列。方法为DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION();
-
首先获得独一的将在整个对话中使用的 subscription handle订阅句柄,并告诉系统你感兴趣的列。
variable subname varchar2(30) begin :subname := 'my_subscription_no_1'; DBMS_CDC_SUBSCRIBE.CREATE_SUBSCRIPTION ( CHANGE_SET_NAME => 'SYNC_SET', DESCRIPTION => 'Change data PRODUCTS for ELECTRONICS', SUBSCRIPTION_name => :subname); END; /
订阅是一种连接。
-
然后就可以启动一个该subscription的change view改变视图。
BEGIN :view_name := 'my_prod_price_change_view'; DBMS_CDC_SUBSCRIBE.SUBSCRIBE ( SUBSCRIPTION_NAME => :subname, SOURCE_SCHEMA => 'sh', SOURCE_TABLE => 'products', COLUMN_LIST => 'prod_id, prod_min_price, prod_list_price', subscriber_view => :view_name ); END; /
订阅者视图主要是为了添加感兴趣的列。
- 激活一个订阅,并展开订阅窗口。
EXEC DBMS_LOGMNR_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION -
(SUBSCRIPTION_name => 'my_subscription_no_1')
至此,所有对PRODUCT源表的改变都反映在change table中。这些改变是通过在源表上的trigger透明地实现的。
源表中每条记录的改变,在change table中会添加对应的两条记录:旧、新。
然而,建议不要直接使用change table来找到源表中的改变数据,而是使用安全有保障的subscriber view订阅者视角。
展开订阅窗口后,就可以在subscriber view订阅者视角中看到一段时间窗口内改变的数据。
-
现在就可以在目标系统上使用这些改变数据
可以将subscriber view订阅者视角my_prod_price_change_view 当做表来处理。
INSERT into my_price_change_electronics
SELECT p1.prod_id, p1.prod_min_price, p1.prod_list_price, commit_timestamp$
FROM my_prod_price_change_view p1, products p2
WHERE p1.prod_id=p2.prod_id
AND p2.prod_category='Electronics' AND operation$='UN';COMMIT; SELECT prod_id, prod_min_price, prod_list_price, to_char(when,'dd-mon-yyyy hh24:mi:ss') FROM my_price_change_electronics;
- 考察如何随时间变化处理新的环境。
然后就可以使用所获取的数据。如用merge指令将数据导入已存在的表中。
merge,将subscriber view中的数据当做“普通”表进行处理-
运行发布者。
发布者需清理不需要的数据。
exec DBMS_CDC_PUBLISH.purge_change_table('sh','prod_price_ct') -
删除使用过的改变视图并清理订阅窗口。
-
清理CDC环境。
exec DBMS_CDC_SUBSCRIBE.drop_subscription -
(subscription_name=> 'my_subscription_no_1');
exec DBMS_CDC_PUBLISH.DROP_CHANGE_TABLE (OWNER => 'sh', -
CHANGE_TABLE_NAME => 'prod_price_CT', -
FORCE_FLAG => 'Y');
为数据集市进行信息传播
除了中心的数据仓库,公司还可能运行多个小的data mart数据集市。比如,产品部门想要接收所有的事务型SALES销售数据来进行市场营销分析,这些数据根据其主要的产品类别而划分,且只有2000年的数据相关。
我们讲使用Oracle的transportable tablespace可传输表空间功能和list partition列表划分来解决这些问题。另外,为了保障新表生成过程的成功完成,我们将在resumable可恢复模式下运行语句,以保证不会因为空间问题而导致创建表失败。
从数据仓库传播数据到数据集市的学习步骤:
-
开启一个resumable session可恢复对话。
当遇到空间不足或超出空间的错误后,resumable session会suspend挂起,待错误解决后再自动地resume继续执行。如果错误不能在一定的时间范围内被解决,语句终究还是失败。本例中,我们手工地解决错误。ALTER SESSION ENABLE RESUMABLE TIMEOUT 1200 NAME 'create list partitioning';
-
创建一个新的表空间(将用作可传输表空间)。
现在创建一个额外的表空间来存储我们的List patitioned事实表。
CREATE TABLESPACE my_obe_transfer DATAFILE 'c:\obetemp' SIZE 2M REUSE autoextend off; -
在新的表空间中创建List partitioned table列表分离表。
在窗口1中,利用CREATE TABLE AS SELECT创建List patitioned table列表分割表。
DROP TABLE sales_prod_dept;
PROMPT create table in new TS that is too small
CREATE TABLE sales_prod_dept
(prod_category, prod_subcategory,cust_id,
time_id,channel_id,promo_id, quantity_sold, amount_sold
) NOLOGGING TABLESPACE my_obe_transfer
PARTITION BY LIST (prod_category)
(PARTITION electronic_sales values ('Electronics'),
PARTITION hardware_sales values ('Hardware'),
PARTITION sw_other_sales values ('Software/Other'),
PARTITION p_and_a values ('Peripherals and Accessories'),
PARTITION photo_sales values ('Photo')
)
AS
SELECT p.prod_category, p.prod_subcategory, s.cust_id, s.time_id,s.channel_id,
s.promo_id, SUM(s.amount_sold) amount_sold, SUM(s.quantity_sold) quantity_sold
FROM sales s, products p, times t
WHERE p.prod_id=s.prod_id
AND s.time_id = t.time_id
AND t.fiscal_year=2000
GROUP BY prod_category, prod_subcategory,cust_id, s.time_id,channel_id, promo_id
;
上面代码中,partition相当于切片,select pks [] group by plks相当于建立了事实表。
但由于初始划分的空间2m太小,会报错误并挂起,加入下一步。 -
使用新的resumable statement可恢复语句功能来进行高效地错误检测和处理。
SELECT NAME, STATUS, ERROR_MSG FROM dba_resumable;
显示不能扩展表空间my_obe_transfer中的临时段。
我们手工解决这个问题,执行:
ALTER DATABASE DATAFILE 'c:\obetemp' AUTOEXTEND ON NEXT 5M;
错误修正后,挂起的窗口1自动resume继续运行。 -
创建一个新的range_list partitioned table(oracle 9i功能)。
一个使用Range-List分割的典型例子是全球零售环境,使用time range partitions时间范围分割(滑动窗口)和一个底层的region-oriented list partition基于地域的列表分割,这样就可以为每个地域维护所有时间窗。
oracle提供了subpartition子分割模板技术,来实现range-hash 和 range-list组合分割,可类比于下钻。
CREATE TABLE sales_rlp
COMPRESS
TABLESPACE MY_OBE_TRANSFER
PARTITION BY RANGE (time_id)
SUBPARTITION BY LIST (channel_id)
SUBPARTITION TEMPLATE
( SUBPARTITION direct values (3),
SUBPARTITION internet values (4),
SUBPARTITION partner values (2),
SUBPARTITION other values (DEFAULT)
)
(PARTITION SALES_before_1999 VALUES LESS THAN (TO_DATE('01-JAN-1999','DD-MON-YYYY')),
PARTITION SALES_Q1_1999 VALUES LESS THAN (TO_DATE('01-APR-1999','DD-MON-YYYY')),
PARTITION SALES_Q2_1999 VALUES LESS THAN (TO_DATE('01-JUL-1999','DD-MON-YYYY')),
PARTITION SALES_Q3_1999 VALUES LESS THAN (TO_DATE('01-OCT-1999','DD-MON-YYYY')),
PARTITION SALES_Q4_1999 VALUES LESS THAN (TO_DATE('01-JAN-2000','DD-MON-YYYY')),
PARTITION SALES_Q1_2000 VALUES LESS THAN (TO_DATE('01-APR-2000','DD-MON-YYYY')),
PARTITION SALES_Q2_2000 VALUES LESS THAN (TO_DATE('01-JUL-2000','DD-MON-YYYY')),
PARTITION SALES_Q3_2000 VALUES LESS THAN (TO_DATE('01-OCT-2000','DD-MON-YYYY')),
PARTITION SALES_Q4_2000 VALUES LESS THAN (MAXVALUE) NOCOMPRESS)
AS
SELECT * FROM sales sample(10); -
为可传输表空间准备元数据导出
CREATE DIRECTORY my_obe_dump_dir as 'c:\wkdir';
expdp '/ as sysdba' DIRECTORY=my_obe_dump_dir DUMPFILE= meta_MY_OBE_TRANSFER.dmp TRANSPORT_TABLESPACES=MY_OBE_TRANSFER.dmp
-原网站上的代码有误。
清理
SET SERVEROUTPUT ON\
EXEC dw_handsOn.cleanup_modules
总结
- 多表插入
- merge实现upsert
- 使用表函数
- 使用同步CDC来捕获和使用增量数据
- 从数据仓库向数据集市传播数据
网友评论