Apache Beam开发指南

时间:2021-08-17 15:35:47

转摘

http://blog.csdn.net/blwinner/article/details/54908057


本指南用于指导Beam用户使用Beam SDK创建数据处理pipeline(pipeline)。本文会引导您用BeamSDK类构建和测试你的pipeline。本文不会详尽阐述所有内容,但可以看做一门未知的“编程语言”,引导您用编程的方式构建您的Beampipeline。随着开发指南内容愈加丰富,本文将会包含多种语言的示例代码,用于展示如何在您的pipeline中实现Beam概念。
1 概述
为了使用Beam,你首先需要使用BeamSDK中的类创建一个驱动程序(driver program)。你的驱动程序定义(define)了你的pipeline,包括所有的输入、传输、输出;它也用于设置pipeline的运行参数(execution options)(通常是从命令行获取到输入参数)。也包括Pipeline Runner的参数,反过来,它又定义了你的pipeline将运行在什么后台中。
BeamSDK提供了一组抽象概念简化了高可扩展分布式数据处理过程的结构。这些抽象概念可以同时作用于块数据和流数据源。当你创建Beampipeline时,你可以认为你的数据处理任务是在这些抽象概念中执行。抽象概念包括:
1)Pipeline:一个Pipeline封装了你全部的数据处理任务,从开始到结束。它包括了读取输入数据,传输数据,输出数据。所有的Beam驱动程序必须创建一个Pipeline。当你创建Pipeline时,你必须指定Pipeline运行环境和运行方式的参数。
2)PCollection:一个PCollection表述了一个分布式数据集(distribute data set),你的Beampipeline会操作它。数据集可以是有界的(bounded),意思是它可以来自于确切的源比如一个文件,或者是*的(unbounded),意思是它也可以来自于持续更新的源通过一个订阅或者其他途径。你的pipeline通常通过读取外部数据源的数据来创建一个初始的PCollection实例,不过你也可以在你的驱动程序内部用内存数据创建一个PCollection实例。从这里开始,PCollection对象将作为你pipeline中每一步操作的输入输出。
3)Transform:一个Transform代表了一个数据处理操作,也就是你pipeline中的“一步”(a step)。每一个Transform对象把一个或多个PCollection对象作为输入,执行一个你提供给PCollection成员的处理函数,然后产出一个或多个输出PCollection对象。
4)I/O Source和Sink:Beam提供Source和Sink API接口分别代表读取和写出数据。Source封装的代码用于从外部源读取数据到你的pipeline,比如云存储文件或者一个订阅的流数据源。Sink类似的封装了把PCollection数据写出到一个外部数据槽(sink)。
一个典型的Beam驱动程序工作内容如下:
1)创建一个Pipeline对象并设置pipeline的执行参数,包括Pipeline Runner的参数。
2)为pipeline数据创建一个初始的PCollection对象,用于通过SourceAPI读取外部数据源的数据,或者通过创建Transform用内存数据构建一个PCollection对象。
3)应用Transform对象到每一个PCollection对象。Transform对象可以对PCollection数据进行修改,过滤,分组,分析或者其他操作。一个Transform对象创建一个新的输出PCollection并需要输入PCollection。一个典型的pipeline随后反过来把Transform对象应用到每一个新的输出PCollection直到处理过程结束。
4)输出最后,传输PCollection对象,通常就是用Sink接口把数据写出到外部源。
5)用指定的Pipeline Runner运行(run)这个pipeline。
当你运行你的Beam驱动程序时,指定的Pipeline Runner会构造一个pipeline工作流程图(workflow graph),基于之前创建的PCollection对象和应用的Transform对象。该流程图执行合适的后台分布式处理过程,一个后台异步“job”(或等同于)。
2 创建pipeline对象
Pipeline抽象封装了数据处理任务所有的数据和执行步骤。你的Beam驱动程序通常起始于构建一个Pipeline对象,然后用该对象作为基础创建pipeline的数据集比如PCollection对象和操作比如Transform对象。
为了使用Beam,你的驱动程序首先必须创建一个BeamSDK类Pipeline的实例(通常在main函数中)。当你创建了你的pipeline对象,你还需要设置一些配置项。你可以在程序中设置配置项,不过通常提前设置(或从命令行读取)配置项会更简单,然后创建该对象时传递给它。
pipeline配置项定义了很多东西,PipelineRunner定义了pipeline在哪里执行:本地,或者分布式后台。基于pipeline在哪里执行和指定哪个Runner的要求,配置项可以帮你指定执行过程的其他方面。
为了设置pipeline的配置项并创建pipeline对象,可以创建一个类型为PipelineOptions的对象并传递给Pipeline.Create()。通常该操作是通过解析命令行参数进行的:
public static void main(String[] args) {
   // 通过解析传入应用的参数来构建一个PipelineOptions对象
   // 这里, --help 会打印注册项,即 --help=PipelineOptionsClassName
   // 会打印指定类的功能.
   PipelineOptions options =
       PipelineOptionsFactory.fromArgs(args).create();


   Pipeline p = Pipeline.create(options);
   
from apache_beam.utils.pipeline_options import PipelineOptions
BeamSDK包含了PipelineOptions各种子类用于不同的Runner。比如,DirectPipelineOptions包含的配置项用于Direct(本地的)Pipeline Runner,而DataflowPipelineOptions包含的配置项用于Google Cloud Dataflow。通过使用实现继承自类PipelineOptions的接口,你也可以定制自己的PipelineOptions。
3 使用PCollection对象
PCollection抽象表述了可分布式,多元数据集。你可以把一个PCollection对象当做“Pipeline”的数据;Beam的Transform对象使用PCollection作为输入输出。因此,如果你想在你的Pipeline中使用数据,你必须通过PCollection对象的形式。
当你创建完Pipeline后,你需要用某种方式创建至少一个PCollection对象。该对象将作为Pipeline第一个操作的输入。
3.1 创建一个PCollection
创建方式有两种:通过SourceAPI读取外部数据源的数据创建一个PCollection对象,或者通过驱动程序内存数据创建。区别在于pipeline对象怎么获取数据。SourceAPI包含的适配器可以帮助你从外部数据源读取大型云存储文件,数据库,或者内容订阅服务。后者测试和调试目的。
3.1.1 从外部源读取
为了从外部源读取,你需要使用一个Beam提供的I/O适配器。这些适配器在功能上各不相同,但他们都来自于外部数据源并返回一个PCollection对象,它代表了数据源中的数据记录。
每一个数据源适配器都有一个读(Read)Transform对象,为了读取,你必须把该对象应用到Pipeline对象自身。举例来说,TextIO.Read,从一个外部文本文档读取数据,并返回一个PCollection对象,它的元素类型是String,每个String元素代表了文档中的一行。把TextIO.Read用于Pipeline对象创建PCollection对象的代码如下:
public static void main(String[] args) {
    // Create the pipeline.
    PipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);


    PCollection<String> lines = p.apply(
      "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt"));
}
BeamSDK支持的读取不同数据源可参看API文档中I/O部分。
3.1.2 从内存数据创建PCollection对象
为了从内存中Java Collection类创建PCollection对象,你可以使用Beam提供的Create类(一种Transform类)。类似于数据适配器的Read,你可以把Create直接应用到pipeline对象上。
Create接受Java Collection对象和Coder对象作为参数。Coder对象指定了Collection中元素是如何编码的。
举例来说,为了从内存中list对象创建PCollection对象,使用Beam提供的Create类的transform。然后把该transform直接应用到Pipeline对象上。
下面的代码演示了如何从内存List对象创建PCollection对象:
public static void main(String[] args) {
    // Create a Java Collection, in this case a List of Strings.
    static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");


    // Create the pipeline.
    PipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);


    // Apply Create, passing the list and the coder, to create the PCollection.
    p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())
}
3.2 PCollection的特性
一个PCollection对象被创建它的Pipeline对象所持有。多个Pipeline不会共享同一个PCollection。在某些方面,PCollection类的功能类似于一个Java Collection类。但是,两者有一些很关键的不同点:
1)元素类型:PCollection的元素可以是任意类型,但是所有元素必须类型相同。然而,为了支持分布式处理,Beam需要能把每个单独元素编码成一个比特流(因此元素可以在分布式工作者间传递)。BeamSDK提供了一个数据编码框架,包括通用数据类型的内建编码,和良好支持的订制编码。
2)不变性:一个PCollection对象是不变的。一旦创建,就不能再添加、移除、修改单个元素。一个Beam Transform对象可以处理每一个PCollection元素,然后生成新的Pipeline数据(即一个新的PCollection对象),但它不需要也不会修改原始输入集。
3)随机访问:PCollection不支持随机访问单个元素。相反,Beam Transform会单独访问PCollection中的每一个元素。
4)数据大小和边界性:PCollection是大型的不变的元素“包”。PCollection包含的元素数量没有上限。PCollection会在单机中占用适量的内存空间,或者代表依靠持久化存储的非常大规模的分布式数据集。
PCollection对象的大小可以是有界或*的。一个有界的PCollection对象代表了一个已知的、大小固定的数据集,而一个*的PCollection对象代表了一个无大小限制的数据集。PCollection的边界性依赖于它所代表的数据集。从块数据读取,比如一个文件或者一个数据库,则创建有界PCollection对象。从流或者持续刷新的数据源读取,比如Pub/Sub或Kafka,则创建*PCollection(除非你明确指明它不是*的)。
有界(或*)性质会影响Beam如何处理你的数据。一个有界PCollection可以用块任务(batch job)进行处理,它可以一次读取整个数据集,然后执行有限长度的处理任务。一个*PCollection用支持运行的流任务处理,就好像整个集合在任何时刻对处理过程都不会全部可用。
当对*PCollection中的一组元素执行操作时,Beam有一个分割持续刷新数据集的概念叫“分窗(Windowing)”,即逻辑上有限大小的窗口。Beam把每个窗口当做一个数据包,然后持续处理数据集产出的数据包。这些逻辑窗口由和时间元素相关的性质所决定,比如一个timestamp(时间戳)。
5)元素时间戳:每一个PCollection中的元素都有一个内联的时间戳。每个元素的时间戳都由创建PCollection的Source赋予初值。Source创建一个*PCollection,并把新元素读取或添加的时间赋予元素的时间戳。
注意:Source为固定大小数据集创建有界PCollection时,也会自动赋予时间戳,但通常习惯是给每个元素赋予相同的时间戳(Long.MIN_VALUE)。
时间戳用于天然带有时间性的PCollection元素时非常有用。如果你的Pipeline读取一个事件流,比如Tweete或其他设计媒体消息,每个元素可以把事件发出的时间当做元素时间戳。
如果Source没有给PCollection元素赋值时间,你也可以手动赋值。当元素天然具有时间戳时,你可能希望这么做,但是时间戳是在元素结构内部的某个角落(比如位于服务器日志入口的“time”属性)。
Beam的Transform对象把一个PCollection作为输入并输出带时间戳的相同的PCollection。
4 应用Transform
在BeamSDK中,Transform代表了Pipeline中的操作。一个Transform对象把一个PCollection对象(或者不止一个PCollection对象)当做输入,对集合中每个元素执行指定的操作,生成一个新的输出PCollection对象。为了调用一个transform,你必须把它应用在输入PCollection中。
在BeamSDK中,每个transform类都有一个通用apply()方法(或者管道操作符"|")。调用多个Beam transform操作就如同调用方法链,不过有一点小小的不同:你把transform用于输入PCollection,则transform操作本身被当做参数,该操作返回输出PCollection。下面是常用代码格式:
[Output PCollection] = [Input PCollection].apply([Transform])
因为Beam把通用apply方法用于PCollection,你可以有序链接transform操作,也可以在transform过程中加入其他transform(称之为"复合transform(composite transforms)")。
如何应用pipeline的transform决定了pipeline的结构。最好把你的pipeline理解为一个有向无环图,图的节点是PCollection,边是transform操作。举例来说,你可以链接transform操作来创建一个有序pipeline,如下:
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
上面的pipeline最终工作流图像这样:
[Sequential Graph Graphic]
但是,注意,一个transform操作不消耗或修改输入集合————要记住,PCollection被定义为不可变的。这意味着,你可以把多个transform操作应用到同一个输入PCollection从而创建一个pipeline分支,像这样:
[Output PCollection 1] = [Input PCollection].apply([Transform 1])
[Output PCollection 2] = [Input PCollection].apply([Transform 2])
上面的分支pipeline的最终工作流图是这样:
[Branching Graph Graphic]
你可以创建自己的复合transform操作,在一个大型transform中加入多个子步骤。复合transform
非常有用,特别是在多个地方使用可重复有序执行的简单步骤。
4.1 BeamSDK中的Transform操作
BeamSDK中的Transform操作提供了一个通用处理框架,你可以用函数对象的方式提供处理逻辑(我们常用“用户代码”称呼它)。用户代码被用在输入的PCollection元素。用户代码实例在集群中被多个不同工作者并行执行,并依赖于你选择执行pipeline的pipeline runner和后台。每个工作者运行的用户代码生成的输出元素,最终都被添加到Transform操作生成的输出PCollection中。
4.2 核心Beam transform操作
Beam提供了下面的transform操作,每一个代表了一个不同的处理范式:
1)ParDo
2)GroupByKey
3)Combine
4)Flatten and Partition
4.2.1 ParDo
ParDo用于通用的并行处理过程。ParDo处理范式类似于Map/Shuffle/Reduce模型的“Map"过程:一个ParDo transform操作处理输入PCollection中每个元素,执行一些处理函数(用户代码),发生0个,1个,或多个元素到一个输出PCollection。
ParDo在多种常见数据处理过程中非常有用,包括:
1)过滤数据集:你可以访问PCollection中每个元素,决定输出它到新的集合或者丢弃它。
2)格式化或类型转换集合中每个元素:如果你的输入数据元素有不同的你不想要的类型或格式,你可以用ParDo对每个元素执行转换操作并把结果输出到新的PCollection。
3)提取数据集的一部分:如果你的数据记录包含多个域,你可以把你关心的域解析出来输出到一个新的PCollection。
4)对数据集每个元素执行计算:你可以用ParDo执行简单的或复杂的计算,对每个元素或某些元素,然后输出结果到一个新的PCollection。
在这些规则中,ParDo是一个管道的通用中间步骤。你可以用它提取原始记录集的一部分域,或者把原始输入转变为不同的格式。你也可以用ParDo把待处理数据转换为输出需要的格式,比如数据库表行格式或可打印字符串。
当你使用ParDo操作,你需要用DoFn对象的方式提供用户代码。Dofn是一个BeamSDK类,定义了一个分布式处理的功能。
注意:当你创建一个DoFn的子类时,你的子类必须依照“Beam Transform中编写用户代码的规范”。
使用ParDo
和所有Beam transform相似,在输入PCollection上调用apply方法并把ParDo作为参数,就像下面的例子这样:
// The input PCollection of Strings.
PCollection<String> words = ...;


