本系列源于对commons-pipeline 使用的学习:
首先是:翻译官方文档
本文是针对使用 Apache Commons Pipeline工作流程框架的基础介绍,本文档目标读者为需要组装现有stages或编写自己的stages的开发人员。该项目提供了一个Java类库旨在提升使用和重用模块化的stage的易用性。
一、Pipeline结构:
- Stages:stages在Pipeline中代表来处理数据所需的逻辑单元。每一个stage代表一个高层次的处理概念:如查找文件,读取文件格式,从数据计算产品,或者将数据写入到数据库。使用工作流架构并且构建处理单元到Stages的主要优点是提升Stages在其他Pipeline的可重用性。
- Pipeline:一个Pipeline由stages构建而成,这些stages可以将数据传递给后续的stages。上图中箭头所标记的“EMIT”表明一个stage的数据输出被传递到下一个stage。从代码层面看,有一个 EMIT() 方法将数据发送到下一个stage。数据流开始于左侧,在那里有一个标为“FEED”的箭头。FEED通常通过一个配置文件开始一个Pipeline,这点在后面讨论。Stage自己不关心输入的数据是来自FEED或前一阶段的EMIT()。
Pipeline也可发送相同或不同的数据到不同的分支(branch),使数据沿不同的处理路线流转。
- 通过Digester 或者 Spring 配置:有两种方法来配置Pipeline,都是是基于XML的控制文件。其中更简单的方法是使用Digester,Pipeline的最终用户可以修改该配置来满足自己的需求。 Spring框架也被用于配置Pipeline,但是由于他的结构与java编程对象更加接近,使得配置更复杂,同时也更灵活。通过XML配置文件的顺序来控制stage的顺序,以及stage的特殊参数都在该文件中进行设置。这些控制文件还允许以环境遍历的形式设置对所有stage可见全部参数,这种配置的方式使得完全不需要重新编译Java代码即可实现很多改变Pipeline布局和行为的需求。
本教程将介绍使用Digester配置的pipelines,因为这是比较简单的方法。
二、Stages使用
一个标准的stage都有一个队列来缓存输入的数据对象。当某些stage比其他stage具有不同吞吐量或不规则的处理速度时,排队是一种有效的处理手段,特别是那些数据依赖于网络连接或近线媒体的stage。这个队列并不是stage本身的一部分,而是由stage driver来管理,stage driver负责当数据准备好的时候,将其送入stage。stage将一个数据对象传给下一级stage的时候,它可能在一个队列中排队等待(按顺序接收),直到下一个stage准备好来处理它。通常情况下,每个stage运行一个单独的处理线程。当然,对于一些应用也可以配置pipelines,对于同一个对象一次处理的所以stage运行一个单一的线程,也就是说,除非前一个对象以及完成了所有stage。下一个对象不会开始处理。
Stages都继承自抽象类(org.apache.commons.pipeline.stage.BaseStage),有许多现有的stage可以直接使用,以满足各种加工要求。你还可以通过扩展BaseStage或其他现有的stage之一创建自定义的stage。
下图展示了各种类性和数量。
上图中stage说明:
- 通常所有进入一个stage的这些对象是同一类型的。避免在stage的代码中重复的写switch语句去区分对象,可以通过使用分支来隔离不同的对象类型。
- 一个数据对象输入stage并不总是产生一个输出对象。
- 终端stage不传递(EMIT)任何数据对象。应当避免创建这种类型的阶段,因为他们限制了你建造pipelines时的可能性。 (这很容易做到,将数据传递到下一个阶段只需要一行代码。)
- 传递(EMIT)对象到一个以上的后续stage的stage被称为分支stage。
- 如果stages传递的对象与他们解释的对象为同一类型的对象,仅仅是做选择条件过滤,被称为过滤stage。
- 通常都有读取数据的stage和写数据的stage,他们将数据读入pipeline或者pipeline作为的输出。
- 创造一个与传递给他们的不同的对象的stage被称为转换stage。
- 传递的(emitted)对象类型没有必要保持一致。
- 当碰到分支的时候,进入到不同stage中的对象不必是相同类型的,或有相同的量。注意上图中的“FileReader”stage对于每个到来的文件产生100单元的数据对象,但是通过只有一个边界形状被传递给给了分支。
其他注意事项(不一定是从上面的图中很明显):
- 虽然数据被传递道stage是Java对象的方式,但是stage接受到他们往往期待是一个更具体的数据类型,如文件或数据记录。通常对接受的对象进行检查,看它们是否是期望的数据类型的实例,然后在具体工作之前转换为该类性的对象。
- 您可以为你的pipeline的每个stage设置 stage driver。他们可以限制队列的大小来控制内存和资源使用。对于有界队列来说,上游stage将阻塞并等待,直到下游stage的队列有足够的空间。
StageDriver的作用
StageDriver是一个Java接口,他控制将数据提交(feeding)给stage和不同stages之间的通信。因此,stage的生命周期和不同stage之间的相互作用是非常依赖于通过这些stage drivers来控制。这些StageDriver的工厂实现StageDriverFactory接口。在pipeline初始化的时候,StageDrivers是由产生特定类型的StageDriver的工厂类提供。每个stage都会有其自己的 StageDriver的实例,并在pipeline内的不同stage可以使用不同类型的StageDrivers,虽然常见的情况是pipeline中所有stage使用相同类型StageDriver(全部共享相同的StageDriverFactory实现)。
下面是一些通用的StageDriver
DedicatedThreadStageDriver | 为每一个stage生成一个单独的线程,DedicatedThreadStageDriverFactory()提供 |
SynchronousStageDriver | 这个是非线程的StageDriverSynchronousStageDriverFactory()提供
|
ThreadPoolStageDriver | 使用一个线程池处理输入的对象,ThreadPoolStageDriverFactory()提供 |
本教程将介绍DedicatedThreadStageDriver,因为这是一个很好的通用驱动程序。某些时候你肯希望编写自己的StageDriver实现,这里没有涉及到一个高级的主题。
Stage内部解析
如果你需要编写自己的stage,本节给出了一些为了实现Stage接口你需要了解的方法的概述。
stage本身定义org.apache.commons.pipeline.Stage接口,它具有下列方法:
Stage Interface Methods
|
|
init(StageContext) | 关联stage和environment。在生命周期中运行一次。 |
preprocess() | 做任何必要的设置。在生命周期中运行一次。 |
process(Object) | 处理数据对象和传递结果到下一stage。跑N次,为每一个传递到其中的对象运行一次。 |
postprocess() | 处理汇总数据等,在生命周期中运行一次。 |
release() | 清理该stage持有的资源。在生命周期中运行一次。 |
一个可用的抽象类为org.apache.commons.pipeline.BaseStage,许多其它的stage都是从来衍生出来的。你可以扩展这个类或者在BaseStage之上建造的其他stage。它提供了stage接口的所有方法的无操作实现。您可以按照需求重写这些方法。对于简单的处理,你可能并不需要重写init(StageContext),postprocess(), nor release()方法 。你几乎只需要提供自己的process(Object)方法。从软件设计的角度来看,认为反转控制的,因为你不是在写一个自定义的主程序来调用标准的子程序,而是,写自定义的子程序通过一个标准的主程序调用。
BaseStage提供了一个emit(Object obj)方法和提交到分支的emit(String branch, Object obj) 方法来传递对象到下一个stage。因此,通常会在process()方法的结束的附近调用EMIT()。终端stage不调用EMIT(),所以没有对象被传递。这也很容易改变,通过添加EMIT()的代码,终端stage变成一个普通的stage。注意,一个stage提交一个数据对象没有后续stage使用它,这样提交的对象只是没有使用而已并没有什么坏处。有时EMIT()方法在postprocess中调用。当处理缓存,或输入和输出对象的汇总,则process()方法通常仅仅存储进入对象的信息,postprocess()完成了工作之后并提交一个新的对象。