设计案例3----利用存储过程和JOB 设计从餐饮ERP数据库将数据抽取、数据清洗到BI数据库

时间:2022-07-21 16:43:30

     餐饮开发组有一个需求,要将餐饮生产数据库的订单表和订单详情表的数据,做数据整合,数据抽取,整合以后抽取到BI 数据库,供BI项目开发做BI分析。

     要求:

<1>  BI项目组可以查询到的数据时段必须与生产订单库的时间最大间隔两个小时,也就是说 BI 项目组早晨11点必须要查询到早晨9点之前的所有数据, 中午13点必须要查询到中午11点之前的数据,依次类推。

      

<2>  BI库存储的数据包含所有的历史数据,非当月订单数据不要求与主订单库是完全一致同步的,但是当月的数据必须每天同步到BI库与主库的订单数据保持一致的。

    

<3>   通过BI SQL 多表关联集和运算以后,可能会出现业务上的“重复性数据”,之前一直设计的是用select distinct,但是这样数据量很大的情况下,尤其在类似月结的时候,跑批 当月数据会特别慢,需要优化整合,保证同步到BI库的订单数据在业务上是无重复的。


我的设计思路:

1)抽取到BI库端的数据事实表设计成分区表CYERP_BASE,按月分区,BI程序就去查询这个表的数据。

2) 考虑到要 求<1>中的准实时性,如果直接在 CYERP_BASE 表上Truncate 最后一个月的分区,然后再insert  CYERP_BASE  select from xxxx ,那么一定在拉数据过程中,BI 项目组对这个表中当月数据的使用时不可以用的,这种方案肯定不行。那么我就利用普通表与分区表 可以做分区exchange的特性,由于非当月的历史数据 不用管,那么创建一张表结构与BI库分区表一样 的普通表CYERP_BASE_EXG,然后,先将数据拉去到CYERP_BASE_EXG中,拉入完成以后,我再将CYERP_BASE_EXG表和CYERP_BASE表最后一个分区做分区交换(partition exchange),由于分区交换是DDL操作,瞬间完成,那么就消除了拉取数据早晨的基 表CYERP_BASE不可用的情况。

3)定时任务我则配置成 :

8:00  从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表  这样能够保证CYERP_BASE_EXG表的数据是8:00之前的。

9:00  做分区交换,将CYERP_BASE_EXG的数据exchange到CYERP_BASE表的最后一个分区   这样就实现了BI 库 9点以后可以查询8点之前的业务数据。

做成定时任务,这两个存储过程的执行间隔设置为每小时一执行,先执行从ERP生产数据库拉取数据 -----> CYERP_BASE_EXG表。


下面是实施步骤:

<1>建表:

  创建BI 程序使用的事实表   CYERP_BASE

create table CYERP_BASE
(
  created    TIMESTAMP(6),
  prosku     VARCHAR2(20),
  totalnum   NUMBER(14,2),
  totalnums  NUMBER(14,2),
  totalprice NUMBER(14,2),
  pronum     VARCHAR2(20),
  proname    VARCHAR2(200),
  specvalue  VARCHAR2(200),
  procode    VARCHAR2(20),
  unit       VARCHAR2(20),
  price      NUMBER(14,2),
  proarea    VARCHAR2(200),
  province    NUMBER(10),
  CITY        NUMBER(10),
  area        NUMBER(10),
  orgCode     VARCHAR2(50) 
)
 PARTITION BY RANGE (created)
 (PARTITION P201509  VALUES LESS THAN (TIMESTAMP '2015-10-01 00:00:00') , 
 PARTITION P201510  VALUES LESS THAN (TIMESTAMP '2015-11-01 00:00:00') , 
 PARTITION P201511  VALUES LESS THAN (TIMESTAMP '2015-12-01 00:00:00') , 
 PARTITION P201512  VALUES LESS THAN (TIMESTAMP '2016-01-01 00:00:00') , 
 PARTITION P201601  VALUES LESS THAN (TIMESTAMP '2016-02-01 00:00:00') , 
 PARTITION P201602  VALUES LESS THAN (TIMESTAMP '2016-03-01 00:00:00') , 
 PARTITION P201603  VALUES LESS THAN (TIMESTAMP '2016-04-01 00:00:00') , 
 PARTITION P201604  VALUES LESS THAN (TIMESTAMP '2016-05-01 00:00:00') , 
 PARTITION P201605  VALUES LESS THAN (TIMESTAMP '2016-06-01 00:00:00') , 
 PARTITION P201606  VALUES LESS THAN (TIMESTAMP '2016-07-01 00:00:00') , 
 PARTITION PMAX  VALUES LESS THAN (MAXVALUE) ) ;

 创建用于分区交换的表CYERP_BASE_EXG:

 

 create table CYERP_BASE_EXG as select * from CYERP_BASE where 1=0;