// The DoFn to perform on each element in the input PCollection.
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }


// Apply a ParDo to the PCollection "words" to compute lengths for each word.
PCollection<Integer> wordLengths = words.apply(
    ParDo
    .of(new ComputeWordLengthFn()));        // The DoFn to perform on each element, which
                                            // we define above.
在这个例子中,我们输入的PCollection包含的是String值。我们应用ParDo并指定一个函数(ComputeWordLengthFn)来计算每个string对象的长度,然后输出的PCollection包含的是Integer值,保存了每个单词的长度。
创建一个DoFn
你传递给ParDo的DoFn对象,包含了处理输入集合元素的业务逻辑。当你使用Beam时,通常你的代码最重要的部分就是实现DoFn————他们定义了你的管道实际处理数据的任务是什么。
注意:当你创建你的DoFn时,要谨记“Beam Transform中编写用户代码的规范”,并确保你的代码严格遵照规范。
DoFn每次处理输入PCollection集合的一个元素。当你新建一个DoFn子类,你需要提供参数类型并匹配输入输出元素。如果你的DoFn处理过程输入String元素并产生Integer元素到输出集合(像前面的例子那样),你的类申明应该像这样:
static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
在你的DoFn子类内部,你需要写一个方法,它带有@ProcessElement注解,提供实际的处理逻辑。你不许手动从输入集合提取数据。BeamSDK会为你处理。你的@ProcessElement方法需要接受一个类型为ProcessContext的对象。该对象允许你访问一个输入元素,并给你一个输出元素的方法。
static class ComputeWordLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Get the input element from ProcessContext.
    String word = c.element();
    // Use ProcessContext.output to emit the output element.
    c.output(word.length());
  }
}
注意:如果输入PCollection集合中的元素是键值对,你可以通过ProcessContext.element().getKey()访问键或通过ProcessContext.element().getValue()访问值。
一个DoFn实例经常会被多次调用来处理任意数量的元素。但是,Beam并不保证确切的调用次数;它可能因为失败和重试,在一个工作者节点调用多次。同样的,你也可以通过多次调用处理方法来缓存信息,但是如果你这样做了,你需要确保你的实现不依赖于调用次数。
在你的处理方法中,你也需要遵守不变性要求,以确保Beam和后台处理过程可以安全的序列化和缓存值到管道中。你的方法需要遵循下面的要求:
1)你不能用任何方法修改ProcessContext.element() or ProcessContext.sideInput()的返回元素(后者是的输入元素来自输入集合)。
2)一旦你用ProcessContext.output() or ProcessContext.sideOutput()输出一个值,你不能再用任何方法修改它。
轻量级DoFn和其他抽象概念
如果你的函数关系很明确,你可以通过内联一个轻量级DoFn来使用ParDo,如同一个内部匿名类实例。
下面是之前例子的修改,DoFn被定义为一个匿名内部类:
// The input PCollection.
PCollection<String> words = ...;


