Data Warehouse Practice Based on the Hadoop Ecosystem: Advanced Techniques (4)

Time: 2021-03-25 14:13:37
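IV. Role-Playing Dimensions
        A role-playing dimension is needed when a fact table references the same dimension table more than once. For example, a sales order has both an order date and a requested delivery date, so the fact table must reference the date dimension table twice.
        This section shows two ways to implement role-playing dimensions: table aliases and database views. Both rely on standard Hive features. With table aliases, the SQL statement references the dimension table several times and gives it a different alias each time. With database views, you create one view over the dimension table for each time the fact table references it.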
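        The following minimal sketch shows why the date dimension has to play two roles. It uses Python's built-in sqlite3 module as a stand-in for Hive, purely so that the example runs anywhere. Table and column names follow this article, and the rows are made up. If date_dim is joined only once, both foreign keys must point at the same row, which forces the two dates to be equal. Giving each role its own alias lets the two dates be resolved independently.

```python
import sqlite3

# In-memory SQLite database used as a stand-in for Hive (illustration only).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE date_dim (date_sk INTEGER PRIMARY KEY, date TEXT);
CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER,
                               request_delivery_date_sk INTEGER);
INSERT INTO date_dim VALUES (1, '2016-07-17'), (2, '2016-07-20');
-- Order 1 is delivered three days after it is placed; order 2 on its order date.
INSERT INTO sales_order_fact VALUES (1, 1, 2), (2, 1, 1);
""")

# Wrong: a single reference to date_dim must satisfy both roles at once,
# so only orders whose two dates coincide survive the join.
single = cur.execute("""
    SELECT f.order_sk, d.date
      FROM sales_order_fact f
      JOIN date_dim d ON f.order_date_sk = d.date_sk
                     AND f.request_delivery_date_sk = d.date_sk""").fetchall()

# Right: the same table referenced twice, once per role, each under its own alias.
aliased = cur.execute("""
    SELECT f.order_sk, od.date AS order_date, rd.date AS request_delivery_date
      FROM sales_order_fact f
      JOIN date_dim od ON f.order_date_sk = od.date_sk
      JOIN date_dim rd ON f.request_delivery_date_sk = rd.date_sk
     ORDER BY f.order_sk""").fetchall()

print(single)   # [(2, '2016-07-17')]
print(aliased)  # [(1, '2016-07-17', '2016-07-20'), (2, '2016-07-17', '2016-07-17')]
```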
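1. Modify the database schema
        Use the script below to modify the database schema. It adds a request_delivery_date_sk column to the fact table sales_order_fact in the data warehouse. It also adds a request_delivery_date column to the sales order table sales_order in the source database and to its Hive copy rds.sales_order.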
-- in hive
USE dw;

-- sales_order_fact is stored as ORC, so adding a column requires rebuilding the table and its data
ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old;
CREATE TABLE sales_order_fact (
order_sk INT comment 'order surrogate key',
customer_sk INT comment 'customer surrogate key',
product_sk INT comment 'product surrogate key',
order_date_sk INT comment 'date surrogate key',
request_delivery_date_sk INT comment 'request delivery date surrogate key',
order_amount DECIMAL(10 , 2 ) comment 'order amount',
order_quantity INT COMMENT 'order_quantity'
)
CLUSTERED BY (order_sk) INTO 8 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true');
INSERT INTO sales_order_fact
SELECT order_sk, customer_sk, product_sk, order_date_sk, NULL, order_amount, order_quantity
FROM sales_order_fact_old;
DROP TABLE sales_order_fact_old;

USE rds;
ALTER TABLE sales_order ADD COLUMNS (request_delivery_date DATE COMMENT 'request delivery date') ;

-- in mysql
USE source;
ALTER TABLE sales_order ADD request_delivery_date DATE AFTER order_date ;
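        The Hive part of the script rebuilds sales_order_fact in four steps: rename the old table, create the new one, copy the rows, and drop the old copy. A side benefit of rebuilding is that the new column can be placed anywhere in the table, here right after order_date_sk. The miniature version below uses SQLite through Python's sqlite3 as a stand-in. It is only a sketch: the column list is cut down, the rows are made up, and no ORC or bucketing is involved.

```python
import sqlite3

# SQLite stand-in for Hive; only the statement sequence mirrors the script above.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Existing fact table without the new column (rows are made up).
cur.execute("CREATE TABLE sales_order_fact "
            "(order_sk INTEGER, order_date_sk INTEGER, order_amount REAL)")
cur.executemany("INSERT INTO sales_order_fact VALUES (?, ?, ?)",
                [(1, 10, 99.5), (2, 11, 20.0)])

# 1. Move the old table aside.
cur.execute("ALTER TABLE sales_order_fact RENAME TO sales_order_fact_old")
# 2. Recreate it with the new column where we want it (after order_date_sk).
cur.execute("CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER, "
            "request_delivery_date_sk INTEGER, order_amount REAL)")
# 3. Copy the old rows; existing orders have no request delivery date, hence NULL.
cur.execute("INSERT INTO sales_order_fact "
            "SELECT order_sk, order_date_sk, NULL, order_amount FROM sales_order_fact_old")
# 4. Drop the old copy.
cur.execute("DROP TABLE sales_order_fact_old")

columns = [info[1] for info in cur.execute("PRAGMA table_info(sales_order_fact)")]
rows = cur.execute("SELECT * FROM sales_order_fact ORDER BY order_sk").fetchall()
print(columns)  # ['order_sk', 'order_date_sk', 'request_delivery_date_sk', 'order_amount']
print(rows)     # [(1, 10, None, 99.5), (2, 11, None, 20.0)]
```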
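        The modified source database schema is shown in the figure below.
[Figure: modified source database schema]
        The modified DW database schema is shown in the figure below.
[Figure: modified DW database schema]
        Unlike MySQL, Hive's ALTER TABLE ... ADD COLUMNS cannot specify where a new column goes. New columns are always appended to the end of the table.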
2. Recreate the Sqoop job
        Use the script below to recreate the Sqoop job so that it also imports the request_delivery_date column.
# Save the current incremental.last.value of the existing job
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop | grep incremental.last.value | awk '{print $3}'`

# Delete the old job
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop

# Recreate the job with request_delivery_date added to --columns
sqoop job \
--meta-connect jdbc:hsqldb:hsql://cdh2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://cdh1:3306/source?useSSL=false&user=root&password=mypassword" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount, order_quantity, request_delivery_date" \
--hive-import \
--hive-table rds.sales_order \
--incremental append \
--check-column order_number \
--last-value $last_value
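        The --columns value lists the columns of source.sales_order in MySQL, and their order must match the column order of rds.sales_order. That is why request_delivery_date is listed last. In MySQL it follows order_date, but Hive appended it to the end of rds.sales_order.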
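        A small, hypothetical model of the import path shows why the order matters. It assumes rds.sales_order is a delimited text table. Under that assumption, Sqoop writes each row's fields in --columns order, and Hive matches the fields of a text row to table columns purely by position. The helpers export_row and hive_read are illustrative names, not Sqoop or Hive APIs, and the row values are made up.

```python
# Hive's default field delimiter for text tables is Ctrl-A (\x01).
DELIM = "\x01"

# Column order of rds.sales_order: ADD COLUMNS appended request_delivery_date last.
HIVE_COLUMNS = ["order_number", "customer_number", "product_code", "order_date",
                "entry_date", "order_amount", "order_quantity", "request_delivery_date"]
# Column order of source.sales_order in MySQL (new column placed AFTER order_date).
MYSQL_COLUMNS = ["order_number", "customer_number", "product_code", "order_date",
                 "request_delivery_date", "entry_date", "order_amount", "order_quantity"]


def export_row(row: dict, columns: list[str]) -> str:
    """Hypothetical model of Sqoop writing one row in --columns order."""
    return DELIM.join(str(row[c]) for c in columns)


def hive_read(line: str, hive_columns: list[str]) -> dict:
    """Hypothetical model of Hive reading a text row: fields are matched by position only."""
    return dict(zip(hive_columns, line.split(DELIM)))


row = {"order_number": 126, "customer_number": 1, "product_code": 1,
       "order_date": "2016-07-17 09:30:00", "request_delivery_date": "2016-07-20",
       "entry_date": "2016-07-17 09:30:00", "order_amount": 5000, "order_quantity": 50}

good = hive_read(export_row(row, HIVE_COLUMNS), HIVE_COLUMNS)  # --columns in Hive order
bad = hive_read(export_row(row, MYSQL_COLUMNS), HIVE_COLUMNS)  # --columns in MySQL order

print(good["request_delivery_date"])  # 2016-07-20
print(bad["entry_date"])              # 2016-07-20 (shifted into the wrong column)
```

        With the MySQL order, every field after order_date lands one column off. Neither side reports an error, so the mistake only shows up as wrong data.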
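3. Modify the regular load file regular_etl.sql
        The regular-load HiveQL script must now also process the request delivery date column. This happens in the sales order fact load, which joins date_dim a second time (alias f) on request_delivery_date to fill request_delivery_date_sk. The modified script is shown below.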
-- Set variables to support transactions
set hive.support.concurrency=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;

USE dw;

-- Set the SCD effective and expiry dates
SET hivevar:cur_date = CURRENT_DATE();
SET hivevar:pre_date = DATE_ADD(${hivevar:cur_date},-1);
SET hivevar:max_date = CAST('2200-01-01' AS DATE);

-- Set the upper bound of the CDC window
INSERT OVERWRITE TABLE rds.cdc_time SELECT last_load, ${hivevar:cur_date} FROM rds.cdc_time;

-- Load the customer dimension
-- Expire deleted records and SCD2 changes on the address columns; the <=> operator handles NULL values.
UPDATE customer_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE customer_dim.customer_sk IN
(SELECT a.customer_sk
   FROM (SELECT customer_sk,
                customer_number,
                customer_street_address,
                customer_zip_code,
                customer_city,
                customer_state,
                shipping_address,
                shipping_zip_code,
                shipping_city,
                shipping_state
           FROM customer_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
        rds.customer b ON a.customer_number = b.customer_number
  WHERE b.customer_number IS NULL OR
  (  !(a.customer_street_address <=> b.customer_street_address)
  OR !(a.customer_zip_code <=> b.customer_zip_code)
  OR !(a.customer_city <=> b.customer_city)
  OR !(a.customer_state <=> b.customer_state)
  OR !(a.shipping_address <=> b.shipping_address)
  OR !(a.shipping_zip_code <=> b.shipping_zip_code)
  OR !(a.shipping_city <=> b.shipping_city)
  OR !(a.shipping_state <=> b.shipping_state)
  ));

-- Insert new rows for SCD2 changes on the address columns
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.customer_number customer_number,
    t2.customer_name customer_name,
    t2.customer_street_address customer_street_address,
    t2.customer_zip_code customer_zip_code,
    t2.customer_city customer_city,
    t2.customer_state customer_state,
    t2.shipping_address shipping_address,
    t2.shipping_zip_code shipping_zip_code,
    t2.shipping_city shipping_city,
    t2.shipping_state shipping_state,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM customer_dim t1
 INNER JOIN rds.customer t2
    ON t1.customer_number = t2.customer_number
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN customer_dim t3
    ON t1.customer_number = t3.customer_number
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (!(t1.customer_street_address <=> t2.customer_street_address)
    OR  !(t1.customer_zip_code <=> t2.customer_zip_code)
    OR  !(t1.customer_city <=> t2.customer_city)
    OR  !(t1.customer_state <=> t2.customer_state)
    OR  !(t1.shipping_address <=> t2.shipping_address)
    OR  !(t1.shipping_zip_code <=> t2.shipping_zip_code)
    OR  !(t1.shipping_city <=> t2.shipping_city)
    OR  !(t1.shipping_state <=> t2.shipping_state)
    )
   AND t3.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Handle SCD1 on the customer_name column
-- Hive's UPDATE does not yet support subqueries in the SET clause, so a temporary table stores the records to update,
-- and a DELETE followed by an INSERT takes the place of the UPDATE.
-- SCD1 keeps no history, so every customer_dim row whose customer_name changed is updated, not only the current version.
DROP TABLE IF EXISTS tmp;
CREATE TABLE tmp AS
SELECT
    a.customer_sk,
    a.customer_number,
    b.customer_name,
    a.customer_street_address,
    a.customer_zip_code,
    a.customer_city,
    a.customer_state,
    a.shipping_address,
    a.shipping_zip_code,
    a.shipping_city,
    a.shipping_state,
    a.version,
    a.effective_date,
    a.expiry_date
  FROM customer_dim a, rds.customer b
 WHERE a.customer_number = b.customer_number AND !(a.customer_name <=> b.customer_name);
DELETE FROM customer_dim WHERE customer_dim.customer_sk IN (SELECT customer_sk FROM tmp);
INSERT INTO customer_dim SELECT * FROM tmp;

-- Handle new customer records
INSERT INTO customer_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.customer_number) + t2.sk_max,
    t1.customer_number,
    t1.customer_name,
    t1.customer_street_address,
    t1.customer_zip_code,
    t1.customer_city,
    t1.customer_state,
    t1.shipping_address,
    t1.shipping_zip_code,
    t1.shipping_city,
    t1.shipping_state,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.customer t1 LEFT JOIN customer_dim t2 ON t1.customer_number = t2.customer_number
 WHERE t2.customer_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(customer_sk),0) sk_max FROM customer_dim) t2;

-- Reload the PA customer dimension
TRUNCATE TABLE pa_customer_dim;
INSERT INTO pa_customer_dim
SELECT
  customer_sk
, customer_number
, customer_name
, customer_street_address
, customer_zip_code
, customer_city
, customer_state
, shipping_address
, shipping_zip_code
, shipping_city
, shipping_state
, version
, effective_date
, expiry_date
FROM customer_dim
WHERE customer_state = 'PA' ;

-- Load the product dimension
-- Expire deleted records and SCD2 changes on the product_name and product_category columns
UPDATE product_dim
   SET expiry_date = ${hivevar:pre_date}
 WHERE product_dim.product_sk IN
(SELECT a.product_sk
   FROM (SELECT product_sk,product_code,product_name,product_category
           FROM product_dim WHERE expiry_date = ${hivevar:max_date}) a LEFT JOIN
        rds.product b ON a.product_code = b.product_code
  WHERE b.product_code IS NULL OR (a.product_name <> b.product_name OR a.product_category <> b.product_category));

-- Insert new rows for SCD2 changes on the product_name and product_category columns
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    t1.version,
    t1.effective_date,
    t1.expiry_date
FROM
(
SELECT
    t2.product_code product_code,
    t2.product_name product_name,
    t2.product_category product_category,
    t1.version + 1 version,
    ${hivevar:pre_date} effective_date,
    ${hivevar:max_date} expiry_date
  FROM product_dim t1
 INNER JOIN rds.product t2
    ON t1.product_code = t2.product_code
   AND t1.expiry_date = ${hivevar:pre_date}
  LEFT JOIN product_dim t3
    ON t1.product_code = t3.product_code
   AND t3.expiry_date = ${hivevar:max_date}
 WHERE (t1.product_name <> t2.product_name OR t1.product_category <> t2.product_category) AND t3.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Handle new product records
INSERT INTO product_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.product_code) + t2.sk_max,
    t1.product_code,
    t1.product_name,
    t1.product_category,
    1,
    ${hivevar:pre_date},
    ${hivevar:max_date}
FROM
(
SELECT t1.* FROM rds.product t1 LEFT JOIN product_dim t2 ON t1.product_code = t2.product_code
 WHERE t2.product_sk IS NULL) t1
CROSS JOIN
(SELECT COALESCE(MAX(product_sk),0) sk_max FROM product_dim) t2;

-- Load the order dimension
INSERT INTO order_dim
SELECT
    ROW_NUMBER() OVER (ORDER BY t1.order_number) + t2.sk_max,
    t1.order_number,
    t1.version,
    t1.effective_date,
    t1.expiry_date
  FROM
(
SELECT
    order_number order_number,
    1 version,
    order_date effective_date,
    '2200-01-01' expiry_date
  FROM rds.sales_order, rds.cdc_time
 WHERE entry_date >= last_load AND entry_date < current_load ) t1
CROSS JOIN
(SELECT COALESCE(MAX(order_sk),0) sk_max FROM order_dim) t2;

-- Load the sales order fact table
-- date_dim is referenced twice: alias e for the order date, alias f for the request delivery date
INSERT INTO sales_order_fact
SELECT
    order_sk,
    customer_sk,
    product_sk,
    e.date_sk,
    f.date_sk,
    order_amount,
    order_quantity
FROM
    rds.sales_order a,
    order_dim b,
    customer_dim c,
    product_dim d,
    date_dim e,
    date_dim f,
    rds.cdc_time g
WHERE
    a.order_number = b.order_number
AND a.customer_number = c.customer_number
AND a.order_date >= c.effective_date
AND a.order_date < c.expiry_date
AND a.product_code = d.product_code
AND a.order_date >= d.effective_date
AND a.order_date < d.expiry_date
AND to_date(a.order_date) = e.date
AND to_date(a.request_delivery_date) = f.date
AND a.entry_date >= g.last_load
AND a.entry_date < g.current_load ;

-- Update the last_load field of the timestamp table
INSERT OVERWRITE TABLE rds.cdc_time SELECT current_load, current_load FROM rds.cdc_time;
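        In the fact-table load, date_dim appears twice: as e for the order date and as f for the request delivery date. The load picks up only orders whose entry_date falls in the half-open window [last_load, current_load) taken from rds.cdc_time. The sketch below reproduces just that part of the join in SQLite through Python's sqlite3. This is a simplification under stated assumptions: SQLite's date() stands in for Hive's to_date(), the order, customer and product dimensions are left out, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE date_dim (date_sk INTEGER PRIMARY KEY, date TEXT);
CREATE TABLE sales_order (order_number INTEGER, order_date TEXT,
                          request_delivery_date TEXT, entry_date TEXT);
CREATE TABLE cdc_time (last_load TEXT, current_load TEXT);
INSERT INTO date_dim VALUES (1, '2016-07-16'), (2, '2016-07-17'), (3, '2016-07-20');
INSERT INTO sales_order VALUES
  (125, '2016-07-16 08:00:00', '2016-07-20', '2016-07-16 08:00:00'),  -- entered before the window
  (126, '2016-07-17 09:30:00', '2016-07-20', '2016-07-17 09:30:00');  -- entered inside the window
INSERT INTO cdc_time VALUES ('2016-07-17', '2016-07-18');
""")

rows = cur.execute("""
SELECT a.order_number,
       e.date_sk AS order_date_sk,             -- role 1: order date
       f.date_sk AS request_delivery_date_sk   -- role 2: request delivery date
  FROM sales_order a
  JOIN date_dim e ON date(a.order_date) = e.date
  JOIN date_dim f ON date(a.request_delivery_date) = f.date
  JOIN cdc_time g ON a.entry_date >= g.last_load
                 AND a.entry_date < g.current_load
""").fetchall()
print(rows)  # [(126, 2, 3)]
```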
4. Test
(1) Run the following SQL script to add three sales orders that have a request delivery date.
USE source;

/***
   Add 3 orders with an order date of 2016-07-17.
***/
SET @start_date := unix_timestamp('2016-07-17');
SET @end_date := unix_timestamp('2016-07-18');
SET @request_delivery_date := '2016-07-20';
DROP TABLE IF EXISTS temp_sales_order_data;
CREATE TABLE temp_sales_order_data AS SELECT * FROM sales_order WHERE 1=0;

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (126, 1, 1, @order_date, @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (127, 2, 2, @order_date, @request_delivery_date, @order_date, @amount, @quantity);

SET @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
SET @amount := floor(1000 + rand() * 9000);
SET @quantity := floor(10 + rand() * 90);
INSERT INTO temp_sales_order_data VALUES (128, 3, 3, @order_date, @request_delivery_date, @order_date, @amount, @quantity);

INSERT INTO sales_order
SELECT NULL,customer_number,product_code,order_date,request_delivery_date,entry_date,order_amount,order_quantity FROM temp_sales_order_data ORDER BY order_date;

COMMIT ;
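        The three blocks in the script above use the same arithmetic. Because rand() returns a value in [0, 1), the order timestamp falls somewhere within 2016-07-17, the amount is an integer from 1000 to 9999, and the quantity is an integer from 10 to 99. The following Python sketch mirrors those expressions with the random module so the ranges can be checked. The function name random_order is hypothetical, and UTC is assumed where MySQL would use the session time zone.

```python
import math
import random
from datetime import datetime, timezone


def random_order(start: datetime, end: datetime, rng: random.Random):
    """Hypothetical Python mirror of the MySQL SET statements for one order."""
    start_ts, end_ts = start.timestamp(), end.timestamp()
    # @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date))
    order_date = datetime.fromtimestamp(start_ts + rng.random() * (end_ts - start_ts),
                                        tz=timezone.utc)
    # @amount := floor(1000 + rand() * 9000)  -> integer in [1000, 9999]
    amount = math.floor(1000 + rng.random() * 9000)
    # @quantity := floor(10 + rand() * 90)    -> integer in [10, 99]
    quantity = math.floor(10 + rng.random() * 90)
    return order_date, amount, quantity


rng = random.Random(2016)  # fixed seed so the run is repeatable
start = datetime(2016, 7, 17, tzinfo=timezone.utc)
end = datetime(2016, 7, 18, tzinfo=timezone.utc)
orders = [random_order(start, end, rng) for _ in range(3)]
for order_date, amount, quantity in orders:
    print(order_date.isoformat(), amount, quantity)
```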

        The modified sales order source data is shown in the figure below. The last three rows have a request delivery date.

[Figure: modified sales order source data]

(2) Change the value of rds.cdc_time
USE rds;
INSERT OVERWRITE TABLE rds.cdc_time SELECT '2016-07-17', '2016-07-17' FROM rds.cdc_time;
(3) Run the regular load and check the results.
        Run the regular load with the following command.
./regular_etl.sh
        Verify the results with the following query.
use dw;
select a.order_sk, request_delivery_date_sk, c.date
  from sales_order_fact a, date_dim b, date_dim c
 where a.order_date_sk = b.date_sk
   and a.request_delivery_date_sk = c.date_sk ;
        The query result is shown in the figure below. Only the three new sales orders have a request_delivery_date_sk value, and it corresponds to July 20, 2016.
[Figure: result of the verification query]
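        The verification query inner-joins request_delivery_date_sk to date_dim. As a result, fact rows whose request_delivery_date_sk is NULL (every order loaded before the schema change) are filtered out rather than shown with an empty date. A LEFT JOIN keeps those rows visible. The sketch below contrasts the two joins in SQLite through Python's sqlite3, as a stand-in for Hive, with made-up rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE date_dim (date_sk INTEGER PRIMARY KEY, date TEXT);
CREATE TABLE sales_order_fact (order_sk INTEGER, order_date_sk INTEGER,
                               request_delivery_date_sk INTEGER);
INSERT INTO date_dim VALUES (1, '2016-07-16'), (2, '2016-07-17'), (3, '2016-07-20');
-- Order 1 was loaded before the schema change, so its new key is NULL.
INSERT INTO sales_order_fact VALUES (1, 1, NULL), (2, 2, 3);
""")

# Inner join (as in the verification query): rows with a NULL key disappear.
inner = cur.execute("""
SELECT a.order_sk, c.date FROM sales_order_fact a
  JOIN date_dim c ON a.request_delivery_date_sk = c.date_sk
 ORDER BY a.order_sk""").fetchall()

# Left join: every fact row is kept; unmatched roles come back as NULL (None).
outer = cur.execute("""
SELECT a.order_sk, c.date FROM sales_order_fact a
  LEFT JOIN date_dim c ON a.request_delivery_date_sk = c.date_sk
 ORDER BY a.order_sk""").fetchall()

print(inner)  # [(2, '2016-07-20')]
print(outer)  # [(1, None), (2, '2016-07-20')]
```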
5. Query using role-playing dimensions
-- Query using table aliases
USE dw;

SELECT
    order_date_dim.date order_date,
    request_delivery_date_dim.date request_delivery_date,
    SUM(order_amount),
    COUNT(*)
FROM
    sales_order_fact a,
    date_dim order_date_dim,
    date_dim request_delivery_date_dim
WHERE
    a.order_date_sk = order_date_dim.date_sk
        AND a.request_delivery_date_sk = request_delivery_date_dim.date_sk
GROUP BY order_date_dim.date , request_delivery_date_dim.date
CLUSTER BY order_date_dim.date , request_delivery_date_dim.date;

-- Query using views
USE dw;

CREATE VIEW order_date_dim (order_date_sk, order_date, month, month_name, quarter, year, promo_ind) AS SELECT * FROM date_dim;
CREATE VIEW request_delivery_date_dim (request_delivery_date_sk, request_delivery_date, month, month_name, quarter, year, promo_ind) AS SELECT * FROM date_dim;

SELECT
    order_date,
    request_delivery_date,
    SUM(order_amount),
    COUNT(*)
FROM
    sales_order_fact a,
    order_date_dim b,
    request_delivery_date_dim c
WHERE
    a.order_date_sk = b.order_date_sk
        AND a.request_delivery_date_sk = c.request_delivery_date_sk
GROUP BY order_date , request_delivery_date
CLUSTER BY order_date , request_delivery_date;

        Both queries return the same result, as shown in the figure below:

[Figure: result of the two role-playing dimension queries]
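        The equivalence of the two approaches can be checked on a toy data set. This sketch uses SQLite through Python's sqlite3 as a portable stand-in for Hive, with some simplifications. date_dim is cut down to three columns, the rows are made up, and ORDER BY replaces Hive's CLUSTER BY, which SQLite does not have. As with order_date_dim and request_delivery_date_dim above, each view renames date_sk and date for its role.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE date_dim (date_sk INTEGER PRIMARY KEY, date TEXT, month INTEGER);
CREATE TABLE sales_order_fact (order_date_sk INTEGER, request_delivery_date_sk INTEGER,
                               order_amount REAL);
INSERT INTO date_dim VALUES (1, '2016-07-17', 7), (2, '2016-07-20', 7), (3, '2016-07-21', 7);
INSERT INTO sales_order_fact VALUES (1, 2, 100.0), (1, 2, 50.0), (1, 3, 25.0);
-- One view per role; the column list renames date_sk and date for that role.
CREATE VIEW order_date_dim (order_date_sk, order_date, month)
    AS SELECT * FROM date_dim;
CREATE VIEW request_delivery_date_dim (request_delivery_date_sk, request_delivery_date, month)
    AS SELECT * FROM date_dim;
""")

# Approach 1: table aliases on date_dim.
alias_rows = cur.execute("""
SELECT od.date, rd.date, SUM(a.order_amount), COUNT(*)
  FROM sales_order_fact a
  JOIN date_dim od ON a.order_date_sk = od.date_sk
  JOIN date_dim rd ON a.request_delivery_date_sk = rd.date_sk
 GROUP BY od.date, rd.date
 ORDER BY od.date, rd.date""").fetchall()

# Approach 2: role-specific views.
view_rows = cur.execute("""
SELECT order_date, request_delivery_date, SUM(order_amount), COUNT(*)
  FROM sales_order_fact a
  JOIN order_date_dim b ON a.order_date_sk = b.order_date_sk
  JOIN request_delivery_date_dim c ON a.request_delivery_date_sk = c.request_delivery_date_sk
 GROUP BY order_date, request_delivery_date
 ORDER BY order_date, request_delivery_date""").fetchall()

print(alias_rows)  # [('2016-07-17', '2016-07-20', 150.0, 2), ('2016-07-17', '2016-07-21', 25.0, 1)]
print(alias_rows == view_rows)  # True
```

        Choosing between the two is mostly a matter of readability. Views give each role self-describing column names that every query can reuse. Aliases need no extra database objects.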