从“进阶技术”开始,已经通过增加列和表扩展了数据仓库,在进阶技术(五) “快照”里增加了第二个事实表,month_end_sales_order_fact表。这之后数据仓库模式就有了两个事实表(第一个是在开始建立数据仓库时创建的sales_order_fact表)。有了这两个事实表的数据仓库就是一个标准的双星型模式。
本节将在现有的维度数据仓库上再增加一个新的星型结构。与现有的与销售关联的星型结构不同,新的星型结构关注的是产品业务领域。新的星型结构有一个事实表和一个维度表,用于存储数据仓库中的产品数据。
1. 一个新的星型模式
下图显示了扩展后的数据仓库模式。
模式中有三个星型结构。sales_order_fact表是第一个星型结构的事实表,与其相关的维度表是customer_dim、product_dim、date_dim和sales_order_attribute_dim表。month_end_sales_order_fact表是第二个星型结构的事实表。product_dim和month_dim是其对应的维度表。第一个和第二个星型结构共享product_dim维度表。第二个星型结构的事实表和月份维度数据分别来自于第一个星型结构的事实表和date_dim维度表。它们不从源数据获得数据。第三个星型模式的事实表是新建的production_fact表。它的维度除了存储在已有的date_dim和product_dim表,还有一个新的factory_dim表。第三个星型结构的数据来自源数据。
执行下面的脚本建立第三个星型模式中的新表和对应的源数据表。
-- 在MySQL源库上建立工厂表和每日产品表USE source;2. 新建定期导入脚本文件
CREATE TABLE factory_master (
factory_code INT,
factory_name CHAR(30),
factory_street_address CHAR(50),
factory_zip_code INT(5),
factory_city CHAR(30),
factory_state CHAR(2)
);
alter table factory_master add primary key (factory_code);
CREATE TABLE daily_production (
product_code INT,
production_date DATE,
factory_code INT,
production_quantity INT
);
ALTER TABLE daily_production ADD FOREIGN KEY (factory_code)
REFERENCES factory_master(factory_code) ON DELETE CASCADE ON UPDATE CASCADE ;
ALTER TABLE daily_production ADD FOREIGN KEY (product_code)
REFERENCES product(product_code) ON DELETE CASCADE ON UPDATE CASCADE ;
alter table daily_production add primary key (product_code,production_date,factory_code);
-- 在Hive的rds库上建立相应的过渡表
USE rds;
CREATE TABLE factory_master (
factory_code INT,
factory_name VARCHAR(30),
factory_street_address VARCHAR(50),
factory_zip_code INT,
factory_city VARCHAR(30),
factory_state VARCHAR(2)
);
CREATE TABLE daily_production (
product_code INT,
production_date DATE,
factory_code INT,
production_quantity INT
);
-- 在Hive的dw库上建立相应的维度表和事实表
USE dw;
CREATE TABLE factory_dim (
factory_sk INT,
factory_code INT,
factory_name VARCHAR(30),
factory_street_address VARCHAR(50),
factory_zip_code INT,
factory_city VARCHAR(30),
factory_state VARCHAR(2),
version int,
effective_date DATE,
expiry_date DATE
)
clustered by (factory_sk) into 8 buckets
stored as orc tblproperties ('transactional'='true');
CREATE TABLE production_fact (
product_sk INT
, production_date_sk INT
, factory_sk INT
, production_quantity INT
);
(1)新建抽取作业
# 建立增量抽取每日产品表的作业,以production_date作为检查列,初始值是'1900-01-01'last_value='1900-01-01'sqoop job --delete myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop sqoop job \--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \--create myjob_incremental_import_daily_production \-- \import \--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \--table daily_production \--columns "product_code,production_date,factory_code,production_quantity" \--hive-import \--hive-table rds.daily_production \--incremental append \--check-column production_date \--last-value $last_value新建定期装载每日产品脚本文件regular_etl_daily_production.sh,内容如下。
#!/bin/bash# 全量抽取工厂表sqoop import --connect jdbc:mysql://cdh1:3306/source?useSSL=false --username root --password mypassword --table factory_master --hive-import --hive-table rds.factory_master --hive-overwrite# 增量抽取每日产品表sqoop job --exec myjob_incremental_import_daily_production --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop# 调用regular_etl_daily_production.sql文件执行定期装载beeline -u jdbc:hive2://cdh2:10000/dw -f regular_etl_daily_production.sql为了和其它定期装载脚本共用环境和时间窗口设置,新建一个set_time.sql脚本,内容如下。
-- 设置变量以支持事务 set hive.support.concurrency=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; set hive.compactor.initiator.on=true; set hive.compactor.worker.threads=1; USE dw; -- 设置SCD的生效时间和过期时间 SET hivevar:cur_date = CURRENT_DATE();SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1); SET hivevar:max_date = CAST('2200-01-01' AS DATE); -- 设置CDC的上限时间 INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;新建regular_etl_daily_production.sql脚本文件内容如下。
-- 设置环境与时间窗口!run /root/set_time.sql-- 工厂信息很少修改,一般不需要保留历史,所以使用SCD1drop table if exists tmp;create table tmp as select a.factory_sk, a.factory_code, b.factory_name, b.factory_street_address, b.factory_zip_code, b.factory_city, b.factory_state, a.version, a.effective_date, a.expiry_date from factory_dim a,rds.factory_master b where a.factory_code = b.factory_code and !(a.factory_name <=> b.factory_name and a.factory_street_address <=> b.factory_street_address and a.factory_zip_code <=> b.factory_zip_code and a.factory_city <=> b.factory_city and a.factory_state <=> b.factory_state);delete from factory_dim where factory_dim.factory_sk in (select factory_sk from tmp);insert into factory_dim select * from tmp;-- 添加新的工厂信息INSERT INTO factory_dim SELECT ROW_NUMBER() OVER (ORDER BY t1.factory_code) + t2.sk_max, t1.factory_code, t1.factory_name, t1.factory_street_address, t1.factory_zip_code, t1.factory_city, t1.factory_state, 1, ${hivevar:pre_date}, ${hivevar:max_date} FROM ( SELECT t1.* FROM rds.factory_master t1 LEFT JOIN factory_dim t2 ON t1.factory_code = t2.factory_code WHERE t2.factory_sk IS NULL) t1 CROSS JOIN (SELECT COALESCE(MAX(factory_sk),0) sk_max FROM factory_dim) t2; -- 装载每日产品事实表INSERT INTO production_fact SELECT b.product_sk , c.date_sk , d.factory_sk , production_quantity FROM rds.daily_production a , product_dim b , date_dim c , factory_dim d WHERE production_date = ${hivevar:pre_date} AND a.product_code = b.product_code AND a.production_date >= b.effective_date AND a.production_date <= b.expiry_date AND a.production_date = c.date AND a.factory_code = d.factory_code ;3. 测试
到目前为止已经讨论了第三个星型结构里的所有表,现在做一些测试。首先需要一些工厂信息。执行下面的语句向源数据库的factory_master表中装载四个工厂信息。
USE source; INSERT INTO factory_master VALUES ( 1, 'First Factory', '11111 Lichtman St.', 17050, 'Mechanicsburg', 'PA' ) , ( 2, 'Second Factory', '22222 Stobosky Ave.', 17055, 'Pittsburgh', 'PA' ) , ( 3, 'Third Factory', '33333 Fritze Rd.', 17050, 'Mechanicsburg', 'PA' ) , ( 4, 'Fourth Factory', '44444 Jenzen Blvd.', 17055, 'Pittsburgh', 'PA' ); COMMIT ;执行下面的语句向源数据库的daily_production表添加数据。
USE source; set @yesterday:=date_sub(current_date, interval 1 day);INSERT INTO daily_production VALUES (1, @yesterday, 4, 100 ) , (2, @yesterday, 3, 200 ) , (3, @yesterday, 2, 300 ) , (4, @yesterday, 1, 400 ) , (1, @yesterday, 1, 400 ) , (2, @yesterday, 2, 300 ) , (3, @yesterday, 3, 200 ) , (4, @yesterday, 4, 100 ); COMMIT;现在已经做好了测试产品定期装载的准备,使用下面的命令执行定期装载作业。
./regular_etl_daily_production.sh使用下面的SQL语句查询production_fact表,确认每天产品数据的定期装载是正确的。
select product_sk product_sk, production_date_sk date_sk, factory_sk factory_sk, production_quantity quantity from dw.production_fact;查询结果如下图所示,可以看到已经正确装载了8条每日产品记录。
为了确认工厂维度上成功应用了SCD1,使用下面的语句查询factory_dim表。
select factory_sk sk, factory_code c, factory_name name, factory_street_address address, factory_zip_code zip, factory_city city, factory_state state, version ver, effective_date, expiry_date from dw.factory_dim a;查询结果如下图所示。
使用下面的语句修改源库中factory_master的数据。
use source;update factory_master set factory_street_address= (case when factory_code=2 then '24242 Bunty La.' else '37373 Burbank Dr.' end) where factory_code in (2,3);commit;再次执行定期装载作业。
./regular_etl_daily_production.sh再次查询factory_dim表,查询结果如下图所示。
可以看到第二和第三个工厂已经正确修改了地址。