// Apply a ParDo with an anonymous DoFn to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  "ComputeWordLengths",                     // the transform name
  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element().length());
      }
    }));
如果你的ParDo执行从输入到输出元素一对一的映射————也即,对每一个输入元素,应用方法后只生成唯一的输出元素,你可以使用更高层的MapElements Map transform操作。MapElements 可以接受一个Java8中匿名lambda函数简化流程:
// The input PCollection.
PCollection<String> words = ...;


// Apply a MapElements with an anonymous lambda function to the PCollection words.
// Save the result as the PCollection wordLengths.
PCollection<Integer> wordLengths = words.apply(
  MapElements.via((String word) -> word.length())
      .withOutputType(new TypeDescriptor<Integer>() {});
注意:你可以在其他Beam transform操作中用lambda表达式,包括Filter, FlatMapElements, 和Partition.
4.2.2 使用GroupByKey
GroupByKey是一个用来处理键值对集合的Beam transform操作。它是一个并行收缩操作,类似于
Map/Shuffle/Reduce模型的“Shuffle”过程。GroupByKey的输入是一个键值对集合代表了一个multimap集合,该集合包含多对同键不同值。给定这样一个集合,你可以用GroupByKey收集所有唯一键关联的值。
GroupByKey通常是聚合数据的好方法。举例来说,如果你有一个集合保存了用户订单记录,你可能希望通过邮政编码分组聚合所有的订单(这里“key”就是邮政编码,“value”就是其他记录)。
让我们在一个简单的示例中观察GroupByKey的结构,我们的数据集由一个文本文档的单词组成,并显示它们所在的行号。我们期望所有的行号(value)共享同一个单词(key),让我们可以看到一个具体的单词在文档中出现的所有位置。
我们的输入是一个键值对的PCollection,每个单词是一个键,值是单词在文档中所在的行号。下面是输入集合中的一部分键值对:
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
cat, 9
and, 6
...
GroupByKey收集了同一个key的所有value,输出一个新的键值对,键仍然是这个唯一key,而值是输入集合中和该键关联的所以值的集合。如果我们把GroupByKey应用到我们上面的输入集合中,则输出结果类似于:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
因而,GroupByKey代表了一个mulitmap(多个键对应单独值)到一个uni-map(唯一键对应值集合)的transform操作。
4.2.3 使用Combine
Combine是一个组合数据中元素或值的Beam transform操作。Combine在整个PCollection上产生作用,有些combine操作组合PCollection键值对集合中每个key的所有value。
当你使用Combine操作时,你必须提供函数来执行组合元素或值的逻辑。这个组合函数必须是可替换和有关联的,也就是这个函数不需要在给定key的所有value上都调用一次。因为输入数据(包括value集合)可能是分布在多个工作者中,组合函数可能调用多次,每次对value集合的子集执行部分组合操作。BeamSDK也提供了一些预置的组合函数用于通用数字组合操作,比如sum,min和max。
普通的组合操作,比如sum,一般可以用普通函数实现。更多的复杂组合操作需要你创建CombinFn子类,它有一个累积类型,不同于输入/输出类型。
简单方法实现的简单组合:
// Sum a collection of Integer values. The function SumInts implements the interface SerializableFunction.
public static class SumInts implements SerializableFunction<Iterable<Integer>, Integer> {
  @Override
  public Integer apply(Iterable<Integer> input) {
    int sum = 0;
    for (int item : input) {
      sum += item;
    }
    return sum;
  }
}
使用CombinFn实现高级组合
对于更复杂的组合功能,你可以定义一个CombinFn的子类来实现。你需要使用CombinFn来实现组合功能要求的更复杂的累加器,必须执行额外的预处理,可能会改变输出类型,或者把key纳入计算。
一个通用组合功能由四个操作构成。当你创建一个CombinFn的子类时,你必须通过覆写对应的方法来提供这四个操作:
1)创建累加器:创建一个新的“本地化”累加器。在下面的例子中,就是求平均值,一个本地累加器跟踪值的累加结果(就是最终求平均值时的分子),并且统计当前已累加值的数量(也就是分母)。这在分布式环境中可能会调用多次。
2)添加输入:给累加器添加一个输入元素,返回累加的结果。在我们的示例中,它会更新sum的值并递增count。这也可能会被并行调用。
3)合并累加器:合并这些累加器到一个累加器,也就是多个累加器的数据如何在最终计算前进行组合。在求平均值计算的例子中,每一部分参与除法的累加器被合并在一起。这可能在输出上调用多次。
4)提取输出:执行最终计算。在计算平均值的例子中,用组合的所有值的累加除以参与累加的值的数量。这在最后合并的累加器上调用一次。
下面的例子演示了如何定义一个CombinFn来计算一个平均值:
public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  public static class Accum {
    int sum = 0;
    int count = 0;
  }


  @Override
  public Accum createAccumulator() { return new Accum(); }


  @Override
  public Accum addInput(Accum accum, Integer input) {
      accum.sum += input;
      accum.count++;
      return accum;
  }


  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }


  @Override
  public Double extractOutput(Accum accum) {
    return ((double) accum.sum) / accum.count;
  }
}
如果你组合一个键值对的PCollection集合,“预定义键值组合”一般够用了。如果你想基于键来改变组合策略(比如,MIN或MAX操作),你可以定义一个KeyedCombineFn用来在组合策略中访问key。
组合一个PCollection集合为一个单独值
使用全局组合把一个给定的PCollection集合的转换为一个单独值,在你的管道中就好像一个PCollection只有一个元素。下面的例子演示了如何应用Beam提供的sum组合功能来生成一个单独的累加值,输出一个整型PCollection集合:
// Sum.SumIntegerFn() combines the elements in the input PCollection.
// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
   Combine.globally(new Sum.SumIntegerFn()));
