Flink Task的数据交换机制概述

时间:2024-05-21 18:53:02

Flink的数据交换机制在设计时遵循两个基本原则:

  1. 数据交换的控制流(例如,为初始化数据交换而发出的消息)是由接收端发起的
  2. 数据交换的数据流(例如,在网络中实际传输的数据被抽象为IntermediateResult的概念)是可插拔的

数据交换也涉及到了一些角色,包括:

  1. JobManager:master节点,负责任务调度,异常恢复,任务协调,并且通过ExecutionGraph这样的数据结构来保存一个作业的全景图。
  2. TaskManagers:工作节点,负责将多个任务并行的在线程中执行,每个TM中包含一个CommunicationManager(在tasks之间共享)和一个MemoryManager (在tasks之间共享)。TM之间通过TCP连接来交互数据。

需要注意的是,在Flink中,数据交换是发生在TM之间的,而不是task之间,在同一个TM中的不同task会复用同一个网络连接。

        在一个TaskManager中可能会同时并行运行多个Task,每个Task都在单独的线程中运行。在不同的TaskManager中运行的Task之间进行数据传输要基于网络进行通信。实际上是TaskManager和另一个TaskManager之间通过网络进行通信,通信是基于Netty创建的标准的TCP连接,同一个TaskManager内运行的不同Task会复用网络连接。关于Flink的数据交换机制的具体流程,Flink的wiki中给出了一个比较详细的说明:

数据交换的控制流:

Flink Task的数据交换机制概述

        上图代表了一个简单的map-reduce类型的作业,有两个并行的任务,有两个TaskManager,每个TaskManager都分别运行一个mapTask和一个reduceTask。我们重点观察M1和R2这两个Task之间的数据传输的发起过程。数据传输用粗箭头表示,消息用细箭头表示。

  1. 首先,M1产出了一个ResultPartition(RP1)(箭头1)
  2. 当这个RP可以被消费时,会告知JobManager(箭头2)
  3. JobManager会通知想要接收这个RP分区数据的接收者(tasksR1andR2)当前分区数据已经准备好。如果接受方还没有被调度,这将会触发对应任务的部署(箭头3a,3b)
  4. 接着,接受方会从RP中请求数据(箭头4a,4b)
  5. 这将会初始化Task之间的数据传输(5a,5b),数据传输可能是本地的(5a),也可能是通过TaskManager的网络栈进行(5b)

        对于一个RP什么时候告知JobManager当前已经出于可用状态,在这个过程中是有充分的*度的:例如,如果在RP1在告知JM之前已经完整地产出了所有的数据(甚至可能写入了本地文件),那么相应的数据传输更类似于Batch的批交换;如果RP1在第一条记录产出时就告知JM,那么就是Streaming流交换。

字节缓冲区在两个Task之间的传输:

Flink Task的数据交换机制概述

上面这张图展示了一个细节更加丰富的流程,描述了一条数据记录从生产者传输到消费者的完整生命周期如下:

  1. 最初,MapDriver生成数据记录(通过Collector收集)并传递给RecordWriter对象。RecordWriter包含一组序列化器,每个消费数据的Task分别对应一个。ChannelSelector会选择一个或多个序列化器处理记录。例如,如果记录需要被广播,那么就会被交给每一个序列化器进行处理;如果记录是按照hash进行分区的,ChannelSelector会计算记录的哈希值,然后选择对应的序列化器。
  2. 序列化器会将记录序列化为二进制数据,并将其存放在固定大小的buffer中(一条记录可能需要跨越多个buffer)。这些buffer被交给BufferWriter处理,写入到ResulePartition(RP)中。RP有多个子分区(ResultSubpartitions-RSs)构成,每一个子分区都只收集特定消费者需要的数据。在上图中,需要被第二个reducer(在TaskManager2中)消费的记录被放在RS2中。由于第一个Buffer已经生成,RS2就变成可被消费的状态了(注意,这个行为实现了一个streaming shuffle),接着它通知JobManager。
  3. JobManager查找RS2的消费者,然后通知TaskManager2一个数据块已经可以访问了。通知TM2的消息会被发送到InputChannel,该inputchannel被认为是接收这个buffer的,接着通知RS2可以初始化一个网络传输了。然后,RS2通过TM1的网络栈请求该buffer,然后双方基于Netty准备进行数据传输。网络连接是在TaskManager(而非特定的task)之间长时间存在的。
  4. 一旦Buffer被TM2接收,它同样会经过一个类似的结构,起始于InputChannel,进入InputGate(它包含多个IC),最终进入一个反序列化器(RecordDeserializer),它会从buffer中将记录还原成指定类型的对象,然后将其传递给接收数据的Task。

Task的数据交换

        执行图是一个包含关于作业计算的“最基本事实”的数据结构。它由表示计算任务的顶点(ExecutionVertex EV)和表示任务生成的数据的中间结果(IntermediateResultPartition )组成。顶点通过执行边(EE)链接到它们消耗的中间结果:

