IBM InfoSphere DataStage 8.1 DataStage Job 开发具体解释

时间:2021-04-21 21:15:22

简单介绍

DataStage 使用了 Client-Server 架构,server端存储全部的项目和元数据,client DataStage Designer 为整个 ETL 过程提供了一个图形化的开发环境。用所见即所得的方式设计数据的抽取清洗转换整合和载入的过程。Datastage 的可执行单元是 Datastage Job ,用户在 Designer 中对 Datastage Job 的进行设计和开发。

Datastage 中的 Job 分为 Server Job, Parallel Job 和 Mainframe Job
。当中 Mainframe Job 专供大型机上用。经常使用到的 Job 为 Server Job 和 Parallel Job 。本文将介绍怎样使用 Server Job 和 Parallel Job 进行 ETL 开发。

Server Job

一个 Job 就是一个 Datastage 的可执行单元。Server Job 是最简单经常使用的 Job 类型,它使用拖拽的方式将主要的设计单元 -Stage 拖拽到工作区中。并通过连线的方式代表数据的流向。通过 Server Job,能够实现下面功能。

  1. 定义数据怎样抽取
  2. 定义数据流程
  3. 定义数据的集合
  4. 定义数据的转换
  5. 定义数据的约束条件
  6. 定义数据的聚载
  7. 定义数据的写入

Parallel Job

Server Job 简单而强大,适合高速开发 ETL 流程。Parallel Job 与 Server Job 的不同点在于其提供了并行机制,在支持多节点的情况下能够迅速提高数据处理效率。Parallel Job 中包括很多其它的 Stage 并用于不同的需求。每种 Stage 使用上的限制也往往大于 Server Job。

Sequence Job

Sequence Job 用于 Job 之间的协同控制,使用图形化的方式来将多个 Job 汇集在一起,并指定了 Job 之间的运行顺序。逻辑关系和出错处理等。

数据源的连接

DataStage 可以直接连接许多的数据源,应用范围很大,可连接的数据源包含:

  • 文本文件
  • XML 文件
  • 企业应用程序。比方 SAP 、PeopleSoft 、Siebel 、Oracle Application
  • 差点儿全部的数据库系统,比方 DB2 、Oracle 、SQL Server 、Sybase ASE/IQ 、Teradata 、Informix 以及可通过 ODBC 连接的数据库等
  • Web Services
  • SAS 、WebSphere MQ

回页首

Server Job

Server Job 中的 Stage 综述

Stage 是构成 Datastage Job 的基本元素。在 Server Job 中。Stage 可分为下面五种:

  1. General
  2. Database
  3. File
  4. Processing
  5. Real Time

本节中将介绍怎样使用 Datastage 开发一个 Server Job。如图 1 所看到的:

图 1. Server Job

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

Sequential File Stage

Sequential File Stage 可用来从一个 Sequential 文件里获取源数据或将数据载入到一个 Sequential 文件里。在使用 Sequential File Stage 时须要指定文件的路径和名称,文件的格式,列的定义和文件写入的类型(覆盖或追加)。

图 2. Sequential File 属性框

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 3. Sequential File 列定义

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

上图是本节样例中使用到的 Sequence File。

在 Input 页中。File Name 參数代表文件的实际路径,假设文件不存在将会被自己主动建立。Update Action 中选择 Overwrite existing file 表示此文件在载入数据之前将被清空;在 Format 页中,定义文件的格式,比如分隔符,NULL 值,首行是否为列定义等;在 Column 页中,须要输入文件的列定义。

Hash File Stage

Hash File 以主键将记录分成一个或多个部分的文件,在 Datastage 中通常被用做參考查找。

在进行參考查找的时候,Hash File 文件会被载入到内存中,因此具有较高的查找效率。

和 Sequence File 类似,使用 Hash File 时须要输入文件的实际地址。通过參数设置写入时的选项,并提供数据的列定义。须要注意的是,Hash File 须要指定主键。假设未指定,第一列被默觉得主键。进行參数查找时,使用主键值在 Hash File 中搜索。假设找到则返回该数据。假设未找到则返回 NULL 值。