全局分窗:如果你输入PCollection集合使用了默认的全局分窗,那么默认行为是返回一个包含一个元素的PCollection。这个元素的值来自于组合函数的累加器,也就是你使用Combine时指定的组合函数。比如,Beam提供的sum组合函数默认返回了一个零值(空输入的累加和),而最小值组合函数会返回一个最大值或无限值。
为了不让Combine在输入为空时返回空的PCollection集合,你可以在使用Combine时指定withoutDefaults(),如下所示:
PCollection<Integer> pc = ...;
PCollection<Integer> sum = pc.apply(
  Combine.globally(new Sum.SumIntegerFn()).withoutDefaults());
非全局分窗:如果你的PCollection集合使用了任何非全局分窗功能,Beam将不再提供默认行为。你在使用Combine时必须指定下列配置项之一:
1)指定withoutDefaults(),默认是输入PCollection集合的窗口是空的,则输出PCollection集合同样是空的。
2)指定asSingletonView(),输出会立即转换为一个PCollectionView对象,它为每个当做输入的空窗口提供了默认值。一般你需要这个选项的情况是,你的管道的Combine操作结果在另一个管道被当做输入。
在按键分组集合中组合值
在创建完一个按键分组的集合(比如,通过使用GroupByKey转换)后,一个常见操作是组合每个键关联的值到一个独立合并的值。借鉴前面GroupByKey的例子,命名为groupedWords的按键分组PCollection集合如下:
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...
在上面的PCollection集合中,每个元素有一个字符串关键字(比如“cat”),和一个可迭代的整型值(在第一个元素中就是[1,5,9])。如果我们接下来的步骤是聚合这些值(而不是单独考虑每个值),你可以组合这些可迭代整型值为一个独立聚合值,并和每个关键字组成键值对。使用合并value集合的GroupByKey模型等同于按键(PerKey)Combine转换。你提供给Combine PerKey的聚合函数必须是一个关系收缩函数或一个CombinFn的子类:
// PCollection is grouped by key and the Double values associated with each key are combined into a Double.
PCollection<KV<String, Double>> salesRecords = ...;
PCollection<KV<String, Double>> totalSalesPerPerson =
  salesRecords.apply(Combine.<String, Double, Double>perKey(
    new Sum.SumDoubleFn()));