<2>  还要攻克一个难点,就是我给定当前的一个日期,怎么获取当前日期对应到分区表的哪个分区中呢? 也就是说如何定位要被用作分区交换的分区? 例如:当前时间为2016           年4月3日,那么首先获取到4月3日这个月的第一天是4月1号,然后需要确定4月1号开始这个月的数据落在了CYER_BASE这个表的哪个分区中。

我的解决思路就是:

select a.table_name,a.partition_name,a.high_value from USER_TAB_PARTITIONS a
设计案例3----利用存储过程和JOB 设计从餐饮ERP数据库将数据抽取、数据清洗到BI数据库


2016年4月的数据会落入到P201604分区里面,这个分区的的HIGH_VALE值为 TIMESTAMP' 2016-05-01 00:00:00'。 那么我就可以求出trunc(sysdate,'month') 当前月第一天的日期和 HIGH_VALE值 相比较,如果 某个分区HIGH_VALUE的值大于trunc(sysdate,'month')  就是符合条件的分区名,但是如果后面有新添加的空的预留分区,也会出现这些分区的 HIGH_VALUE的值大于trunc(sysdate,'month') 的情况,那么我就先order by 符合条件的分区升序排列,然后取第一个,肯定就是要交换的分区名字了,但是由于HIGH_VALUE的数据类型是CLOB 类型,必须有一个包和包体 将CLOB字段转成VARCHAR2字段。


<3>  创建CLOB---->VARCHAR2的转换包和包体

 

reate or replace package long_help
authid current_user
as
function substr_of
( p_query in varchar2,
p_from  in number,
p_for   in number,
p_name1 in varchar2 default NULL,
p_bind1 in varchar2 default NULL,
p_name2 in varchar2 default NULL,
p_bind2 in varchar2 default NULL,
p_name3 in varchar2 default NULL,
p_bind3 in varchar2 default NULL,
p_name4 in varchar2 default NULL,
p_bind4 in varchar2 default NULL )
return varchar2;
end;
/


create or replace package body long_help
as
    g_cursor number := dbms_sql.open_cursor;
    g_query  varchar2(32765);
procedure bind_variable( p_name in varchar2, p_value in varchar2 )
is
begin
    if ( p_name is not null )
    then
        dbms_sql.bind_variable( g_cursor, p_name, p_value );
    end if;
end;
 
function substr_of
( p_query in varchar2,
  p_from  in number,
  p_for   in number,
  p_name1 in varchar2 default NULL,
  p_bind1 in varchar2 default NULL,
  p_name2 in varchar2 default NULL,
  p_bind2 in varchar2 default NULL,
  p_name3 in varchar2 default NULL,
  p_bind3 in varchar2 default NULL,
  p_name4 in varchar2 default NULL,
  p_bind4 in varchar2 default NULL )
return varchar2
as
    l_buffer       varchar2(4000);
    l_buffer_len   number;