图 4. Hash File 属性框

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

Transformer Stage

Transformer Stage 是一个重要的,功能强大的 Stage。它负责 ETL 过程中的数据转换操作。在 Transformer Stage 中能够指定数据的来源和目的地,匹配相应输入字段和输出字段,并指定转换规则和约束条件。

图 5. Transformer Stage 列映射

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

Transformer Stage 中分为 5 个区域:

左上方区域,是用表格形式描写叙述的输入数据信息。

假设有多条输入数据流。则有非常多表格。

本例中有一个输入,一个參照查询,因此左上方有两个表格。

右上方区域。是用表格形式描写叙述的输出信息。

左下方区域为输入的元数据列定义,包含列名。类型和长度等属性。

右下方区域为输出的元数据列定义,包含列名,类型和长度等属性。

左上方和右上方的表格由带有流向的箭头连接,代表了字段的相应关系。

此例中,输入的数据仅仅有一个字段 EMPLOYEE_ID。通过此字段在 Hash File 中进行參照查找。获取 EMPLOYEE_NAME 字段。假设在 Hash File 中找到了 EMPLOYEE_NAME 则将数据发送到输出端,这个条件是通过 Transformer Stage 提高的约束功能实现。我们在约束中的定义为 NOT(ISNULL(lkp_name.EMPLOYEE_ID))。另外不管是否在 Hash File 中查找到相应的数据,我们都将数据记录到一个
csv 文件里,即相应的 save_all 输出。

回页首

Parallel Job

Parallel Job 的 Stage 综述

与 Server job 相比。Parallel Job 提供了更丰富的 stage。添加了 Development/Debug,Restructure 和 Transactional 类的 stage。同一时候。对于一些在 server job 中能够在 transformer 中完毕的功能,Parallel job 也提供了专用的 stage 以提高执行性能和开发效率,比方 lookup。join,Compare 等。另外一个显著的差别是在 Parallel Job 中内置地支持 job 的并行执行,并行执行也就意味着数据在
job 中的各个 stage 见处理时须要处理 partition 和 combination 的问题,所以在开发 job 时,我们须要设定 partition 和 combination 的策略。

Lookup DataSet 与 Lookup   Stage

Parallel Job 对 lookup 的实现做了一些调整,在 Server Job 中。我们通常是用 Transformer Stage 配合 lookup 数据源(通常是 hash 文件)来实现 lookup,同一个 transformer 中能够同一时候完毕多个 lookup。类似于 sql 中的多表自然联接,假设 lookup 数据源使用的是 database stage 而不是 hash file 并且对于一条记录返回多条 lookup data 的话。job 会产生 warning(hash
file 的键唯一特性使得它不会存在这个问题,后面插入的反复数据会覆盖前面的同主键的数据)。

而在 Parallel Job 中,lookup 须要用一个单独的 stage 来实现。transformer 不再兼职 lookup 的“副业”。在一个 lookup stage 中,能够有一个主数据 link 和多个 lookup link。同一时候,Parallel 中的 lookup 还有下面的新特性

  • 支持 multi rows,在一个 lookup stage 中对于一行主输入数据能够有一个 lookup link 返回多于一行的 lookup 数据。

    结果也会变成多行。

  • Parallel 中不在支持 hash file,转而使用封装更强的 Data Set stage, Data Set 本质上也是 hash 数据结构,但对 Job 开发者隐藏了实现细节,我们不用象开发 Server Job 那样去手动设定具体參数
  • Parallel 中除了支持等值 lookup 外,还直接支持 Range lookup 和 Caseless lookup。这样我们在完毕类似月份转换为季度性质的设计时就会很的方便和自然。

类似于 Server Job 中的 hash 文件,在 Parallel Job 中我们使用 Data Set 文件来缓存 lookup 数据,并载入到内存中,在 Data Set stage 中。我们仅仅须要制定记录的主键和存储的文件名称,Parallel 引擎会为我们处理其它的操作。