// The combined value is of a different type than the original collection of values per key.
// PCollection has keys of type String and values of type Integer, and the combined value is a Double.


PCollection<KV<String, Integer>> playerAccuracy = ...;
PCollection<KV<String, Double>> avgAccuracyPerPlayer =
  playerAccuracy.apply(Combine.<String, Integer, Double>perKey(
    new MeanInts())));
4.2.4 使用Flatten和Partition
Flatten和Partition用于转换存储相同数据类型的PCollection集合。Flatten合并多个PCollection集合对象为一个逻辑PCollection对象,而Partition把一个独立PCollection对象分割为确切数量的较小的集合。
Flatten
下面的例子演示了如何用Flatten转换来聚合多个PCollection对象:
// Flatten takes a PCollectionList of PCollection objects of a given type.
// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);


PCollection<String> merged = collections.apply(Flatten.<String>pCollections());
聚合后集合数据的编码方式:
默认情况下,输出PCollection集合的编码器和输入PCollectionList的第一个PCollection集合编码器相同。可是,输入PCollection对象每个都可能有不同的编码器,不过只要他们在你选择的开发语言中的数据类型相同即可。
合并分窗集合:
如果使用Flatten合并的PCollection集合对象有分窗策略,那么所有要合并的PCollection集合对象必须使用兼容的分窗策略和窗口大小。举例来说,所以你要合并的集合必须全部(假设)使用相同的每5分钟固定窗口或每30秒滑动4分钟窗口。
如果你的管道尝试把Flatten用在合并窗口不兼容的PCollection集合中,Beam会在管道创建时产生一个IllegalStateException 错误。
Partition
Partition按照你提供的分块函数划分PCollection集合的元素。分块函数的逻辑定义了如何划分输入PCollection集合元素到每个部分结果PCollection集合中。划分数量必须在构造时就指定。举例来说,当运行时(也就是构建管道图时),你可以在命令行输入划分数量,但你不能在管道中期定义分块数。
下面的例子按百分比分组一个PCollection集合:
// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));


// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
4.3 编写Beam transform用户代码的规范
当你为一个Beam转换编写用户代码时,你必须牢记分布执行性。举例来说,可能有很多份你的函数的拷贝,并行运行在很多不同机器上,而这些拷贝之间没有依赖性,没有交互或共享状态。依赖于你选择的Pipeline Runner和运行后台,用户代码的每份拷贝可能被重试或运行多次。同样的,你需要十分小心用户代码间的状态依赖等状况。
通常来讲,用户代码必须至少满足下列要求:
1)你的功能对象必须是可序列化的。
2)你的功能对象必须是线程兼容的,而且要意识到BeamSDK不是线程安全的。
另外,建议你的功能对象是幂等的(idempotent)。
注意:这样要求应用在Dofn子类(用于ParDo转换的功能对象),CombinFn子类(用于Combine转换的功能对象),WindowFn(用于Window转换的功能对象)。
4.3.1 可序列化
你提供给transform操作的任何功能对象都必须是可完全序列化的。因为在你的处理集群中,功能的拷贝会被序列化并发往远程工作者。用户代码的基类,包括DoFn,CombineFn和WindowFn,甚至包括实现的Serializable接口;无论如何,你的子类不能添加任何非可序列化成员。
下面是其他一些你需要牢记的序列化因素:
1)你的功能对象的瞬时状态域不会被发往工作者实例,因为他们不能自动序列化。
2)避免在序列化前加载包含某个域的大量数据。
3)你的功能对象的单个实例不能共享数据。
4)一个功能对象在使用以后被修改是不起作用的。
5)用匿名内联类实例申明你的功能对象时一定要小心。在非静态环境中,你的内联类实例将隐式包含一个指针,指向该闭合类和该类的状态。该闭合类也应该是可序列化的,因而同样的考虑需要用在功能函数自身和用到它的外部类。
4.3.2 线程兼容性
你的功能对象必须是线程兼容的。功能对象的每一个实例只被一个工作者实例访问,除非你显示创建了自己的线程。注意,无论何时,BeamSDK都是线程不安全的。如果你在用户代码中创建了自己的线程,你必须自己进行同步操作。注意,功能对象的静态成员不会被传递给工作者实例,而且多个功能实例会被不同的线程访问。
4.3.3 幂等
建议你的功能对象是幂等的————也就是说,它可以重复或重试多次而不会产生意外结果。Beam模型不保证你的代码被调用或重试的次数;同样的,保持功能对象幂等性就是保持管道输出的确定性,因而你的transform操作行为会更可预测也更易于调试。
5 侧面输入和侧面出端
5.1 侧面输入
除了主输入PCollection集合外,你还可以提供附加输入到一个ParDo转换操作(按照侧面输入的格式)。一个侧面输入是一个附加输入,你的DoFn每次处理一个主输入PCollection集合元素时都可以访问它。当你指定了一个侧面输入,你就创建了一些其他数据的视图,可以读取自ParDo转换内部的DoFn函数处理每个元素时。
当你的ParDo需要在处理输入集合元素时注入附加数据时,侧面输入会非常有用,但是附加数据需要在运行时定义(而且不能是硬编码)。这些值被输入数据定义,或者依赖于管道的不同块。
示例:
// Pass side inputs to your ParDo transform by invoking .withSideInputs.
  // Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.


  // The input PCollection to ParDo.
  PCollection<String> words = ...;


  // A PCollection of word lengths that we'll combine into a single value.
  PCollection<Integer> wordLengths = ...; // Singleton PCollection


  // Create a singleton PCollectionView from wordLengths using Combine.globally and View.asSingleton.
  final PCollectionView<Integer> maxWordLengthCutOffView =
     wordLengths.apply(Combine.globally(new Max.MaxIntFn()).asSingletonView());




  // Apply a ParDo that takes maxWordLengthCutOffView as a side input.
  PCollection<String> wordsBelowCutOff =
  words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
                    .of(new DoFn<String, String>() {
      public void processElement(ProcessContext c) {
        String word = c.element();
        // In our DoFn, access the side input.
        int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
        if (word.length() <= lengthCutOff) {
          c.output(word);
        }
  }}));