Flink Task的数据交换机制概述

        这些是位于JobManager中的逻辑数据结构。在taskmanager上进行实际数据处理的时候,由它们各自的运行时等效结构来负责。与IntermediateResultPartition等价的运行时数据结构称为ResultPartition。

        ExecutionGraph 是 JobManager 中用于描述作业拓扑的一种逻辑上的数据结构,其中表示并行子任务的 ExecutionVertex 会被调度到 TaskManager 中执行,一个 Task 对应一个 ExecutionVertex。同 ExecutionVertex 的输出结果 IntermediateResultPartition 相对应的则是 ResultPartition。IntermediateResultPartition 可能会有多个 ExecutionEdge 作为消费者,那么在 Task 这里,ResultPartition 就会被拆分为多个 ResultSubpartition,下游每一个需要从当前 ResultPartition 消费数据的 Task 都会有一个专属的 ResultSubpartition。

        在 Task 中,InputGate是对输入的封装,InputGate 是和 JobGraph 中 JobEdge 一一对应的。也就是说,InputGate 实际上对应的是该 Task 依赖的上游算子(包含多个并行子任务),每个 InputGate 消费了一个或多个 ResultPartition。InputGate 由 InputChannel 构成,InputChannel 和ExecutionEdge 一一对应;也就是说, InputChannel 和 ResultSubpartition 一一相连,一个 InputChannel接收一个ResultSubpartition 的输出。根据读取的ResultSubpartition 的位置,InputChannel 有 LocalInputChannel 和 RemoteInputChannel 两种不同的实现。

数据交换机制的具体实现

        数据交换从本质上来说就是一个典型的生产者-消费者模型,上游算子生产数据到ResultPartition中,下游算子通过InputGate消费数据。由于不同的Task可能在同一个TaskManager中运行,也可能在不同的TaskManager中运行:对于前者,不同的Task其实就是同一个TaskManager进程中的不同的线程,它们的数据交换就是在本地不同线程间进行的;对于后者,必须要通过网络进行通信。下图所示分别为数据在一个taskmanager内的流转、以及在不同的taskmanager之间的流转:

Flink Task的数据交换机制概述

Flink Task的数据交换机制概述

 

最后详细介绍下载Task数据交换中遇到的组件的基本概念:

在Flink中涉及到数据交换机制的基本组件抽象概念如下;其主要是对Flink作业运行时产生的中间结果的概念抽象。

IntermediateDataset

  • IntermediateDataset是在JobGraph中对中间结果的抽象。我们知道,JobGraph是对StreamGraph进一步进行优化后得到的逻辑图,它尽量把可以chain到一起operator合并为一个JobVertex,而IntermediateDataset就表示一个JobVertex的输出结果。JobVertex的输入是JobEdge,而JobEdge可以看作是IntermediateDataset的消费者。一个JobVertex也可能产生多个IntermediateDataset。需要说明的一点是,目前一个IntermediateDataset实际上只会有一个JobEdge作为消费者,也就是说,一个JobVertex的下游有多少JobVertex需要依赖当前节点的数据,那么当前节点就有对应数量的IntermediateDataset。

IntermediateResult和IntermediateResultpartition

  • 在JobManager中,JobGraph被进一步转换成可以被调度的并行化版本的执行图,即ExecutionGraph。在ExecutionGraph中,和JobVertex对应的节点是ExecutionJobVertex,和IntermediateDataset对应的则是IntermediataResult。由于一个节点在实际运行时可能有多个并行子任务同时运行,所以ExecutionJobVertex按照并行度的设置被拆分为多个ExecutionVertex,每一个表示一个并行的子任务。同样的,一个IntermediataResult也会被拆分为多个IntermediateResultPartition,IntermediateResultPartition对应ExecutionVertex的输出结果。一个IntermediateDataset只有一个消费者,那么一个IntermediataResult也只会有一个消费者;但是到了IntermediateResultPartition这里,由于节点被拆分成了并行化的节点,所以一个IntermediateResultPartition可能会有多个ExecutionEdge作为消费者。

ResultPartition和ResultSubpartition

  • ExecutionGraph还是JobManager中用于描述作业拓扑的一种逻辑上的数据结构,其中表示并行子任务的ExecutionVertex会被调度到TaskManager中执行,一个Task对应一个ExecutionVertex。同ExecutionVertex的输出结果IntermediateResultPartition相对应的则是ResultPartition。IntermediateResultPartition可能会有多个ExecutionEdge作为消费者,那么在Task这里,ResultPartition就会被拆分为多个ResultSubpartition,下游每一个需要从当前ResultPartition消费数据的Task都会有一个专属的ResultSubpartition。
  • ResultPartitionType还可以指定ResultPartition的不同属性,这些属性包括是否流水线模式、是否会产生反压以及是否限制使用的Networkbuffer的数量。ResultPartitionType有三个枚举值:
    • BLOCKING:非流水线模式,无反压,不限制使用的网络缓冲的数量
    • PIPELINED:流水线模式,有反压,不限制使用的网络缓冲的数量
    • PIPELINED_BOUNDED:流水线模式,有反压,限制使用的网络缓冲的数量

      其中是否流水线模式这个属性会对消费行为产生很大的影响:如果是流水线模式,那么在ResultPartition接收到第一个Buffer时,消费者任务就可以进行准备消费;而如果非流水线模式,那么消费者将等到生产端任务生产完数据之后才进行消费。目前在Stream模式下使用的类型是PIPELINED_BOUNDED。

InputGate和InputChannel

  • 在Task中,InputGate是对输入的封装,InputGate是和JobGraph中JobEdge一一对应的。也就是说,InputGate实际上对应的是该Task依赖的上游算子(包含多个并行子任务),每个InputGate消费了一个或多个ResultPartition。InputGate由InputChannel构成,InputChannel和ExecutionGraph中的ExecutionEdge一一对应;也就是说,InputChannel和ResultSubpartition一一相连,一个InputChannel接收一个ResultSubpartition的输出(InputChannel在读取ResultSubpartition的输出的时候,会在ResultSubpartition的数据生产侧先创建对应的ResultSubpartitionView,该类用于读取ResultSubpartition的输出写入的Buffer,读取到对应的Buffer数据后,根据local、remote方式进行不同的网络数据传输)。根据读取的ResultSubpartition的位置,InputChannel有LocalInputChannel和RemoteInputChannel两种不同的实现。