begin
    if ( nvl(p_from,0) <= 0 )
    then
        raise_application_error
        (-20002, 'From must be >= 1 (positive numbers)' );
    end if;
    if ( nvl(p_for,0) not between 1 and 4000 )
    then
        raise_application_error
        (-20003, 'For must be between 1 and 4000' );
    end if;
    if ( p_query <> g_query or g_query is NULL )
    then
        if ( upper(trim(nvl(p_query,'x'))) not like 'SELECT%')
        then
            raise_application_error
            (-20001, 'This must be a select only' );
        end if;
        dbms_sql.parse( g_cursor, p_query, dbms_sql.native );
        g_query := p_query;
    end if;
    bind_variable( p_name1, p_bind1 );
    bind_variable( p_name2, p_bind2 );
    bind_variable( p_name3, p_bind3 );
    bind_variable( p_name4, p_bind4 );
    dbms_sql.define_column_long(g_cursor, 1);
    if (dbms_sql.execute_and_fetch(g_cursor)>0)
    then
        dbms_sql.column_value_long
        (g_cursor, 1, p_for, p_from-1,
         l_buffer, l_buffer_len );
    end if;
    return l_buffer;
end substr_of;
end;
/

  <4> 创建查询出分区名字的视图:

CREATE OR REPLACE VIEW YWS_PART_DATE AS
SELECT "TABLE_NAME","PARTITION_NAME","HIGH_VALUE"
  FROM (SELECT
                TABLE_NAME,
                PARTITION_NAME,
                LONG_HELP.SUBSTR_OF('SELECT HIGH_VALUE
FROM   USER_TAB_PARTITIONS WHERE  TABLE_NAME=:TABLE_NAME
AND PARTITION_NAME=:PARTITION_NAME',
                                     1,
                                     4000,
                                     'TABLE_NAME',
                                     TABLE_NAME,
                                     'PARTITION_NAME',
                                     PARTITION_NAME) HIGH_VALUE
           FROM USER_TAB_PARTITIONS)
 where table_name = 'CYERP_BASE';

<5> 建立从BI库到到cyerp 库的DB LINK 用于跨库拉取数据:

create public database link XEDG
  connect to cyerp identified by pwcyerp
  using 'xedg';

<6> 同步所有基础数据(略)

<7>  创建同步到CYERP_BASE_EXG表的存储过程