侧面输入和分窗:
一个可分窗的PCollection集合可能是无限大的,所以不能压缩为单值(或单集合类对象)。当你为可分窗PCollection集合创建了一个PCollectionView,这个PCollectionView就代表了每个窗口的一个独立实体(每个窗口一个单例实体,每个窗口一个列表实体,等等)。
Beam使用主输入元素窗口为侧面输入元素检索合适的窗口。Beam把主输入元素的窗口投影到侧面输入窗口配置,然后用结果窗口作为侧面输入。如果主输入和侧面输入有完全相同的窗口,那么投射操作就提供了相对精确的窗口。但是,如果输入有不同的窗口,Beam会用投影来选择最合适的侧面输入窗口。
比如,如果主输入用固定一分钟时长分窗,侧面输入用固定一小时时长分窗,Beam投影主输入窗口来代替侧面输入窗口的设置,然后从适当的一小时长侧面输入窗口获取侧面输入数据。
如果主输入元素有不止一个窗口,那么processElement会被调用多次,每个窗口一次。每次调用processElement都会投影主输入元素“当前”窗口,并且因此可能每次产生不同的侧面输入视图。
如果侧面输入有多个触发点,Beam使用最近的触发点。如果你用的侧面输入只有一个全局窗口,并且指定了一个触发器,那么该操作会很有用。
侧面输出
一般ParDo会一直生成一个主输出PCollection集合(即apply的返回值),不过你也可以让ParDo生成任意数量的附带输出PCollection集合。如果你选择输出多个集合,ParDo会捆绑返回所有的输出PCollection集合(包括主输出)。
示例:
// To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces.
// For example, if your ParDo produces three output PCollections (the main output and two side outputs), you must create three TupleTags.
// The following example code shows how to create TupleTags for a ParDo with a main output and two side outputs:


  // Input PCollection to our ParDo.
  PCollection<String> words = ...;


  // The ParDo will filter words whose length is below a cutoff and add them to
  // the main ouput PCollection<String>.
  // If a word is above the cutoff, the ParDo will add the word length to a side output
  // PCollection<Integer>.
  // If a word starts with the string "MARKER", the ParDo will add that word to a different
  // side output PCollection<String>.
  final int wordLengthCutOff = 10;


  // Create the TupleTags for the main and side outputs.
  // Main output.
  final TupleTag<String> wordsBelowCutOffTag =
      new TupleTag<String>(){};
  // Word lengths side output.
  final TupleTag<Integer> wordLengthsAboveCutOffTag =
      new TupleTag<Integer>(){};
  // "MARKER" words side output.
  final TupleTag<String> markedWordsTag =
      new TupleTag<String>(){};


