餐饮开发组有一个需求,要将餐饮生产数据库的订单表和订单详情表的数据,做数据整合,数据抽取,整合以后抽取到BI 数据库,供BI项目开发做BI分析。
要求:
<1> BI项目组可以查询到的数据时段必须与生产订单库的时间最大间隔两个小时,也就是说 BI 项目组早晨11点必须要查询到早晨9点之前的所有数据, 中午13点必须要查询到中午11点之前的数据,依次类推。
<2> BI库存储的数据包含所有的历史数据,非当月订单数据不要求与主订单库是完全一致同步的,但是当月的数据必须每天同步到BI库与主库的订单数据保持一致的。
<3> 通过BI SQL 多表关联集和运算以后,可能会出现业务上的“重复性数据”,之前一直设计的是用select distinct,但是这样数据量很大的情况下,尤其在类似月结的时候,跑批 当月数据会特别慢,需要优化整合,保证同步到BI库的订单数据在业务上是无重复的。
我的设计思路:
1)抽取到BI库端的数据事实表设计成分区表CYERP_BASE,按月分区,BI程序就去查询这个表的数据。
2) 考虑到要 求<1>中的准实时性,如果直接在 CYERP_BASE 表上Truncate 最后一个月的分区,然后再insert CYERP_BASE select from xxxx ,那么一定在拉数据过程中,BI 项目组对这个表中当月数据的使用时不可以用的,这种方案肯定不行。那么我就利用普通表与分区表 可以做分区exchange的特性,由于非当月的历史数据 不用管,那么创建一张表结构与BI库分区表一样 的普通表CYERP_BASE_EXG,然后,先将数据拉去到CYERP_BASE_EXG中,拉入完成以后,我再将CYERP_BASE_EXG表和CYERP_BASE表最后一个分区做分区交换(partition exchange),由于分区交换是DDL操作,瞬间完成,那么就消除了拉取数据早晨的基 表CYERP_BASE不可用的情况。
3)定时任务我则配置成 :
8:00 从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表 这样能够保证CYERP_BASE_EXG表的数据是8:00之前的。
9:00 做分区交换,将CYERP_BASE_EXG的数据exchange到CYERP_BASE表的最后一个分区 这样就实现了BI 库 9点以后可以查询8点之前的业务数据。
做成定时任务,这两个存储过程的执行间隔设置为每小时一执行,先执行从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表。
下面是实施步骤:
<1>建表:
创建BI 程序使用的事实表 CYERP_BASE
create table CYERP_BASE ( created TIMESTAMP(6), prosku VARCHAR2(20), totalnum NUMBER(14,2), totalnums NUMBER(14,2), totalprice NUMBER(14,2), pronum VARCHAR2(20), proname VARCHAR2(200), specvalue VARCHAR2(200), procode VARCHAR2(20), unit VARCHAR2(20), price NUMBER(14,2), proarea VARCHAR2(200), province NUMBER(10), CITY NUMBER(10), area NUMBER(10), orgCode VARCHAR2(50) ) PARTITION BY RANGE (created) (PARTITION P201509 VALUES LESS THAN (TIMESTAMP '2015-10-01 00:00:00') , PARTITION P201510 VALUES LESS THAN (TIMESTAMP '2015-11-01 00:00:00') , PARTITION P201511 VALUES LESS THAN (TIMESTAMP '2015-12-01 00:00:00') , PARTITION P201512 VALUES LESS THAN (TIMESTAMP '2016-01-01 00:00:00') , PARTITION P201601 VALUES LESS THAN (TIMESTAMP '2016-02-01 00:00:00') , PARTITION P201602 VALUES LESS THAN (TIMESTAMP '2016-03-01 00:00:00') , PARTITION P201603 VALUES LESS THAN (TIMESTAMP '2016-04-01 00:00:00') , PARTITION P201604 VALUES LESS THAN (TIMESTAMP '2016-05-01 00:00:00') , PARTITION P201605 VALUES LESS THAN (TIMESTAMP '2016-06-01 00:00:00') , PARTITION P201606 VALUES LESS THAN (TIMESTAMP '2016-07-01 00:00:00') , PARTITION PMAX VALUES LESS THAN (MAXVALUE) ) ;
创建用于分区交换的表CYERP_BASE_EXG:
create table CYERP_BASE_EXG as select * from CYERP_BASE where 1=0;
<2> 还要攻克一个难点,就是我给定当前的一个日期,怎么获取当前日期对应到分区表的哪个分区中呢? 也就是说如何定位要被用作分区交换的分区? 例如:当前时间为2016 年4月3日,那么首先获取到4月3日这个月的第一天是4月1号,然后需要确定4月1号开始这个月的数据落在了CYER_BASE这个表的哪个分区中。
我的解决思路就是:
select a.table_name,a.partition_name,a.high_value from USER_TAB_PARTITIONS a
2016年4月的数据会落入到P201604分区里面,这个分区的的HIGH_VALE值为 TIMESTAMP' 2016-05-01 00:00:00'。 那么我就可以求出trunc(sysdate,'month') 当前月第一天的日期和 HIGH_VALE值 相比较,如果 某个分区HIGH_VALUE的值大于trunc(sysdate,'month') 就是符合条件的分区名,但是如果后面有新添加的空的预留分区,也会出现这些分区的 HIGH_VALUE的值大于trunc(sysdate,'month') 的情况,那么我就先order by 符合条件的分区升序排列,然后取第一个,肯定就是要交换的分区名字了,但是由于HIGH_VALUE的数据类型是CLOB 类型,必须有一个包和包体 将CLOB字段转成VARCHAR2字段。
<3> 创建CLOB---->VARCHAR2的转换包和包体
reate or replace package long_help authid current_user as function substr_of ( p_query in varchar2, p_from in number, p_for in number, p_name1 in varchar2 default NULL, p_bind1 in varchar2 default NULL, p_name2 in varchar2 default NULL, p_bind2 in varchar2 default NULL, p_name3 in varchar2 default NULL, p_bind3 in varchar2 default NULL, p_name4 in varchar2 default NULL, p_bind4 in varchar2 default NULL ) return varchar2; end; / create or replace package body long_help as g_cursor number := dbms_sql.open_cursor; g_query varchar2(32765); procedure bind_variable( p_name in varchar2, p_value in varchar2 ) is begin if ( p_name is not null ) then dbms_sql.bind_variable( g_cursor, p_name, p_value ); end if; end; function substr_of ( p_query in varchar2, p_from in number, p_for in number, p_name1 in varchar2 default NULL, p_bind1 in varchar2 default NULL, p_name2 in varchar2 default NULL, p_bind2 in varchar2 default NULL, p_name3 in varchar2 default NULL, p_bind3 in varchar2 default NULL, p_name4 in varchar2 default NULL, p_bind4 in varchar2 default NULL ) return varchar2 as l_buffer varchar2(4000); l_buffer_len number; begin if ( nvl(p_from,0) <= 0 ) then raise_application_error (-20002, 'From must be >= 1 (positive numbers)' ); end if; if ( nvl(p_for,0) not between 1 and 4000 ) then raise_application_error (-20003, 'For must be between 1 and 4000' ); end if; if ( p_query <> g_query or g_query is NULL ) then if ( upper(trim(nvl(p_query,'x'))) not like 'SELECT%') then raise_application_error (-20001, 'This must be a select only' ); end if; dbms_sql.parse( g_cursor, p_query, dbms_sql.native ); g_query := p_query; end if; bind_variable( p_name1, p_bind1 ); bind_variable( p_name2, p_bind2 ); bind_variable( p_name3, p_bind3 ); bind_variable( p_name4, p_bind4 ); dbms_sql.define_column_long(g_cursor, 1); if (dbms_sql.execute_and_fetch(g_cursor)>0) then dbms_sql.column_value_long (g_cursor, 1, p_for, p_from-1, l_buffer, l_buffer_len ); end if; return l_buffer; end substr_of; end; /
<4> 创建查询出分区名字的视图:
CREATE OR REPLACE VIEW YWS_PART_DATE AS SELECT "TABLE_NAME","PARTITION_NAME","HIGH_VALUE" FROM (SELECT TABLE_NAME, PARTITION_NAME, LONG_HELP.SUBSTR_OF('SELECT HIGH_VALUE FROM USER_TAB_PARTITIONS WHERE TABLE_NAME=:TABLE_NAME AND PARTITION_NAME=:PARTITION_NAME', 1, 4000, 'TABLE_NAME', TABLE_NAME, 'PARTITION_NAME', PARTITION_NAME) HIGH_VALUE FROM USER_TAB_PARTITIONS) where table_name = 'CYERP_BASE';
<5> 建立从BI库到到cyerp 库的DB LINK 用于跨库拉取数据:
create public database link XEDG connect to cyerp identified by pwcyerp using 'xedg';
<6> 同步所有基础数据(略)
<7> 创建同步到CYERP_BASE_EXG表的存储过程
create or replace procedure cyerptobi_exg as current_partition varchar2(4000); v_sql varchar2(4000); begin v_sql:='truncate table CYERP_BASE_EXG'; execute immediate v_sql; insert into /*+ append */ CYERP_BASE_EXG select created, prosku, t.totalnum, t.totalnums, t.totalprice, t.pronum, t.proname, t.specvalue, t.procode, t.unit, t.price, t.proarea, t.province, t.city, t.area, t.orgCode from (SELECT item.create_date created, item.pro_sku prosku, (SELECT SUM(it.NORM_WEIGHT) FROM cy_order_item@xedg it WHERE it.pro_sku = item.pro_sku AND it.pro_num = item.pro_num AND it.price = item.price AND it.create_date >= to_timestamp(to_char(trunc(sysdate, 'month'), 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss') AND it.create_date <= to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss')) totalnum, (SELECT SUM(it.nums) FROM cy_order_item@xedg it WHERE it.pro_sku = item.pro_sku AND it.pro_num = item.pro_num AND it.price = item.price AND it.create_date >= to_timestamp(to_char(trunc(sysdate, 'month'), 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss') AND it.create_date <= to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss')) totalnums, (SELECT SUM(it.total_price) FROM cy_order_item@xedg it WHERE it.pro_sku = item.pro_sku AND it.pro_num = item.pro_num AND it.price = item.price AND it.create_date >= to_timestamp(to_char(trunc(sysdate, 'month'), 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss') AND it.create_date <= to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss')) totalprice, item.pro_num pronum, item.pro_name proname, item.spec_value specvalue, item.pro_code procode, item.unit, item.price, info.ca_province province, info.ca_city city, info.ca_area area, item.org_code orgCode, getAreaName(info.PRO_SALE_AREA) proarea FROM cy_order_item@xedg item LEFT JOIN cy_order_info@xedg info ON info.order_id = item.order_id WHERE item.pro_type ! = 2 AND info.status ! = 4 AND item.create_date >= to_timestamp(to_char(trunc(sysdate, 'month'), 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss') AND item.create_date <= to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'), 'yyyy-MM-dd HH24:MI:ss')) t; commit; end; /
<8> 创建用于分区交换的存储过程:CYERP_BASE_EXG----->CYERP_BASE表
create or replace procedure cyerptobi_daily as current_partition varchar2(4000); v_sql1 varchar2(4000); v_sql2 varchar2(4000); begin select PARTITION_NAME into current_partition from yws_part_date where to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss') > trunc(sysdate,'month') and rownum=1 order by to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss'); v_sql1:='alter table CYERP_BASE drop unused columns'; v_sql2:='alter table CYERP_BASE exchange partition '||current_partition||' with table CYERP_BASE_EXG'; execute immediate v_sql1; execute immediate v_sql2; delete CYERP_BASE where rowid in (select rid from (select rowid rid, row_number() over(partition by PROSKU,PROAREA,PRICE ORDER by created desc) rn from CYERP_BASE) where rn > 1); commit; end; /
最后一步的delete 是考虑到要求3,要根据PROSKU,PROAREA,PRICE这三列去重,即要求是 如果有多行有相同PROSKU,PROAREA,PRICE,那么只保留CREATED 创建时间最靠的那一行数据。
以下是创建JOB 去定时跑这两个存储过程:
begin sys.dbms_scheduler.create_job(job_name => 'SUPER_BI.ETL_DATA', job_type => 'STORED_PROCEDURE', job_action => '"SUPER_BI"."CYERPTOBI_EXG"', start_date => to_date('25-12-2015 21:00:00', 'dd-mm-yyyy hh24:mi:ss'), repeat_interval => 'Freq=HOURLY;Interval=2', end_date => to_date(null), job_class => 'DEFAULT_JOB_CLASS', enabled => true, auto_drop => false, comments => ''); end; /
begin sys.dbms_scheduler.create_job(job_name => 'SUPER_BI.EXCHANGE_PART', job_type => 'STORED_PROCEDURE', job_action => '"SUPER_BI"."CYERPTOBI_DAILY"', start_date => to_date('25-12-2015 22:00:00', 'dd-mm-yyyy hh24:mi:ss'), repeat_interval => 'Freq=HOURLY;Interval=2', end_date => to_date(null), job_class => 'DEFAULT_JOB_CLASS', enabled => true, auto_drop => false, comments => ''); end; /