create or replace procedure cyerptobi_exg as
current_partition varchar2(4000);
v_sql  varchar2(4000);
begin
v_sql:='truncate table CYERP_BASE_EXG';
execute immediate v_sql;
insert into /*+ append */
CYERP_BASE_EXG
  select created,
         prosku,
         t.totalnum,
         t.totalnums,
         t.totalprice,
         t.pronum,
         t.proname,
         t.specvalue,
         t.procode,
         t.unit,
         t.price,
         t.proarea,
	 t.province,
         t.city,
         t.area,
         t.orgCode
    from (SELECT item.create_date created,
                 item.pro_sku prosku,
                 (SELECT SUM(it.NORM_WEIGHT)
                    FROM cy_order_item@xedg it
                   WHERE it.pro_sku = item.pro_sku
                     AND it.pro_num = item.pro_num
                     AND it.price = item.price
                     AND it.create_date >=
                         to_timestamp(to_char(trunc(sysdate, 'month'),
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')
                     AND it.create_date <=
                         to_timestamp(to_char(sysdate,
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')) totalnum,
                 (SELECT SUM(it.nums)
                    FROM cy_order_item@xedg it
                   WHERE it.pro_sku = item.pro_sku
                     AND it.pro_num = item.pro_num
                     AND it.price = item.price
                     AND it.create_date >=
                         to_timestamp(to_char(trunc(sysdate, 'month'),
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')
                     AND it.create_date <=
                         to_timestamp(to_char(sysdate,
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')) totalnums,
                 (SELECT SUM(it.total_price)
                    FROM cy_order_item@xedg it
                   WHERE it.pro_sku = item.pro_sku
                     AND it.pro_num = item.pro_num
                     AND it.price = item.price
                     AND it.create_date >=
                         to_timestamp(to_char(trunc(sysdate, 'month'),
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')
                     AND it.create_date <=
                         to_timestamp(to_char(sysdate,
                                              'yyyy-mm-dd hh24:mi:ss'),
                                      'yyyy-MM-dd HH24:MI:ss')) totalprice,
                 item.pro_num pronum,
                 item.pro_name proname,
                 item.spec_value specvalue,
                 item.pro_code procode,
                 item.unit,
                 item.price,
				info.ca_province province,
               info.ca_city city,
               info.ca_area area,
               item.org_code orgCode,
                 getAreaName(info.PRO_SALE_AREA) proarea
            FROM cy_order_item@xedg item
            LEFT JOIN cy_order_info@xedg info
              ON info.order_id = item.order_id
           WHERE item.pro_type ! = 2
             AND info.status
           ! = 4
             AND item.create_date >=
                 to_timestamp(to_char(trunc(sysdate, 'month'),
                                      'yyyy-mm-dd hh24:mi:ss'),
                              'yyyy-MM-dd HH24:MI:ss')
             AND item.create_date <=
                 to_timestamp(to_char(sysdate, 'yyyy-mm-dd hh24:mi:ss'),
                              'yyyy-MM-dd HH24:MI:ss')) t;
commit;			  
end;
/

<8>  创建用于分区交换的存储过程:CYERP_BASE_EXG----->CYERP_BASE表

create or replace procedure cyerptobi_daily as
current_partition varchar2(4000);
v_sql1  varchar2(4000);
v_sql2  varchar2(4000);
begin
select PARTITION_NAME  into current_partition from yws_part_date where to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss') > trunc(sysdate,'month')  and rownum=1 order by to_date(substr(HIGH_VALUE,11,20),'yyyy-mm-dd hh24:mi:ss');
v_sql1:='alter table CYERP_BASE  drop unused columns';
v_sql2:='alter table CYERP_BASE exchange partition '||current_partition||' with table CYERP_BASE_EXG';
execute immediate v_sql1;
execute immediate v_sql2;
delete CYERP_BASE
 where rowid in (select rid
                   from (select rowid rid,
                                row_number() over(partition by PROSKU,PROAREA,PRICE ORDER by created desc) rn
                           from CYERP_BASE)
                  where rn > 1);
commit;				  
end;
/

最后一步的delete 是考虑到要求3,要根据PROSKU,PROAREA,PRICE这三列去重,即要求是 如果有多行有相同PROSKU,PROAREA,PRICE,那么只保留CREATED  创建时间最靠的那一行数据。


以下是创建JOB 去定时跑这两个存储过程:

begin
  sys.dbms_scheduler.create_job(job_name            => 'SUPER_BI.ETL_DATA',
                                job_type            => 'STORED_PROCEDURE',
                                job_action          => '"SUPER_BI"."CYERPTOBI_EXG"',
                                start_date          => to_date('25-12-2015 21:00:00', 'dd-mm-yyyy hh24:mi:ss'),
                                repeat_interval     => 'Freq=HOURLY;Interval=2',
                                end_date            => to_date(null),
                                job_class           => 'DEFAULT_JOB_CLASS',
                                enabled             => true,
                                auto_drop           => false,
                                comments            => '');
end;
/

begin
  sys.dbms_scheduler.create_job(job_name            => 'SUPER_BI.EXCHANGE_PART',
                                job_type            => 'STORED_PROCEDURE',
                                job_action          => '"SUPER_BI"."CYERPTOBI_DAILY"',
                                start_date          => to_date('25-12-2015 22:00:00', 'dd-mm-yyyy hh24:mi:ss'),
                                repeat_interval     => 'Freq=HOURLY;Interval=2',
                                end_date            => to_date(null),
                                job_class           => 'DEFAULT_JOB_CLASS',
                                enabled             => true,
                                auto_drop           => false,
                                comments            => '');
end;
/