// Passing Output Tags to ParDo:
// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking .withOutputTags.
// You pass the tag for the main output first, and then the tags for any side outputs in a TupleTagList.
// Building on our previous example, we pass the three TupleTags (one for the main output and two for the side outputs) to our ParDo.
// Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.


  PCollectionTuple results =
      words.apply(
          ParDo
          // Specify the tag for the main output, wordsBelowCutoffTag.
          .withOutputTags(wordsBelowCutOffTag,
          // Specify the tags for the two side outputs as a TupleTagList.
                          TupleTagList.of(wordLengthsAboveCutOffTag)
                                      .and(markedWordsTag))
          .of(new DoFn<String, String>() {
            // DoFn continues here.
            ...
          }
DoFn函数:
// Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput.
// Pass the appropriate TupleTag for the target side output collection when you call ProcessContext.sideOutput.
// After your ParDo, extract the resulting main and side output PCollections from the returned PCollectionTuple.
// Based on the previous example, this shows the DoFn emitting to the main and side outputs.


  .of(new DoFn<String, String>() {
     public void processElement(ProcessContext c) {
       String word = c.element();
       if (word.length() <= wordLengthCutOff) {
         // Emit this short word to the main output.
         c.output(word);
       } else {
         // Emit this long word's length to a side output.
         c.sideOutput(wordLengthsAboveCutOffTag, word.length());
       }
       if (word.startsWith("MARKER")) {
         // Emit this word to a different side output.
         c.sideOutput(markedWordsTag, word);
       }
     }}));
5 Pipeline I/O
当你创建一个管道时,你通常需要从外部源读取数据,比如外部数据槽中的文件或者一个数据库。同样的,你可能想让你的管道输出结果数据到类似的外部数据槽。Beam为一些通用数据存储类型提供了读写转换(transform)操作。如果想让管道读写内置转换不支持的数据存储格式,可以实现自制的读写转换。
注意:关于如何实现自制Beam IO转换的指南正在编写中。。。
5.1 读取输入数据
读取转换操作从外部源读取数据,然后管道使用的PCollection数据集合。在构造管道并创建一个新PCollection集合的过程中,随时可以进行读取转换操作,虽然它通常是在管道启动以后。
使用读取转换示例:
PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt"));  
5.2 写出输出数据
写出转换操作把一个PCollection数据集合写入一个外部数据源。通常是在管道结束时,用写出操作输出管道的最终结果。不过,你也可以在管道的任意时间执行写出操作输出一个PCollection集合的数据。
示例:
output.apply(TextIO.Write.to("gs://some/outputData"));
5.3 基于文件的输入输出数据
5.3.1 从多个位置读取
很多读取转换操作都支持读取多个输入文件,文件名匹配某种占位符。占位符是由文件系统定义的,遵从文件系统定义的一致性模型。下面的TextIO实例使用了一种占位操作(*)来读取所有在指定位置匹配的输入文件,文件名用“input-”开头,“.csv”结尾:
p.apply(“ReadFromText”,
    TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv");
为了从不同数据源读取数据到一个PCollection集合中,要求读取过程独立,然后用Flatten操作创建一个PCollection集合。
5.3.2 写出到多个输出文件
在基于文件输出数据时,写出转换默认写出到多个输出文件。当你传递一个文件名给一个输出操作时,文件名被用于所有写出操作生成的输出文件的前缀。你可以通过指定后缀为每个输出文件添加后缀。
下面的写出操作示例写出多个输出文件到一个地址。每个文件前缀是“numbers”,一个数字的标识,后缀是“.csv”:
records.apply("WriteToText",
    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
                .withSuffix(".csv"));
5.4 Beam提供的I/O API
具体参见API文档。
File-based:AvroIO,HDFS,TextTIO,XML
Messaging:JMS,Kafka,Kinesis,Google Cloud PubSub
Database:MongoDB,JDBC,Google BigQuery,Google Cloud Bigtable, Google Cloud Datastore
6 运行管道
运行管道需要使用run方法。程序会发送管道的详细说明到一个管道runner,后者会构造运行管道的一系列实际操作。管道默认是异步执行的:
pipeline.run();
如果想阻塞执行,可以加上waitUntilFinish方法:
pipeline.run().waitUntilFinish();