但为了达到性能的最优化,我们有时须要制定 Data Set 的缓存策略和缓存大小,系统默认的缓存大小是 3M。假设我们的 lookup 数据比較大。就须要设定合适的缓存大小。否则会严重影响 lookup 的性能。

图 6. DataSet 缓存设置

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

Sort Stage

Parallel Sort stage 的行为类似于 Sql 中的 order by,可是比 order by 提供了很多其它的选项。

在 job 中,Sort stage 接收一个输入 link 并产生一个输出 link。对于写过 sql order by 或者排序程序的开发者使用 Sort Stage 的基本功能应该是非常easy的,可是要充分发挥 Parallel stage 的强大功能。我们还是须要注意以下几点:

  • 并行还是串行运行,假设选择串行运行。那么 Sort stage 的行为就类似于 Server Job 中的 Sort Stage,整个输入数据都会依照设定的排序选项排序,但假设选择分区 / 并行排序。则仅仅有每一个分区内的输出是有序的,这在有些情况下是能够接受的。但在另外一些情况下会导致代码缺陷,须要依据 sort 的兴许操作做出选择。
  • 假设有可能。尽量在数据源端的数据库中进行排序,这样不但会提高数据排序的效率,还能大大降低 job 对内存。I/O 的压力。Sort stage 仅仅有在接收完输入之后才干完毕排序,进而输出数据,使得 job 的兴许 stage 都处于等待状态。
  • 类似于 order by 后面的字段列表,我们能够指定排序的方向,是升序还是降序,Sort Stage 也能够指定对多个字段进行排序,排在前面的 column 称为主排序字段,假设排序字段中有某一个或几个字段已经是有序的,我么也能够指定其为有序,这样在排序的时候就能够提高排序的效率。
  • 稳定排序(stable sort)/ 同意反复。stable sort 默认是 yes,这样假设两条记录 sort key 同样的话,排序的输出和输入顺序将是同样的,假设没有选择同意反复。两条或者多条记录的 sort key 同样的话。将仅仅保留一条记录。
  • 限制内存的使用,数据的排序操作是很耗费内存的,假设不加限制。让全部的数据的排序都在内存中完毕的话,job 的其它操作或者其它 job 的运行的效率将受到严重影响,全部在 Sort Stage 中。我们能够设定此排序能够使用的最大内存数(M),这样我们在能够接受的排序效率和使用的内存数量之间找到平衡点。

Compare/Difference/Change Capture Stage

Compare, Difference 和 Change Capture Stage 是 Parallel job 中三个用于比較数据集合异同的 stage,对于这三个 stage 本身的使用没有太多困难的地方,主要的參数和设置都非常简明直观,我们的介绍主要集中在这三个 stage 在使用中的同样点和不同点上,一旦了解了这些 stage 的特点。使用的时候不但能依据需求选择正确的 stage,也能依据 stage 特性知道须要设置哪些參数。

同样点:

  • 都有两个输入,产生一个输出,
  • 输入的数据主键的字段名同样,都须要指定须要比較的字段。
  • 产生的结果数据中都会添加一个整型的结果字段,用于表示两行数据的比較结果

不同点:

  • Capture Change Stage 输出的是以 after 输入流为基础,外加 change code 字段,适合和 change apply 配合使用,把 before 输入流同步为和 after 一样。
  • Difference Stage 的输出是以 before 输入流为基础,外加 change code 字段
  • Compare Stage 产生的结果包含 before 和 after。以及 change code 字段

以下是一个 Capture Change Stage 的演示样例:

图 7. Capture Change Stage 的演示样例

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

Before source Sql: SELECT k,v
FROM (values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
as temp(k,v) order by k asc After source Sql: SELECT k,v
FROM (values (1,1),(2,2),(11,11),(4,5),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
as temp(k,v) order by k asc
图 8. Capture Change Stage 參数设置

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

从以上设置能够看到。我们选择了明白指定主键。剩余 column 都当作 value,对于比較的结果,假设结果同样。则从结果中删除,也就是我们仅仅希望看到对 Before 数据做 Delete。Edit,和 insert 后产生的差异,下图是我们 job 执行得到的结果:

图 9. Comparsion 结果

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

从以上的结果能够看到。before 和 after 数据有三处差异。change_code 的值相应为 2,3。1。分别表示对 before 运行 Delete,Update。Insert 产生的差异。要同步这些差异,我们仅仅须要对 before 数据运行相应的 Delete,Update 和 Insert 就可以实现两个数据集合的同步。

Filter Stage

Filter Stage 顾名思义是一个用于过滤的 Stage,其作用类似于我们写 sql 中的 where 子句,并且其支持的逻辑表达式和运算符也类似于 sql 语句的 where 子句,比方,在 filter stage 中,我们能够使用下面常见的逻辑表达式和运算符以及其组合 ,

  • true 和 false
  • 六个比較运算符 : =, <>, <, >, <=, >=
  • is null 和 is not null
  • like 和 between

从其语法作用上看。则类似于 java 或者 C 语言中的 switch case 语句。我们能够通过设置“Output Row Only Once”选项的值来决定是否在每一个 case when 子句后面是否加入 break。通过加入或者删除“Reject Link”来确定是否加入一个 default 子句 . 以下是一个简单的样例。

展示了我们怎样通过员工编号和薪水的组合条件来过滤员工的记录到不同的结果文件的。

图 10. Filter Stage 的演示样例

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 11. Filter Stage 的设置

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

对于每个 where 条件。我们须要设置相应的输出链接。这是一个整型数字,我们能够在“Link Ordering”页签上找到输出链接的编号和名称之间的相应关系。

另外须要注意的一点是,Filter Stage 不正确输入的记录做不论什么修改,仅仅做分发。可是你能够手动设置输出的 column,使得每一个输出的 column 列表不一样,但仅仅要是输入 column 列表的子集就可以。可是对于 Reject Link, Column 列表是默认全然等同于输入的,且不可更改。

用于调试的 Stages

我们知道 DataStage Server Job 中提供了 Debug 功能,我们在开发过程中遇到问题的时候能够让 Job 执行在 debug 模式下。细致查看每行数据在 Job 中各个 Stage 之间的流动和转换情况,但 Parallel Job 并没有给我们提供调试功能。但 Parallel Job 用第二种方式提供了调试的能力:Parallel Job 内置了用于调试的 Stage,使用这些 Stage,我们能够依照我们的须要,把我们怀疑有问题的中间数据输出,进而能够深入查找问题的根源。在
Parallel Job 中提供了下面的 Stage 用于调试:

  • Head Stage
  • Tail Stage
  • Sample Stage
  • Peek Stage
  • Row Generator Stage
  • Column Generator Stage

我们以一个 peek 的实例展示一下 development/debug Stage 的使用。其它的 Stage 的使用方法类似,能够參见后面的表格和文档。例如以下图是我们的 Job。DB2 Stage 的 Source Sql 例如以下:

 SELECT k,v FROM (values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),
(10,10),(11,11),(12,12),(13,13),(14,14),(15,15),(16,16),(17,17),(18,18),(19,19),(20,20) )
as temp(k,v)
图 12. Peek Job 的演示样例

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 13. Peek Stage 的设置

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 14. Peek 的结果

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

下表简述了这些 Stage 的特点和使用方法

表 1. Stage 的特点和使用方法
Stage 类型 用途 可设置项目
Head Stage 从头部開始抓取输入流的数据

一个输入一个输出
抓取的行数

从哪些分区抓取

每一个分区的起始位置

每次抓取的间隔
Tail Stage 抓取输入流尾部的 N 行数据

一个输入一个输出
抓取的行数

从哪些分区抓取
Sample Stage 一个输入,多个输出依据设置的策略从输入流的各个分区抓取数据,每一个输出流有不同的百分比设置 百分比

随机数的种子

每一个分区抓取的最多行数
Peek Stage 从一个数据流中有选择地“偷窥”流经的数据。一个输入。两个输出,一个输出原样输出输入数据,一个输出生成一个文本,包括“偷窥到的数据” 每次抓取的间隔

每一个分区抓取的行数

“偷窥”哪些行

输出“偷窥”结果到 log 还是输出
Row Generator Stage 依据定义的数据 schema,生成模拟数据,无需输入,一个输出 Schema(column list 或者 schema file)

生成的行数
Column Generator Stage 在一个输入流上加入新的 column,为这些新加入的或原有的 column 生成模拟数据 须要生成模拟数据的 column

回页首

Sequence Job

假设说每一个个 Server Job 或 Parallel Job 完毕 ETL 流程中的一个数据抽取,转换。载入的子过程的话,那么 Sequence Job 的作用就是把这些子过程给串联起来。形成一个完整的全局的 ETL 过程。从结构上看,一个 Sequence Job 类似于一个 C 或者 Java 语言的一个函数。但功能更为强大。

  • 能够使用 UserVariables Activity Stage 定义局部变量。变量在定义的时候须要赋值,赋值表达式能够是系统变量,Job 參数,Datastage 的宏。常量,Routine 的返回结果等。还能够是这些单独变量的条件。数学或者字符串运算后的结果。差点儿你在一个函数中能完毕的局部变量定义的功能这儿都能实现。以下的演示样例定义了六个变量。
  • 能够调用其它的功能模块,通过 Job Activity Stage 能够调用 Server Job,Parallel Job;通过 Execute Command Stage 调用 unix/windows cmd。通过 Routine Activity 支持调用 datastage routine。
  • 支持循环,Sequence Job 通过 StartLoop Activity Stage 和 EndLoop Activity Stage 提供了循环的功能。循环变量能够是基于起始值,结束值和步长的整数循环,也能够基于给定的列表进行循环。还能够把这些循环中的暂时变量传递给每一个详细的循环步骤。

    在 StartLoop Activity Stage 和 EndLoop Activity Stage 之间。能够增加随意多个的

  • 支持逻辑运算。Nested Condition Stage 支持类似 switch 的逻辑,Sequencer Stage 支持与和或的逻辑运算,通过这些 Stage 的组合,能够支持随意复杂的逻辑控制。
  • 支持 email 通知,使用 Notification Stage,在 job 执行成功,失败或者满足其它设定条件时,Sequence Job 能够发送一封或者多封的通知邮件,使我们能够更方便地监控 Job 的执行状态。邮件的内容能够包括 job 的执行状态,当前的參数等等,凡是能够在 User Variables Stage 中引用的变量都能够包括在邮件中。同一时候还能够包括我们指定的文件和 Sequence Job 的执行状态等。
  • 支持错误处理和现场清理,使用 Terminator Activity Stage 和 Exception Handler Stage。我们能够定义须要处理的错误,并在发生错误的使用依据定义的策略停止不必要的 Job 执行。
  • 通过 Wait for File Activity Stage 能够支持等待时间。我们能够定义仅仅有某个信号文件出现或者消失的时候才開始启动 Wait for File Activity Stage 兴许的执行。

以下的图展示了一个简单的 Sequence Job。在 Job 的開始,我们定义一组变量。这些变量在我们循环和发送通知邮件的时候将会被引用。然后利用 Job 參数 yearlist 開始循环,每个循环里面我们调用一次 Job extract_trans, 假设这个 job 调用执行不成功,我们就发邮件通知 Job 执行失败,否则进入下一个循环,在循环结束后,发邮件通知 Sequence Job 执行成功

图 15. Job 全景

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 16. 定义的变量

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 17. StartLoop Stage 的定义

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释

图 18. Job Activity Triggers 定义

IBM InfoSphere DataStage 8.1  DataStage Job 开发具体解释